You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:45 UTC
[09/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API
[FLINK-2780] Remove Old Windowing Logic and API
This rewrites the few remaining examples and tests so that they use the
new Windowing API.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c1141ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c1141ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c1141ab
Branch: refs/heads/master
Commit: 0c1141abcd0f58dcf2ec9abcccd47c5e5410b074
Parents: b2b2781
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 4 10:33:06 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 9 11:15:59 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 89 --
.../api/datastream/DiscretizedStream.java | 341 --------
.../api/datastream/WindowedDataStream.java | 867 -------------------
.../api/functions/RichWindowMapFunction.java | 40 -
.../api/functions/WindowMapFunction.java | 35 -
.../windowing/delta/CosineDistance.java | 2 +-
.../windowing/delta/EuclideanDistance.java | 2 +-
.../delta/ExtractionAwareDeltaFunction.java | 2 +-
.../delta/extractor/ArrayFromTuple.java | 74 ++
.../delta/extractor/ConcatenatedExtract.java | 68 ++
.../windowing/delta/extractor/Extractor.java | 43 +
.../delta/extractor/FieldFromArray.java | 59 ++
.../delta/extractor/FieldFromTuple.java | 58 ++
.../delta/extractor/FieldsFromArray.java | 67 ++
.../delta/extractor/FieldsFromTuple.java | 53 ++
.../flink/streaming/api/graph/StreamGraph.java | 4 -
.../streaming/api/graph/WindowingOptimizer.java | 161 ----
.../operators/windowing/EmptyWindowFilter.java | 32 -
.../windowing/GroupedActiveDiscretizer.java | 129 ---
.../windowing/GroupedStreamDiscretizer.java | 132 ---
.../windowing/GroupedWindowBuffer.java | 69 --
.../windowing/ParallelGroupedMerge.java | 41 -
.../api/operators/windowing/ParallelMerge.java | 145 ----
.../windowing/ParallelMergeOperator.java | 43 -
.../operators/windowing/StreamDiscretizer.java | 237 -----
.../operators/windowing/StreamWindowBuffer.java | 69 --
.../operators/windowing/WindowFlattener.java | 51 --
.../api/operators/windowing/WindowFolder.java | 100 ---
.../api/operators/windowing/WindowMapper.java | 94 --
.../api/operators/windowing/WindowMerger.java | 73 --
.../windowing/WindowPartExtractor.java | 55 --
.../operators/windowing/WindowPartitioner.java | 79 --
.../api/operators/windowing/WindowReducer.java | 99 ---
.../streaming/api/windowing/StreamWindow.java | 276 ------
.../api/windowing/StreamWindowSerializer.java | 148 ----
.../api/windowing/StreamWindowTypeInfo.java | 104 ---
.../streaming/api/windowing/WindowEvent.java | 71 --
.../streaming/api/windowing/WindowUtils.java | 203 -----
.../api/windowing/evictors/TimeEvictor.java | 3 +-
.../api/windowing/extractor/ArrayFromTuple.java | 74 --
.../extractor/ConcatenatedExtract.java | 68 --
.../api/windowing/extractor/Extractor.java | 43 -
.../api/windowing/extractor/FieldFromArray.java | 59 --
.../api/windowing/extractor/FieldFromTuple.java | 58 --
.../windowing/extractor/FieldsFromArray.java | 67 --
.../windowing/extractor/FieldsFromTuple.java | 53 --
.../streaming/api/windowing/helper/Count.java | 101 ---
.../streaming/api/windowing/helper/Delta.java | 105 ---
.../api/windowing/helper/FullStream.java | 57 --
.../api/windowing/helper/SystemTimestamp.java | 37 -
.../streaming/api/windowing/helper/Time.java | 153 ----
.../api/windowing/helper/Timestamp.java | 39 -
.../api/windowing/helper/TimestampWrapper.java | 65 --
.../api/windowing/helper/WindowingHelper.java | 61 --
.../ActiveCloneableEvictionPolicyWrapper.java | 62 --
.../windowing/policy/ActiveEvictionPolicy.java | 52 --
.../policy/ActiveEvictionPolicyWrapper.java | 64 --
.../windowing/policy/ActiveTriggerCallback.java | 45 -
.../windowing/policy/ActiveTriggerPolicy.java | 81 --
.../windowing/policy/CentralActiveTrigger.java | 45 -
.../policy/CloneableEvictionPolicy.java | 44 -
.../policy/CloneableMultiEvictionPolicy.java | 84 --
.../policy/CloneableMultiTriggerPolicy.java | 63 --
.../policy/CloneableTriggerPolicy.java | 44 -
.../windowing/policy/CountEvictionPolicy.java | 155 ----
.../windowing/policy/CountTriggerPolicy.java | 117 ---
.../api/windowing/policy/DeltaPolicy.java | 167 ----
.../api/windowing/policy/EvictionPolicy.java | 47 -
.../windowing/policy/KeepAllEvictionPolicy.java | 29 -
.../windowing/policy/MultiEvictionPolicy.java | 170 ----
.../windowing/policy/MultiTriggerPolicy.java | 123 ---
.../api/windowing/policy/PunctuationPolicy.java | 147 ----
.../windowing/policy/TimeEvictionPolicy.java | 167 ----
.../api/windowing/policy/TimeTriggerPolicy.java | 209 -----
.../api/windowing/policy/TriggerPolicy.java | 54 --
.../policy/TumblingEvictionPolicy.java | 104 ---
.../windowbuffer/BasicWindowBuffer.java | 73 --
.../JumpingCountGroupedPreReducer.java | 54 --
.../windowbuffer/JumpingCountPreReducer.java | 56 --
.../JumpingTimeGroupedPreReducer.java | 56 --
.../windowbuffer/JumpingTimePreReducer.java | 58 --
.../windowing/windowbuffer/PreAggregator.java | 27 -
.../SlidingCountGroupedPreReducer.java | 83 --
.../windowbuffer/SlidingCountPreReducer.java | 81 --
.../windowbuffer/SlidingGroupedPreReducer.java | 153 ----
.../windowbuffer/SlidingPreReducer.java | 175 ----
.../SlidingTimeGroupedPreReducer.java | 102 ---
.../windowbuffer/SlidingTimePreReducer.java | 102 ---
.../windowbuffer/TumblingGroupedPreReducer.java | 107 ---
.../windowbuffer/TumblingPreReducer.java | 98 ---
.../windowing/windowbuffer/WindowBuffer.java | 61 --
.../flink/streaming/api/DataStreamTest.java | 125 +--
.../api/complex/ComplexIntegrationTest.java | 198 +++--
.../delta/extractor/ArrayFromTupleTest.java | 118 +++
.../extractor/ConcatenatedExtractTest.java | 77 ++
.../delta/extractor/FieldFromArrayTest.java | 54 ++
.../delta/extractor/FieldFromTupleTest.java | 83 ++
.../delta/extractor/FieldsFromArrayTest.java | 108 +++
.../delta/extractor/FieldsFromTupleTest.java | 106 +++
.../api/operators/co/SelfConnectionTest.java | 18 -
.../windowing/GroupedStreamDiscretizerTest.java | 104 ---
.../windowing/ParallelMergeITCase.java | 101 ---
.../operators/windowing/ParallelMergeTest.java | 122 ---
.../windowing/StreamDiscretizerTest.java | 117 ---
.../windowing/WindowFlattenerTest.java | 53 --
.../operators/windowing/WindowFolderTest.java | 61 --
.../operators/windowing/WindowMapperTest.java | 60 --
.../operators/windowing/WindowMergerTest.java | 75 --
.../windowing/WindowPartitionerTest.java | 75 --
.../operators/windowing/WindowReducerTest.java | 61 --
.../operators/windowing/WindowingITCase.java | 529 -----------
.../api/windowing/StreamWindowTest.java | 201 -----
.../api/windowing/StreamWindowTypeInfoTest.java | 51 --
.../windowing/extractor/ArrayFromTupleTest.java | 118 ---
.../extractor/ConcatenatedExtractTest.java | 77 --
.../windowing/extractor/FieldFromArrayTest.java | 55 --
.../windowing/extractor/FieldFromTupleTest.java | 84 --
.../extractor/FieldsFromArrayTest.java | 108 ---
.../extractor/FieldsFromTupleTest.java | 106 ---
.../policy/CountEvictionPolicyTest.java | 136 ---
.../policy/CountTriggerPolicyTest.java | 109 ---
.../api/windowing/policy/DeltaPolicyTest.java | 88 --
.../policy/MultiEvictionPolicyTest.java | 186 ----
.../policy/MultiTriggerPolicyTest.java | 245 ------
.../windowing/policy/PunctuationPolicyTest.java | 155 ----
.../policy/TimeEvictionPolicyTest.java | 164 ----
.../windowing/policy/TimeTriggerPolicyTest.java | 156 ----
.../policy/TumblingEvictionPolicyTest.java | 43 -
.../windowbuffer/BasicWindowBufferTest.java | 86 --
.../JumpingCountGroupedPreReducerTest.java | 157 ----
.../JumpingCountPreReducerTest.java | 107 ---
.../windowbuffer/JumpingTimePreReducerTest.java | 96 --
.../SlidingCountGroupedPreReducerTest.java | 235 -----
.../SlidingCountPreReducerTest.java | 216 -----
.../SlidingTimeGroupedPreReducerTest.java | 387 ---------
.../windowbuffer/SlidingTimePreReducerTest.java | 324 -------
.../TumblingGroupedPreReducerTest.java | 151 ----
.../windowbuffer/TumblingPreReducerTest.java | 104 ---
.../ml/IncrementalLearningSkeleton.java | 48 +-
.../util/IncrementalLearningSkeletonData.java | 16 +-
.../examples/windowing/SessionWindowing.java | 84 +-
.../examples/windowing/TopSpeedWindowing.java | 49 +-
.../examples/windowing/WindowWordCount.java | 13 +-
.../util/TopSpeedWindowingExampleData.java | 96 +-
.../examples/windowing/TopSpeedWindowing.scala | 28 +-
.../streaming/api/scala/AllWindowedStream.scala | 25 +
.../flink/streaming/api/scala/DataStream.scala | 36 +-
.../api/scala/WindowedDataStream.scala | 338 --------
.../streaming/api/scala/WindowedStream.scala | 25 +
.../flink/streaming/api/scala/package.scala | 7 +-
.../streaming/api/scala/windowing/Delta.scala | 46 -
.../streaming/api/scala/windowing/Time.scala | 53 --
.../streaming/api/scala/DataStreamTest.scala | 51 +-
.../StreamingScalaAPICompletenessTest.scala | 7 +-
154 files changed, 1440 insertions(+), 14289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ee8b3d2..80e0e47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -62,13 +62,6 @@ import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -630,88 +623,6 @@ public class DataStream<T> {
}
/**
- * Create a {@link WindowedDataStream} that can be used to apply
- * transformation like {@link WindowedDataStream#reduceWindow},
- * {@link WindowedDataStream#mapWindow} or aggregations on preset
- * chunks(windows) of the data stream. To define windows a
- * {@link WindowingHelper} such as {@link Time}, {@link Count},
- * {@link Delta} and {@link FullStream} can be used.
- *
- * <p>
- * When applied to a grouped data stream, the windows (evictions) and slide sizes
- * (triggers) will be computed on a per group basis.
- *
- * <p>
- * For more advanced control over the trigger and eviction policies please refer to
- * {@link #window(TriggerPolicy, EvictionPolicy)}
- *
- * <p>
- * For example, to create a sum every 5 seconds in a tumbling fashion:
- *
- * <pre>
- * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)}
- * </pre>
- *
- * <p>
- * To create sliding windows use the
- * {@link WindowedDataStream#every(WindowingHelper)}, for example with 3 second slides:</br>
- *
- * <pre>
- *
- * {@code
- * ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3, TimeUnit.SECONDS)).sum(field)
- * }
- *
- * </pre>
- *
- * @param policyHelper
- * Any {@link WindowingHelper} such as {@link Time},
- * {@link Count}, {@link Delta} {@link FullStream} to define the
- * window size.
- *
- * @return A {@link WindowedDataStream} providing further operations.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public WindowedDataStream<T> window(WindowingHelper policyHelper) {
- policyHelper.setExecutionConfig(getExecutionConfig());
- return new WindowedDataStream<T>(this, policyHelper);
- }
-
- /**
- * Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
- * and {@link EvictionPolicy}. Windowing can be used to apply transformation
- * like {@link WindowedDataStream#reduceWindow},
- * {@link WindowedDataStream#mapWindow} or aggregations on preset
- * chunks(windows) of the data stream.
- *
- * <p>
- * For most common use-cases please refer to {@link #window(WindowingHelper)}
- *
- * @param trigger
- * The {@link TriggerPolicy} that will determine how often the
- * user function is called on the window.
- * @param eviction
- * The {@link EvictionPolicy} that will determine the number of
- * elements in each time window.
- * @return A {@link WindowedDataStream} providing further operations.
- */
- public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction) {
- return new WindowedDataStream<T>(this, trigger, eviction);
- }
-
- /**
- * Create a {@link WindowedDataStream} on the full stream history, to
- * produce periodic aggregates.
- *
- * @return A {@link WindowedDataStream} providing further operations.
- */
- @SuppressWarnings("rawtypes")
- public WindowedDataStream<T> every(WindowingHelper policyHelper) {
- policyHelper.setExecutionConfig(getExecutionConfig());
- return window(FullStream.window()).every(policyHelper);
- }
-
- /**
* Windows this {@code DataStream} into tumbling time windows.
*
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
deleted file mode 100644
index 18c2cee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
-import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
-import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
-import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.operators.windowing.WindowFolder;
-import org.apache.flink.streaming.api.operators.windowing.WindowMapper;
-import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
-import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
-
-/**
- * A {@link DiscretizedStream} represents a data stream that has been divided
- * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)},
- * or aggregations can be applied to the windows.
- *
- * @param <OUT>
- * The output type of the {@link DiscretizedStream}
- */
-public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
-
- private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
- private WindowTransformation transformation;
- protected boolean isPartitioned = false;
-
- protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
- KeySelector<OUT, ?> keyByKey, WindowTransformation tranformation,
- boolean isPartitioned) {
- super();
- this.keyByKey = keyByKey;
- this.discretizedStream = discretizedStream;
- this.transformation = tranformation;
- this.isPartitioned = isPartitioned;
- }
-
- /**
- * Gets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return Name of the stream.
- */
- public String getName(){
- return discretizedStream.getName();
- }
-
- /**
- * Sets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return The named operator.
- */
- public DiscretizedStream<OUT> name(String name){
- discretizedStream.name(name);
- return this;
- }
-
- public DataStream<OUT> flatten() {
- return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>()).setParallelism(discretizedStream.getParallelism());
- }
-
- public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
- return discretizedStream;
- }
-
- @Override
- public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
-
- DiscretizedStream<OUT> out = partition(transformation).transform(
- WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
- new WindowReducer<OUT>(reduceFunction)).merge();
-
- // If we merged a non-grouped reduce transformation we need to reduce
- // again
- if (!isGrouped() && ((OneInputTransformation<?, ?>)out.discretizedStream.getTransformation()).getOperator() instanceof WindowMerger) {
- return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
- new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
- } else {
- return out;
- }
- }
-
- /**
- * This method implements the parallel time reduce logic for time windows
- *
- * @param reduceFunction
- * The reduce function to be applied on the windows
- * @return The reduced DataStream
- */
- protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
-
- // Since we also emit the empty windows for bookkeeping, we need to
- // filter them out
- DiscretizedStream<OUT> nonEmpty = filterEmpty(this);
-
- // We extract the number of parts from each window we will merge using
- // this afterwards
- DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(this);
-
- // We merge the windows by the number of parts
- return wrap(parallelMerge(numOfParts, nonEmpty, reduceFunction), false);
-
- }
-
- private SingleOutputStreamOperator<StreamWindow<OUT>, ?> parallelMerge(
- DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
- ReduceFunction<OUT> reduceFunction) {
-
- ParallelMerge<OUT> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
- : new ParallelMerge<OUT>(reduceFunction);
-
- return reduced.discretizedStream
- .keyBy(new WindowKey<OUT>())
- .connect(numOfParts.keyBy(0))
- .transform(
- "CoFlatMap",
- reduced.discretizedStream.getType(),
- new ParallelMergeOperator<OUT>(parallelMerger));
- }
-
- @Override
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
-
- TypeInformation<R> retType = getWindowMapReturnTypes(windowMapFunction, getType());
-
- return mapWindow(windowMapFunction, retType);
- }
-
- @Override
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
- TypeInformation<R> returnType) {
- DiscretizedStream<R> out = partition(transformation).transform(
- WindowTransformation.MAPWINDOW, "Window Map", returnType,
- new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))).setParallelism(discretizedStream.getParallelism()).merge();
-
- return out;
- }
-
- @Override
- public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
- TypeInformation<R> outType) {
-
- DiscretizedStream<R> out = partition(transformation).transform(
- WindowTransformation.FOLDWINDOW, "Fold Window", outType,
- new WindowFolder<OUT, R>(discretizedStream.clean(foldFunction), initialValue))
- .setParallelism(discretizedStream.getParallelism())
- .merge();
- return out;
- }
-
- private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
- String operatorName, TypeInformation<R> retType,
- OneInputStreamOperator<StreamWindow<OUT>, StreamWindow<R>> operator) {
-
- return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType),
- operator).setParallelism(discretizedStream.getParallelism()), transformation);
- }
-
- private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
- StreamFilter<StreamWindow<OUT>> emptyFilter = new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>());
- emptyFilter.disableInputCopy();
- return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(), emptyFilter), input.isPartitioned);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
- StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>> partExtractor = new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
- new WindowPartExtractor<OUT>());
- partExtractor.disableInputCopy();
- return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), partExtractor);
- }
-
- private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
-
- int parallelism = discretizedStream.getParallelism();
-
- if (isGrouped()) {
- DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
- new WindowPartitioner<OUT>(keyByKey)).setParallelism(parallelism);
-
- out.keyByKey = null;
- out.isPartitioned = true;
-
- return out;
- } else if (transformation == WindowTransformation.REDUCEWINDOW
- && parallelism != discretizedStream.getExecutionEnvironment().getParallelism()) {
- DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
- new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
-
- out.isPartitioned = true;
-
- return out;
- } else {
- return this;
- }
- }
-
- private DiscretizedStream<OUT> setParallelism(int parallelism) {
- return wrap(discretizedStream.setParallelism(parallelism), isPartitioned);
- }
-
- private DiscretizedStream<OUT> merge() {
- TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType();
-
- // Only merge partitioned streams
- if (isPartitioned) {
- return wrap(
- discretizedStream.keyBy(new WindowKey<OUT>()).transform("Window Merger",
- type, new WindowMerger<OUT>()).setParallelism(discretizedStream.getParallelism()), false);
- } else {
- return this;
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
- boolean isPartitioned) {
- return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
- transformation, isPartitioned);
- }
-
- @SuppressWarnings("unchecked")
- private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
- WindowTransformation transformation) {
- return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
- transformation, isPartitioned);
- }
-
- @SuppressWarnings("rawtypes")
- protected Class<?> getClassAtPos(int pos) {
- Class<?> type;
- TypeInformation<OUT> outTypeInfo = getType();
- if (outTypeInfo.isTupleType()) {
- type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
-
- } else if (outTypeInfo instanceof BasicArrayTypeInfo) {
-
- type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass();
-
- } else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) {
- Class<?> clazz = outTypeInfo.getTypeClass();
- if (clazz == boolean[].class) {
- type = Boolean.class;
- } else if (clazz == short[].class) {
- type = Short.class;
- } else if (clazz == int[].class) {
- type = Integer.class;
- } else if (clazz == long[].class) {
- type = Long.class;
- } else if (clazz == float[].class) {
- type = Float.class;
- } else if (clazz == double[].class) {
- type = Double.class;
- } else if (clazz == char[].class) {
- type = Character.class;
- } else {
- throw new IndexOutOfBoundsException("Type could not be determined for array");
- }
-
- } else if (pos == 0) {
- type = outTypeInfo.getTypeClass();
- } else {
- throw new IndexOutOfBoundsException("Position is out of range");
- }
- return type;
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return discretizedStream.getExecutionConfig();
- }
-
- /**
- * Gets the output type.
- *
- * @return The output type.
- */
- public TypeInformation<OUT> getType() {
- return ((StreamWindowTypeInfo<OUT>) discretizedStream.getType()).getInnerType();
- }
-
- private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes(
- WindowMapFunction<IN, OUT> windowMapInterface, TypeInformation<IN> inType) {
- return TypeExtractor.getUnaryOperatorReturnType((Function) windowMapInterface,
- WindowMapFunction.class, true, true, inType, null, false);
- }
-
- protected DiscretizedStream<OUT> copy() {
- return new DiscretizedStream<OUT>(discretizedStream, keyByKey, transformation, isPartitioned);
- }
-
- @Override
- public WindowedDataStream<OUT> local() {
- throw new UnsupportedOperationException(
- "Local discretisation can only be applied after defining the discretisation logic");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
deleted file mode 100644
index c1c5f6d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFoldFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.RichWindowMapFunction;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedWindowBuffer;
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.WindowUtils;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-/**
- * A {@link WindowedDataStream} represents a data stream that has been
- * discretised into windows. User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
- * can be applied to the windows. The results of these transformations are also
- * WindowedDataStreams of the same discretisation unit.
- *
- * @param <T> The output type of the {@link WindowedDataStream}
- */
-public class WindowedDataStream<T> {
-
- protected DataStream<T> dataStream;
-
- protected boolean isLocal = false;
-
- protected KeySelector<T, ?> discretizerKey;
- protected KeySelector<T, ?> keyByKey;
-
- protected WindowingHelper<T> triggerHelper;
- protected WindowingHelper<T> evictionHelper;
-
- protected TriggerPolicy<T> userTrigger;
- protected EvictionPolicy<T> userEvicter;
-
- protected WindowedDataStream(DataStream<T> dataStream, WindowingHelper<T> policyHelper) {
- this.dataStream = dataStream;
- this.triggerHelper = policyHelper;
-
- if (dataStream instanceof KeyedStream) {
- this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
- }
- }
-
- protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> trigger,
- EvictionPolicy<T> evicter) {
- this.dataStream = dataStream;
-
- this.userTrigger = trigger;
- this.userEvicter = evicter;
-
- if (dataStream instanceof KeyedStream) {
- this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
- }
- }
-
- protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
- this.dataStream = windowedDataStream.dataStream;
- this.discretizerKey = windowedDataStream.discretizerKey;
- this.keyByKey = windowedDataStream.keyByKey;
- this.triggerHelper = windowedDataStream.triggerHelper;
- this.evictionHelper = windowedDataStream.evictionHelper;
- this.userTrigger = windowedDataStream.userTrigger;
- this.userEvicter = windowedDataStream.userEvicter;
- this.isLocal = windowedDataStream.isLocal;
- }
-
- public WindowedDataStream() {
- }
-
- /**
- * Defines the slide size (trigger frequency) for the windowed data stream.
- * This controls how often the user defined function will be triggered on
- * the window. </br></br> For example to get a window of 5 elements with a
- * slide of 2 seconds use: </br></br>
- * {@code ds.window(Count.of(5)).every(Time.of(2,TimeUnit.SECONDS))}
- * </br></br> The user function in this case will be called on the 5 most
- * recent elements every 2 seconds
- *
- * @param policyHelper
- * The policy that define the triggering frequency
- *
- * @return The windowed data stream with triggering set
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public WindowedDataStream<T> every(WindowingHelper policyHelper) {
- policyHelper.setExecutionConfig(getExecutionConfig());
- WindowedDataStream<T> ret = this.copy();
- if (ret.evictionHelper == null) {
- ret.evictionHelper = ret.triggerHelper;
- ret.triggerHelper = policyHelper;
- }
-
- return ret;
- }
-
- /**
- * Groups the elements of the {@link WindowedDataStream} by the given key
- * positions. The window sizes (evictions) and slide sizes (triggers) will
- * be calculated on the whole stream (in a global fashion), but the user
- * defined functions will be applied on a per group basis. </br></br> To get
- * windows and triggers on a per group basis apply the
- * {@link DataStream#window} operator on an already grouped data stream.
- *
- * @param fields
- * The position of the fields to group by.
- * @return The grouped {@link WindowedDataStream}
- */
- public WindowedDataStream<T> keyBy(int... fields) {
- if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
- return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
- } else {
- return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
- }
- }
-
- /**
- * Groups the elements of the {@link WindowedDataStream} by the given field
- * expressions. The window sizes (evictions) and slide sizes (triggers) will
- * be calculated on the whole stream (in a global fashion), but the user
- * defined functions will be applied on a per group basis. </br></br> To get
- * windows and triggers on a per group basis apply the
- * {@link DataStream#window} operator on an already grouped data stream.
- * </br></br> A field expression is either the name of a public field or a
- * getter method with parentheses of the stream's underlying type. A dot can
- * be used to drill down into objects, as in
- * {@code "field1.getInnerField2()" }.
- *
- * @param fields
- * The fields to group by
- * @return The grouped {@link WindowedDataStream}
- */
- public WindowedDataStream<T> keyBy(String... fields) {
- return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
- }
-
- /**
- * Groups the elements of the {@link WindowedDataStream} using the given
- * {@link KeySelector}. The window sizes (evictions) and slide sizes
- * (triggers) will be calculated on the whole stream (in a global fashion),
- * but the user defined functions will be applied on a per group basis.
- * </br></br> To get windows and triggers on a per group basis apply the
- * {@link DataStream#window} operator on an already grouped data stream.
- *
- * @param keySelector
- * The keySelector used to extract the key for grouping.
- * @return The grouped {@link WindowedDataStream}
- */
- public WindowedDataStream<T> keyBy(KeySelector<T, ?> keySelector) {
- WindowedDataStream<T> ret = this.copy();
- ret.keyByKey = keySelector;
- return ret;
- }
-
- private WindowedDataStream<T> keyBy(Keys<T> keys) {
- return keyBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
- getExecutionConfig())));
- }
-
- /**
- * Sets the window discretisation local, meaning that windows will be
- * created in parallel at environment parallelism.
- *
- * @return The WindowedDataStream with local discretisation
- */
- public WindowedDataStream<T> local() {
- WindowedDataStream<T> out = copy();
- out.isLocal = true;
- return out;
- }
-
- /**
- * Returns the {@link DataStream} of {@link StreamWindow}s which represent
- * the discretised stream. There is no ordering guarantee for the received
- * windows.
- *
- * @return The discretised stream
- */
- public DataStream<StreamWindow<T>> getDiscretizedStream() {
- if (getEviction() instanceof KeepAllEvictionPolicy) {
- throw new RuntimeException("Cannot get discretized stream for full stream window");
- }
- return discretize(WindowTransformation.NONE, new BasicWindowBuffer<T>())
- .getDiscretizedStream();
- }
-
- /**
- * Flattens the results of the window computations and streams out the
- * window elements.
- *
- * @return The data stream consisting of the individual records.
- */
- public DataStream<T> flatten() {
- return dataStream;
- }
-
- /**
- * Applies a reduce transformation on the windowed data stream by reducing
- * the current window at every trigger.The user can also extend the
- * {@link RichReduceFunction} to gain access to other features provided by
- * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
- * @param reduceFunction
- * The reduce function that will be applied to the windows.
- * @return The transformed DataStream
- */
- public DiscretizedStream<T> reduceWindow(ReduceFunction<T> reduceFunction) {
-
- // We check whether we should apply parallel time discretization, which
- // is a more complex exploiting the monotonic properties of time
- // policies
- if (WindowUtils.isTimeOnly(getTrigger(), getEviction()) && discretizerKey == null
- && dataStream.getParallelism() > 1) {
- return timeReduce(reduceFunction);
- } else {
- WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
- .with(clean(reduceFunction));
-
- WindowBuffer<T> windowBuffer = getWindowBuffer(transformation);
-
- DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
-
- if (windowBuffer instanceof PreAggregator) {
- return discretized;
- } else {
- return discretized.reduceWindow(reduceFunction);
- }
- }
- }
-
- /**
- * Applies a fold transformation on the windowed data stream by folding the
- * current window at every trigger.The user can also extend the
- * {@link RichFoldFunction} to gain access to other features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- * This version of foldWindow uses user supplied typeinformation for
- * serializaton. Use this only when the system is unable to detect type
- * information.
- *
- * @param foldFunction
- * The fold function that will be applied to the windows.
- * @param initialValue
- * Initial value given to foldFunction
- * @param outType
- * The output type of the operator
- * @return The transformed DataStream
- */
- public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction,
- TypeInformation<R> outType) {
-
- return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
- new BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType);
-
- }
-
- /**
- * Applies a fold transformation on the windowed data stream by folding the
- * current window at every trigger.The user can also extend the
- * {@link RichFoldFunction} to gain access to other features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
- * @param foldFunction
- * The fold function that will be applied to the windows.
- * @param initialValue
- * Initial value given to foldFunction
- * @return The transformed DataStream
- */
- public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction) {
-
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
- getType());
- return foldWindow(initialValue, foldFunction, outType);
- }
-
- /**
- * Applies a mapWindow transformation on the windowed data stream by calling
- * the mapWindow function on the window at every trigger. In contrast with
- * the standard binary reducer, with mapWindow allows the user to access all
- * elements of the window at the same time through the iterable interface.
- * The user can also extend the {@link RichWindowMapFunction} to gain access
- * to other features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
- * @param windowMapFunction
- * The function that will be applied to the windows.
- * @return The transformed DataStream
- */
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction) {
- return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
- getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
- }
-
- /**
- * Applies a mapWindow transformation on the windowed data stream by calling
- * the mapWindow function on the window at every trigger. In contrast with
- * the standard binary reducer, with mapWindow allows the user to access all
- * elements of the window at the same time through the iterable interface.
- * The user can also extend the {@link RichWindowMapFunction} to gain access
- * to other features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- * </br> </br> This version of mapWindow uses user supplied typeinformation
- * for serializaton. Use this only when the system is unable to detect type
- * information.
- *
- * @param windowMapFunction
- * The function that will be applied to the windows.
- * @param outType
- * The output type of the operator.
- * @return The transformed DataStream
- */
- public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction,
- TypeInformation<R> outType) {
-
- return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
- getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction,
- outType);
- }
-
- private DiscretizedStream<T> discretize(WindowTransformation transformation,
- WindowBuffer<T> windowBuffer) {
-
- OneInputStreamOperator<T, WindowEvent<T>> discretizer = getDiscretizer();
-
- OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> bufferOperator = getBufferOperator(windowBuffer);
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- TypeInformation<WindowEvent<T>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
- getType(), BasicTypeInfo.INT_TYPE_INFO);
-
- int parallelism = getDiscretizerParallelism(transformation);
-
- return new DiscretizedStream<T>(dataStream
- .transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
- .setParallelism(parallelism)
- .transform(windowBuffer.getClass().getSimpleName(),
- new StreamWindowTypeInfo<T>(getType()), bufferOperator)
- .setParallelism(parallelism), keyByKey, transformation, false);
-
- }
-
- /**
- * Returns the parallelism for the stream discretizer. The returned
- * parallelism is either 1 for for non-parallel global policies (or when the
- * input stream is non-parallel), environment parallelism for the policies
- * that can run in parallel (such as, any ditributed policy, reduce by count
- * or time).
- *
- * @param transformation
- * The applied transformation
- * @return The parallelism for the stream discretizer
- */
- private int getDiscretizerParallelism(WindowTransformation transformation) {
- return isLocal
- || (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
- .isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
- || (discretizerKey != null) ? dataStream.environment.getParallelism() : 1;
-
- }
-
- /**
- * Dedicated method for applying parallel time reduce transformations on
- * windows
- *
- * @param reduceFunction
- * Reduce function to apply
- * @return The transformed stream
- */
- protected DiscretizedStream<T> timeReduce(ReduceFunction<T> reduceFunction) {
-
- WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
- .with(clean(reduceFunction));
-
- // We get the windowbuffer and set it to emit empty windows with
- // sequential IDs. This logic is necessary to merge windows created in
- // parallel.
- WindowBuffer<T> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
-
- // If there is a groupby for the reduce operation we apply it before the
- // discretizers, because we will forward everything afterwards to
- // exploit task chaining
- if (keyByKey != null) {
- dataStream = dataStream.keyBy(keyByKey);
- }
-
- // We discretize the stream and call the timeReduce function of the
- // discretized stream, we also pass the type of the windowbuffer
- DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
-
- if (getEviction() instanceof KeepAllEvictionPolicy
- && !(windowBuffer instanceof PreAggregator)) {
- throw new RuntimeException(
- "Error in preaggregator logic, parallel time reduce should always be preaggregated");
- }
-
- return discretized.timeReduce(reduceFunction);
-
- }
-
- /**
- * Based on the defined policies, returns the stream discretizer to be used
- */
- private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() {
- if (discretizerKey == null) {
- return new StreamDiscretizer<T>(getTrigger(), getEviction());
- } else if (getTrigger() instanceof CentralActiveTrigger) {
- return new GroupedActiveDiscretizer<T>(discretizerKey,
- (CentralActiveTrigger<T>) getTrigger(),
- (CloneableEvictionPolicy<T>) getEviction());
- } else {
- return new GroupedStreamDiscretizer<T>(discretizerKey,
- (CloneableTriggerPolicy<T>) getTrigger(),
- (CloneableEvictionPolicy<T>) getEviction());
- }
-
- }
-
- private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> getBufferOperator(
- WindowBuffer<T> windowBuffer) {
- if (discretizerKey == null) {
- return new StreamWindowBuffer<T>(windowBuffer);
- } else {
- return new GroupedWindowBuffer<T>(windowBuffer, discretizerKey);
- }
- }
-
- /**
- * Based on the given policies returns the WindowBuffer used to store the
- * elements in the window. This is the module that also encapsulates the
- * pre-aggregator logic when it is applicable, reducing the space cost, and
- * trigger latency.
- *
- */
- @SuppressWarnings("unchecked")
- private WindowBuffer<T> getWindowBuffer(WindowTransformation transformation) {
- TriggerPolicy<T> trigger = getTrigger();
- EvictionPolicy<T> eviction = getEviction();
-
- if (transformation == WindowTransformation.REDUCEWINDOW) {
- if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
- if (eviction instanceof KeepAllEvictionPolicy) {
- if (keyByKey == null) {
- return new TumblingPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), getType()
- .createSerializer(getExecutionConfig())).noEvict();
- } else {
- return new TumblingGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), keyByKey,
- getType().createSerializer(getExecutionConfig())).noEvict();
- }
- } else {
- if (keyByKey == null) {
- return new TumblingPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), getType()
- .createSerializer(getExecutionConfig()));
- } else {
- return new TumblingGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), keyByKey,
- getType().createSerializer(getExecutionConfig()));
- }
- }
- } else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
- if (keyByKey == null) {
- return new SlidingCountPreReducer<T>(
- clean((ReduceFunction<T>) transformation.getUDF()), dataStream
- .getType().createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- ((CountTriggerPolicy<?>) trigger).getStart());
- } else {
- return new SlidingCountGroupedPreReducer<T>(
- clean((ReduceFunction<T>) transformation.getUDF()), dataStream
- .getType().createSerializer(getExecutionConfig()), keyByKey,
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- ((CountTriggerPolicy<?>) trigger).getStart());
- }
-
- } else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
- if (keyByKey == null) {
- return new SlidingTimePreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- WindowUtils.getTimeStampWrapper(trigger));
- } else {
- return new SlidingTimeGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
- .createSerializer(getExecutionConfig()), keyByKey,
- WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
- WindowUtils.getTimeStampWrapper(trigger));
- }
-
- } else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
- if (keyByKey == null) {
- return new JumpingCountPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
- } else {
- return new JumpingCountGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
- }
- } else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
- if (keyByKey == null) {
- return new JumpingTimePreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
- WindowUtils.getTimeStampWrapper(trigger));
- } else {
- return new JumpingTimeGroupedPreReducer<T>(
- (ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
- .createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
- WindowUtils.getTimeStampWrapper(trigger));
- }
- }
- }
-
- if (eviction instanceof KeepAllEvictionPolicy) {
- throw new RuntimeException(
- "Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
- } else {
- return new BasicWindowBuffer<T>();
- }
- }
-
- /**
- * Applies an aggregation that sums every window of the data stream at the
- * given position.
- *
- * @param positionToSum
- * The position in the tuple/array to sum
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> sum(int positionToSum) {
- return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that sums every window of the pojo data stream at
- * the given field for every window. </br></br> A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * stream's underlying type. A dot can be used to drill down into objects,
- * as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field to sum
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> sum(String field) {
- return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the minimum value of every window
- * of the data stream at the given position.
- *
- * @param positionToMin
- * The position to minimize
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> min(int positionToMin) {
- return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the minimum value of the pojo data
- * stream at the given field expression for every window. </br></br>A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> min(String field) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy
- * The position to minimize by
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy
- * The position to minimize by
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMinBy
- * The position to minimize
- * @param first
- * If true, then the operator return the first element with the
- * minimum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the minimum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MINBY,
- first, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of every window of
- * the data stream at the given position.
- *
- * @param positionToMax
- * The position to maximize
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> max(int positionToMax) {
- return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the maximum value of the pojo data
- * stream at the given field expression for every window. A field expression
- * is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream}S underlying type. A dot can be used to drill
- * down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> max(String field) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @param first
- * If true, then the operator return the first element with the
- * maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that that gives the maximum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public WindowedDataStream<T> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first,
- getExecutionConfig()));
- }
-
- private WindowedDataStream<T> aggregate(AggregationFunction<T> aggregator) {
- return reduceWindow(aggregator);
- }
-
- protected TriggerPolicy<T> getTrigger() {
-
- if (triggerHelper != null) {
- return triggerHelper.toTrigger();
- } else if (userTrigger != null) {
- return userTrigger;
- } else {
- throw new RuntimeException("Trigger must not be null");
- }
-
- }
-
- protected EvictionPolicy<T> getEviction() {
-
- if (evictionHelper != null) {
- return evictionHelper.toEvict();
- } else if (userEvicter == null || userEvicter instanceof TumblingEvictionPolicy) {
- if (triggerHelper instanceof Time) {
- return triggerHelper.toEvict();
- } else {
- return new TumblingEvictionPolicy<T>();
- }
- } else {
- return userEvicter;
- }
-
- }
-
- public <F> F clean(F f) {
- if (getExecutionConfig().isClosureCleanerEnabled()) {
- ClosureCleaner.clean(f, true);
- }
- ClosureCleaner.ensureSerializable(f);
- return f;
- }
-
- protected boolean isGrouped() {
- return keyByKey != null;
- }
-
- /**
- * Gets the output type.
- *
- * @return The output type.
- */
- public TypeInformation<T> getType() {
- return dataStream.getType();
- }
-
- public ExecutionConfig getExecutionConfig() {
- return dataStream.getExecutionConfig();
- }
-
- protected WindowedDataStream<T> copy() {
- return new WindowedDataStream<T>(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
deleted file mode 100644
index ff045a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Abstract class for defining rich mapWindow transformation to be applied on
- * {@link WindowedDataStream}s. The mapWindow function will be called on each
- * {@link StreamWindow}.
- * <p>
- * In addition the user can access the functionality provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN> the type of the elements passed to {@code mapWindow}
- * @param <OUT> the type of the elements emitted through the collector
- */
-public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
- WindowMapFunction<IN, OUT> {
-
- private static final long serialVersionUID = 9052714915997374185L;
-
- /**
- * Re-declares {@link WindowMapFunction#mapWindow} as abstract; concrete
- * subclasses provide the implementation.
- */
- @Override
- public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
deleted file mode 100644
index ececb29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface for defining mapWindow transformation to be applied on
- * {@link WindowedDataStream}s. The mapWindow function will be called on each
- * {@link StreamWindow}.
- *
- * @param <T> the type of the elements handed to {@code mapWindow}
- * @param <O> the type of the elements emitted through the collector
- */
-public interface WindowMapFunction<T, O> extends Function, Serializable {
-
- /**
- * Processes the elements of one window.
- *
- * @param values the elements of the window
- * @param out collector used to emit output elements
- * @throws Exception implementations may throw any exception
- */
- void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
index 7859b2c..86a12e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.functions.windowing.delta;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
/**
* This delta function calculates the cosine distance between two given vectors.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
index f9e8ec7..23efbf2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.functions.windowing.delta;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
/**
* This delta function calculates the euclidean distance between two given
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
index bd5b0b9..7a4e01a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.functions.windowing.delta;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
/**
* Extend this abstract class to implement a delta function which is aware of
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
new file mode 100644
index 0000000..baceba4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Converts a {@link Tuple} to an {@code Object[]}. The fields that are copied
+ * into the array can be selected and reordered as needed.
+ */
+public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
+
+	/**
+	 * Auto-generated version id.
+	 */
+	private static final long serialVersionUID = -6076121226427616818L;
+
+	/**
+	 * Field indexes to copy, in output order; {@code null} means "all fields in
+	 * their original order".
+	 */
+	final int[] order;
+
+	/**
+	 * Using this constructor the extractor will convert the whole tuple (all
+	 * fields in the original order) to an array.
+	 */
+	public ArrayFromTuple() {
+		this.order = null;
+	}
+
+	/**
+	 * Using this constructor the extractor will combine the fields specified in
+	 * the indexes parameter into an object array, in the given order.
+	 *
+	 * @param indexes
+	 *            the field ids (enumerated from 0). The array is copied, so later
+	 *            changes made by the caller do not affect this extractor. Passing
+	 *            {@code null} behaves like the no-argument constructor.
+	 */
+	public ArrayFromTuple(int... indexes) {
+		// Defensive copy: a caller-owned (varargs) array could otherwise be
+		// mutated after construction and silently change what is extracted.
+		this.order = (indexes == null) ? null : indexes.clone();
+	}
+
+	/**
+	 * Copies the selected fields of the given tuple into a new array.
+	 *
+	 * @param in
+	 *            the tuple to convert
+	 * @return a new array holding the selected fields
+	 */
+	@Override
+	public Object[] extract(Tuple in) {
+		if (order == null) {
+			// copy the whole tuple
+			int arity = in.getArity();
+			Object[] output = new Object[arity];
+			for (int i = 0; i < arity; i++) {
+				output[i] = in.getField(i);
+			}
+			return output;
+		}
+
+		// copy the user-specified fields in the user-specified order
+		Object[] output = new Object[order.length];
+		for (int i = 0; i < order.length; i++) {
+			output[i] = in.getField(order[i]);
+		}
+		return output;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
new file mode 100644
index 0000000..89c3a32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+/**
+ * Combines two extractors which will be executed one after the other.
+ *
+ * @param <FROM>
+ *            The input type of the first extractor.
+ * @param <OVER>
+ *            The output type of the first and the input type of the second
+ *            extractor.
+ * @param <TO>
+ *            The output type of the second extractor and the output type of the
+ *            overall extraction.
+ */
+public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
+
+	/**
+	 * Auto-generated version id.
+	 */
+	private static final long serialVersionUID = -7807197760725651752L;
+
+	private final Extractor<FROM, OVER> e1;
+	private final Extractor<OVER, TO> e2;
+
+	/**
+	 * Combines two extractors which will be executed one after the other.
+	 *
+	 * @param e1
+	 *            First extractor: This extractor gets applied to the input data
+	 *            first. Its output is then passed as input to the second
+	 *            extractor.
+	 * @param e2
+	 *            Second extractor: This extractor gets the output of the first
+	 *            extractor as input. Its output is then the result of the
+	 *            overall extraction.
+	 * @throws NullPointerException
+	 *             if either extractor is {@code null}
+	 */
+	public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
+		// Fail fast here rather than with an NPE on the first extract() call.
+		if (e1 == null) {
+			throw new NullPointerException("The first extractor must not be null.");
+		}
+		if (e2 == null) {
+			throw new NullPointerException("The second extractor must not be null.");
+		}
+		this.e1 = e1;
+		this.e2 = e2;
+	}
+
+	/**
+	 * Applies the first extractor to the input and the second one to its result.
+	 *
+	 * @param in
+	 *            the input data
+	 * @return the output of the second extractor
+	 */
+	@Override
+	public TO extract(FROM in) {
+		return e2.extract(e1.extract(in));
+	}
+
+	/**
+	 * Appends a further extractor to this chain. This instance is not modified;
+	 * a new extractor wrapping it is returned.
+	 *
+	 * @param e3
+	 *            the extractor applied to the output of this extractor
+	 * @param <OUT>
+	 *            the output type of {@code e3}
+	 * @return a new extractor that runs this chain followed by {@code e3}
+	 * @throws NullPointerException
+	 *             if {@code e3} is {@code null}
+	 */
+	public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
+		return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
new file mode 100644
index 0000000..8cd0014
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.io.Serializable;
+
+/**
+ * Converts a value of one type into a value of another type.
+ *
+ * <p>The typical use is pulling one or more fields out of a composite value,
+ * such as a tuple or an array, so that further calculations can operate on the
+ * extracted part only.
+ *
+ * @param <FROM>
+ *            the type of the values handed to {@link #extract(Object)}
+ * @param <TO>
+ *            the type of the values returned by {@link #extract(Object)}
+ */
+public interface Extractor<FROM, TO> extends Serializable {
+
+	/**
+	 * Extracts (converts) a value of the output type from the given input.
+	 *
+	 * @param in
+	 *            the value to extract from
+	 * @return the extracted or converted value
+	 */
+	TO extract(FROM in);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
new file mode 100644
index 0000000..f9d0a2b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts a single field out of an array.
+ *
+ * <p>Access goes through {@link Array#get(Object, int)}, so arrays of any
+ * component type are accepted; elements of primitive arrays are returned in
+ * their boxed form.
+ *
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
+
+	/**
+	 * Auto-generated version id.
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+
+	/** Index of the array element to extract. */
+	private final int fieldId;
+
+	/**
+	 * Extracts the first field (id 0) from the array.
+	 */
+	public FieldFromArray() {
+		this(0);
+	}
+
+	/**
+	 * Extracts the field with the given id from the array.
+	 *
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the array.
+	 * @throws IllegalArgumentException
+	 *             if {@code fieldId} is negative
+	 */
+	public FieldFromArray(int fieldId) {
+		// A negative index can never succeed; reject it up front instead of
+		// failing on every extract() call.
+		if (fieldId < 0) {
+			throw new IllegalArgumentException("The field id must not be negative, but was " + fieldId + ".");
+		}
+		this.fieldId = fieldId;
+	}
+
+	/**
+	 * Returns the configured element of the given array.
+	 *
+	 * @param in
+	 *            the array to read from
+	 * @return the element at index {@code fieldId}
+	 * @throws NullPointerException
+	 *             if {@code in} is {@code null}
+	 * @throws IllegalArgumentException
+	 *             if {@code in} is not an array
+	 * @throws ArrayIndexOutOfBoundsException
+	 *             if the array has no element at {@code fieldId}
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public OUT extract(Object in) {
+		// Unchecked: the element type is only known to the caller via OUT.
+		return (OUT) Array.get(in, fieldId);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
new file mode 100644
index 0000000..627afca
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts a single field out of a tuple.
+ *
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
+
+	/**
+	 * Auto-generated version id. It has the same value as the one in
+	 * {@code FieldFromArray}; that is harmless, because a serialVersionUID is
+	 * only ever compared for the same class.
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+
+	/** Position of the tuple field to extract. */
+	private final int fieldId;
+
+	/**
+	 * Extracts the first field (id 0) from the tuple.
+	 */
+	public FieldFromTuple() {
+		this(0);
+	}
+
+	/**
+	 * Extracts the field with the given id from the tuple.
+	 *
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the tuple.
+	 * @throws IllegalArgumentException
+	 *             if {@code fieldId} is negative
+	 */
+	public FieldFromTuple(int fieldId) {
+		// A negative position can never succeed; reject it up front instead of
+		// failing on every extract() call.
+		if (fieldId < 0) {
+			throw new IllegalArgumentException("The field id must not be negative, but was " + fieldId + ".");
+		}
+		this.fieldId = fieldId;
+	}
+
+	/**
+	 * Returns the configured field of the given tuple.
+	 *
+	 * <p>A {@code fieldId} beyond the tuple's arity is only detected here; the
+	 * resulting exception is whatever {@link Tuple#getField(int)} throws.
+	 *
+	 * @param in
+	 *            the tuple to read from
+	 * @return the value of field {@code fieldId}
+	 */
+	@Override
+	public OUT extract(Tuple in) {
+		return in.getField(fieldId);
+	}
+
+}