You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:39 UTC
[05/13] flink git commit: [FLINK-2550] Simplify Stream Java API Class
Names
[FLINK-2550] Simplify Stream Java API Class Names
KeyedDataStream -> KeyedStream
KeyedWindowDataStream -> WindowedStream
NonParallelWindowDataStream -> AllWindowedStream
KeyedWindowFunction -> WindowFunction
WindowFunction -> AllWindowFunction
(along with rich functions and reduce function wrappers)
WindowedStream.mapWindow -> WindowedStream.apply
AllWindowedStream.mapWindow -> AllWindowedStream.apply
Also renamed the tests to match the new names.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e6e0aec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e6e0aec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e6e0aec
Branch: refs/heads/master
Commit: 9e6e0aeca01c50640827adbdd60089761cd5e8d2
Parents: 68c1afc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 1 15:58:52 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 231 ++++++++++++++
.../streaming/api/datastream/DataStream.java | 34 +--
.../api/datastream/GroupedDataStream.java | 2 +-
.../api/datastream/KeyedDataStream.java | 159 ----------
.../streaming/api/datastream/KeyedStream.java | 160 ++++++++++
.../api/datastream/KeyedWindowDataStream.java | 287 ------------------
.../datastream/NonParallelWindowDataStream.java | 218 -------------
.../api/datastream/WindowedStream.java | 302 +++++++++++++++++++
.../functions/windowing/AllWindowFunction.java | 45 +++
.../windowing/KeyedWindowFunction.java | 45 ---
.../windowing/ReduceAllWindowFunction.java | 70 +++++
.../windowing/ReduceKeyedWindowFunction.java | 70 -----
.../windowing/ReduceWindowFunction.java | 4 +-
.../ReduceWindowFunctionWithWindow.java | 4 +-
.../windowing/RichAllWindowFunction.java | 25 ++
.../windowing/RichKeyedWindowFunction.java | 25 --
.../functions/windowing/RichWindowFunction.java | 2 +-
.../api/functions/windowing/WindowFunction.java | 16 +-
.../windowing/AccumulatingKeyedTimePanes.java | 14 +-
...ccumulatingProcessingTimeWindowOperator.java | 8 +-
.../EvictingNonKeyedWindowOperator.java | 6 +-
.../windowing/EvictingWindowOperator.java | 6 +-
.../windowing/NonKeyedWindowOperator.java | 8 +-
.../operators/windowing/WindowOperator.java | 8 +-
.../runtime/tasks/StreamingRuntimeContext.java | 2 +-
.../api/state/StatefulOperatorTest.java | 4 +-
...AlignedProcessingTimeWindowOperatorTest.java | 22 +-
.../windowing/AllWindowTranslationTest.java | 198 ++++++++++++
.../EvictingNonKeyedWindowOperatorTest.java | 5 +-
.../windowing/EvictingWindowOperatorTest.java | 4 +-
.../windowing/NonKeyedWindowOperatorTest.java | 11 +-
...ParallelWindowDataStreamTranslationTest.java | 198 ------------
.../windowing/TimeWindowTranslationTest.java | 13 +-
.../operators/windowing/WindowOperatorTest.java | 10 +-
.../windowing/WindowTranslationTest.java | 17 +-
.../GroupedProcessingTimeWindowExample.java | 10 +-
.../flink/streaming/api/scala/DataStream.scala | 5 +-
37 files changed, 1144 insertions(+), 1104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
new file mode 100644
index 0000000..e5c7c18
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * An {@code AllWindowedStream} represents a data stream where the stream of
+ * elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code AllWindowedStream} is purely an API construct; during runtime
+ * the {@code AllWindowedStream} will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class AllWindowedStream<T, W extends Window> {
+
+ /** The data stream that is windowed by this stream */
+ private final DataStream<T> input;
+
+ /** The window assigner */
+ private final WindowAssigner<? super T, W> windowAssigner;
+
+ /** The trigger that is used for window evaluation/emission. */
+ private Trigger<? super T, ? super W> trigger;
+
+ /** The evictor that is used for evicting elements before window evaluation. */
+ private Evictor<? super T, ? super W> evictor;
+
+
+ public AllWindowedStream(DataStream<T> input,
+ WindowAssigner<? super T, W> windowAssigner) {
+ this.input = input;
+ this.windowAssigner = windowAssigner;
+ this.trigger = windowAssigner.getDefaultTrigger();
+ }
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+ this.trigger = trigger;
+ return this;
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+ this.evictor = evictor;
+ return this;
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Operations on the (non-keyed) windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+ * so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "Reduce at " + callLocation;
+
+ DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+ if (result != null) {
+ return result;
+ }
+
+ String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+ OneInputStreamOperator<T, T> operator;
+
+ if (evictor != null) {
+ operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+ new HeapWindowBuffer.Factory<T>(),
+ new ReduceAllWindowFunction<W, T>(function),
+ trigger,
+ evictor);
+
+ } else {
+ // we need to copy because we need our own instance of the pre aggregator
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+ operator = new NonKeyedWindowOperator<>(windowAssigner,
+ new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new ReduceAllWindowFunction<W, T>(function),
+ trigger);
+ }
+
+ return input.transform(opName, input.getType(), operator).setParallelism(1);
+ }
+
+ /**
+ * Applies a window function to the window. The window function is called for each evaluation
+ * of the window. The output of the window function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function) {
+ TypeInformation<T> inType = input.getType();
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, AllWindowFunction.class, true, true, inType, null, false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each evaluation
+ * of the window. The output of the window function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "MapWindow at " + callLocation;
+
+ DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+ if (result != null) {
+ return result;
+ }
+
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger,
+ evictor);
+
+ } else {
+ operator = new NonKeyedWindowOperator<>(windowAssigner,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger);
+ }
+
+ return input.transform(opName, resultType, operator).setParallelism(1);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+
+ private <R> DataStream<R> createFastTimeOperatorIfValid(
+ Function function,
+ TypeInformation<R> resultType,
+ String functionName) {
+
+ // TODO: add once non-parallel fast aligned time windows operator is ready
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c2be055..ad159f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -233,15 +233,15 @@ public class DataStream<T> {
/**
*
- * It creates a new {@link KeyedDataStream} that uses the provided key for partitioning
+ * It creates a new {@link KeyedStream} that uses the provided key for partitioning
* its operator states.
*
* @param key
* The KeySelector to be used for extracting the key for partitioning
- * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
- public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
- return new KeyedDataStream<T, K>(this, clean(key));
+ public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key){
+ return new KeyedStream<T, K>(this, clean(key));
}
/**
@@ -250,9 +250,9 @@ public class DataStream<T> {
* @param fields
* The position of the fields on which the {@link DataStream}
* will be grouped.
- * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
- public KeyedDataStream<T, Tuple> keyBy(int... fields) {
+ public KeyedStream<T, Tuple> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
@@ -269,14 +269,14 @@ public class DataStream<T> {
* @param fields
* One or more field expressions on which the state of the {@link DataStream} operators will be
* partitioned.
- * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
**/
- public KeyedDataStream<T, Tuple> keyBy(String... fields) {
+ public KeyedStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
- private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
- return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
+ private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
+ return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
@@ -288,7 +288,7 @@ public class DataStream<T> {
* @param fields
* The position of the fields on which the states of the {@link DataStream}
* will be partitioned.
- * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
*/
public GroupedDataStream<T, Tuple> groupBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
@@ -816,7 +816,7 @@ public class DataStream<T> {
}
/**
- * Windows this {@code KeyedDataStream} into tumbling time windows.
+ * Windows this {@code DataStream} into tumbling time windows.
*
* <p>
* This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
@@ -826,7 +826,7 @@ public class DataStream<T> {
*
* @param size The size of the window.
*/
- public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
if (actualSize instanceof EventTime) {
@@ -837,7 +837,7 @@ public class DataStream<T> {
}
/**
- * Windows this {@code KeyedDataStream} into sliding time windows.
+ * Windows this {@code DataStream} into sliding time windows.
*
* <p>
* This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
@@ -847,7 +847,7 @@ public class DataStream<T> {
*
* @param size The size of the window.
*/
- public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
@@ -879,8 +879,8 @@ public class DataStream<T> {
* @param assigner The {@code WindowAssigner} that assigns elements to windows.
* @return The trigger windows data stream.
*/
- public <W extends Window> NonParallelWindowDataStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
- return new NonParallelWindowDataStream<>(this, assigner);
+ public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
+ return new AllWindowedStream<>(this, assigner);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index fde5a6d..ebaeb56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
* @param <T> The type of the elements in the Grouped Stream.
* @param <KEY> The type of the key in the Keyed Stream.
*/
-public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
+public class GroupedDataStream<T, KEY> extends KeyedStream<T, KEY> {
/**
* Creates a new {@link GroupedDataStream}, group inclusion is determined using
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
deleted file mode 100644
index 2ae07b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * A KeyedDataStream represents a {@link DataStream} on which operator state is
- * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a {@link DataStream}
- * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
- *
- *
- * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class KeyedDataStream<T, KEY> extends DataStream<T> {
-
- protected final KeySelector<T, KEY> keySelector;
-
- /**
- * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
- * to partition operator state by key.
- *
- * @param dataStream
- * Base stream of data
- * @param keySelector
- * Function for determining state partitions
- */
- public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
- this.keySelector = keySelector;
- }
-
-
- public KeySelector<T, KEY> getKeySelector() {
- return this.keySelector;
- }
-
-
- @Override
- protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
- throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
- }
-
-
- @Override
- public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
- TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
-
- SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
-
- ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
- return returnStream;
- }
-
-
-
- @Override
- public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
- DataStreamSink<T> result = super.addSink(sinkFunction);
- result.getTransformation().setStateKeySelector(keySelector);
- return result;
- }
-
- // ------------------------------------------------------------------------
- // Windowing
- // ------------------------------------------------------------------------
-
- /**
- * Windows this {@code KeyedDataStream} into tumbling time windows.
- *
- * <p>
- * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
- * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
- * set using
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
- *
- * @param size The size of the window.
- */
- public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
- } else {
- return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
- }
- }
-
- /**
- * Windows this {@code KeyedDataStream} into sliding time windows.
- *
- * <p>
- * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
- * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
- * set using
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
- *
- * @param size The size of the window.
- */
- public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
- AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
- } else {
- return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
- }
- }
-
- /**
- * Windows this data stream to a {@code KeyedWindowDataStream}, which evaluates windows
- * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
- * grouping of elements is done both by key and by window.
- *
- * <p>
- * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
- * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
- * that is used if a {@code Trigger} is not specified.
- *
- * @param assigner The {@code WindowAssigner} that assigns elements to windows.
- * @return The trigger windows data stream.
- */
- public <W extends Window> KeyedWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
- return new KeyedWindowDataStream<>(this, assigner);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
new file mode 100644
index 0000000..b3cfb55
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
+ * partitioning methods such as shuffle, forward and groupBy.
+ *
+ *
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
+ */
+public class KeyedStream<T, KEY> extends DataStream<T> {
+
+ protected final KeySelector<T, KEY> keySelector;
+
+ /**
+ * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+ * to partition operator state by key.
+ *
+ * @param dataStream
+ * Base stream of data
+ * @param keySelector
+ * Function for determining state partitions
+ */
+ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
+ super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
+ this.keySelector = keySelector;
+ }
+
+
+ public KeySelector<T, KEY> getKeySelector() {
+ return this.keySelector;
+ }
+
+
+ @Override
+ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+ throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
+ }
+
+
+ @Override
+ public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+ TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+ SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
+
+ ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
+ return returnStream;
+ }
+
+
+
+ @Override
+ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+ DataStreamSink<T> result = super.addSink(sinkFunction);
+ result.getTransformation().setStateKeySelector(keySelector);
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+ // Windowing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Windows this {@code KeyedStream} into tumbling time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+ * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
+ */
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+ } else {
+ return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+ }
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into sliding time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+ * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ * @param size The size of the window.
+ * @param slide The slide interval of the window.
+ */
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+ AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ } else {
+ return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ }
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into a {@code WindowedStream}, which evaluates windows
+ * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+ * grouping of elements is done both by key and by window.
+ *
+ * <p>
+ * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+ * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+ * that is used if a {@code Trigger} is not specified.
+ *
+ * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+ * @return The resulting {@code WindowedStream}.
+ */
+ public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+ return new WindowedStream<>(this, assigner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
deleted file mode 100644
index 9d05b8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code KeyedWindowDataStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code KeyedWindowDataStream} is purely and API construct, during runtime
- * the {@code KeyedWindowDataStream} will be collapsed together with the
- * {@code KeyedDataStream} and the operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class KeyedWindowDataStream<T, K, W extends Window> {
-
- /** The keyed data stream that is windowed by this stream */
- private final KeyedDataStream<T, K> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- public KeyedWindowDataStream(KeyedDataStream<T, K> input,
- WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger();
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public KeyedWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public KeyedWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public DataStream<T> reduceWindow(ReduceFunction<T> function) {
- String callLocation = Utils.getCallLocationName();
- String udfName = "Reduce at " + callLocation;
-
- DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, T> operator;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- new ReduceKeyedWindowFunction<K, W, T>(function),
- trigger,
- evictor);
-
- } else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
- operator = new WindowOperator<>(windowAssigner,
- keySel,
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
- new ReduceKeyedWindowFunction<K, W, T>(function),
- trigger);
- }
-
- return input.transform(opName, input.getType(), operator);
- }
-
- /**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Not that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
- // clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, KeyedWindowFunction.class, true, true, inType, null, false);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "MapWindow at " + callLocation;
-
- DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, R> operator;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor);
-
- } else {
- operator = new WindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger);
- }
-
-
-
- return input.transform(opName, resultType, operator);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private <R> DataStream<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSlide();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(), windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof KeyedWindowFunction) {
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(), windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSize();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(), windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof KeyedWindowFunction) {
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(), windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
deleted file mode 100644
index 5cb3b6b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code NonParallelWindowDataStream} represents a data stream where the stream of
- * elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
- * used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code NonParallelWindowDataStream} is purely and API construct, during runtime
- * the {@code NonParallelWindowDataStream} will be collapsed together with the
- * operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class NonParallelWindowDataStream<T, W extends Window> {
-
- /** The data stream that is windowed by this stream */
- private final DataStream<T> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- public NonParallelWindowDataStream(DataStream<T> input,
- WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger();
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public NonParallelWindowDataStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public NonParallelWindowDataStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public DataStream<T> reduceWindow(ReduceFunction<T> function) {
- String callLocation = Utils.getCallLocationName();
- String udfName = "Reduce at " + callLocation;
-
- DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, T> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- new HeapWindowBuffer.Factory<T>(),
- new ReduceWindowFunction<W, T>(function),
- trigger,
- evictor);
-
- } else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
- new ReduceWindowFunction<W, T>(function),
- trigger);
- }
-
- return input.transform(opName, input.getType(), operator).setParallelism(1);
- }
-
- /**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Not that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> DataStream<R> mapWindow(WindowFunction<T, R, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, inType, null, false);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "MapWindow at " + callLocation;
-
- DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, R> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger);
- }
-
-
-
- return input.transform(opName, resultType, operator).setParallelism(1);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
-
- private <R> DataStream<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- // TODO: add once non-parallel fast aligned time windows operator is ready
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
new file mode 100644
index 0000000..16898dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * A {@code WindowedStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
+ * <p>
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code WindowedStream} is purely an API construct; during runtime
+ * the {@code WindowedStream} will be collapsed together with the
+ * {@code KeyedStream} and the operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class WindowedStream<T, K, W extends Window> {
+
+ /** The keyed data stream that is windowed by this stream */
+ private final KeyedStream<T, K> input;
+
+ /** The window assigner */
+ private final WindowAssigner<? super T, W> windowAssigner;
+
+ /** The trigger that is used for window evaluation/emission. */
+ private Trigger<? super T, ? super W> trigger;
+
+ /** The evictor that is used for evicting elements before window evaluation. */
+ private Evictor<? super T, ? super W> evictor;
+
+
+ public WindowedStream(KeyedStream<T, K> input,
+ WindowAssigner<? super T, W> windowAssigner) {
+ this.input = input;
+ this.windowAssigner = windowAssigner;
+ this.trigger = windowAssigner.getDefaultTrigger();
+ }
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+ this.trigger = trigger;
+ return this;
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+ this.evictor = evictor;
+ return this;
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Operations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+ * so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "Reduce at " + callLocation;
+
+ DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+ if (result != null) {
+ return result;
+ }
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, T> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ new ReduceWindowFunction<K, W, T>(function),
+ trigger,
+ evictor);
+
+ } else {
+ // we need to copy because we need our own instance of the pre aggregator
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new ReduceWindowFunction<K, W, T>(function),
+ trigger);
+ }
+
+ return input.transform(opName, input.getType(), operator);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function) {
+ TypeInformation<T> inType = input.getType();
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, WindowFunction.class, true, true, inType, null, false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+ //clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "MapWindow at " + callLocation;
+
+ DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+ if (result != null) {
+ return result;
+ }
+
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger,
+ evictor);
+
+ } else {
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger);
+ }
+
+ return input.transform(opName, resultType, operator);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private <R> DataStream<R> createFastTimeOperatorIfValid(
+ Function function,
+ TypeInformation<R> resultType,
+ String functionName) {
+
+ if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSlide();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ else if (function instanceof WindowFunction) {
+ @SuppressWarnings("unchecked")
+ WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ wf, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSize();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ else if (function instanceof WindowFunction) {
+ @SuppressWarnings("unchecked")
+ WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ wf, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
new file mode 100644
index 0000000..1d54436
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions that are evaluated over non-keyed windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+public interface AllWindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param window The window that is being evaluated.
+ * @param values The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
deleted file mode 100644
index 77ce53e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over keyed (grouped) windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- * @param <KEY> The type of the key.
- */
-public interface KeyedWindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
- /**
- *
- * @param key
- * @param values
- * @param out
- *
- * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
- */
- void evaluate(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
new file mode 100644
index 0000000..24855a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+
+ public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) {
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {
+ super.setRuntimeContext(ctx);
+ FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ FunctionUtils.openFunction(reduceFunction, parameters);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ FunctionUtils.closeFunction(reduceFunction);
+ }
+
+ @Override
+ public void apply(W window, Iterable<T> values, Collector<T> out) throws Exception {
+ T result = null;
+
+ for (T v: values) {
+ if (result == null) {
+ result = v;
+ } else {
+ result = reduceFunction.reduce(result, v);
+ }
+ }
+
+ if (result != null) {
+ out.collect(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
deleted file mode 100644
index 70627f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceKeyedWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
-
- public ReduceKeyedWindowFunction(ReduceFunction<T> reduceFunction) {
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setRuntimeContext(RuntimeContext ctx) {
- super.setRuntimeContext(ctx);
- FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- FunctionUtils.openFunction(reduceFunction, parameters);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- FunctionUtils.closeFunction(reduceFunction);
- }
-
- @Override
- public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
- T result = null;
-
- for (T v: values) {
- if (result == null) {
- result = v;
- } else {
- result = reduceFunction.reduce(result, v);
- }
- }
-
- if (result != null) {
- out.collect(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index ba26218..042fe18 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
-public class ReduceWindowFunction<W extends Window, T> extends RichWindowFunction<T, T, W> {
+public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunction<T, T, K, W> {
private static final long serialVersionUID = 1L;
private final ReduceFunction<T> reduceFunction;
@@ -52,7 +52,7 @@ public class ReduceWindowFunction<W extends Window, T> extends RichWindowFunctio
}
@Override
- public void evaluate(W window, Iterable<T> values, Collector<T> out) throws Exception {
+ public void apply(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
T result = null;
for (T v: values) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
index bceff82..6a472b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
-public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> {
+public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> {
private static final long serialVersionUID = 1L;
private final ReduceFunction<T> reduceFunction;
@@ -53,7 +53,7 @@ public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends Rich
}
@Override
- public void evaluate(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
+ public void apply(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
T result = null;
for (T v: values) {