You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:17 UTC
[6/9] flink git commit: [hotfix] Simplify new windowing API
[hotfix] Simplify new windowing API
Before, there would be three different window() methods on
KeyedDataStream: one that takes two policies, one that takes one policy
and one that takes a window assigner.
Now, there is only one window() method that takes a window assigner and
creates a KeyedWindowDataStream.
For conveniece, there are two methods timeWindows() that take either one
argument (tumbling windows) or two arguments (sliding windows). These
create a KeyedWindowDataStream with either a SlidingWindows or
TumblingWindows assigner.
When the window operator is created we pick the optimized aligned time
windows operator if the combination of window assigner/trigger/evictor
allows it.
All of this behaviour is verified in tests.
This closes #1195
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5623c15b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5623c15b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5623c15b
Branch: refs/heads/master
Commit: 5623c15b0933487aea11fa8962feef29433133b7
Parents: 937793e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Sep 29 20:22:11 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:04:00 2015 +0200
----------------------------------------------------------------------
.../api/datastream/KeyedDataStream.java | 64 +++--
.../KeyedTriggerWindowDataStream.java | 255 -------------------
.../api/datastream/KeyedWindowDataStream.java | 237 +++++++++++++----
.../windowing/ReduceKeyedWindowFunction.java | 70 +++++
.../api/windowing/evictors/TimeEvictor.java | 6 +
.../api/windowing/time/AbstractTime.java | 95 +++++++
.../streaming/api/windowing/time/EventTime.java | 62 +++++
.../api/windowing/time/ProcessingTime.java | 63 +++++
.../streaming/api/windowing/time/Time.java | 66 +++++
.../ContinuousProcessingTimeTrigger.java | 6 +
.../triggers/ContinuousWatermarkTrigger.java | 6 +
.../windowpolicy/AbstractTimePolicy.java | 109 --------
.../api/windowing/windowpolicy/Count.java | 58 -----
.../api/windowing/windowpolicy/Delta.java | 68 -----
.../api/windowing/windowpolicy/EventTime.java | 64 -----
.../windowing/windowpolicy/ProcessingTime.java | 65 -----
.../api/windowing/windowpolicy/Time.java | 68 -----
.../windowing/windowpolicy/WindowPolicy.java | 57 -----
.../operators/windowing/PolicyToOperator.java | 239 -----------------
.../windowing/EvictingWindowOperatorTest.java | 4 +-
.../windowing/PolicyWindowTranslationTest.java | 216 ----------------
.../windowing/TimeWindowTranslationTest.java | 97 +++++++
.../windowing/TriggerWindowTranslationTest.java | 201 ---------------
.../operators/windowing/WindowOperatorTest.java | 10 +-
.../windowing/WindowTranslationTest.java | 201 +++++++++++++++
.../GroupedProcessingTimeWindowExample.java | 4 +-
26 files changed, 916 insertions(+), 1475 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index ce105e5..2ae07b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,8 +23,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -92,34 +98,50 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
// ------------------------------------------------------------------------
/**
- * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
- * grouped stream. The window is defined by a single policy.
+ * Windows this {@code KeyedDataStream} into tumbling time windows.
+ *
* <p>
- * For time windows, these single-policy windows result in tumbling time windows.
- *
- * @param policy The policy that defines the window.
- * @return The windows data stream.
+ * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+ * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
*/
- public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
- return new KeyedWindowDataStream<T, KEY>(this, policy);
+ public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+ } else {
+ return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+ }
}
/**
- * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
- * grouped stream. The window is defined by a window policy, plus a slide policy.
+ * Windows this {@code KeyedDataStream} into sliding time windows.
+ *
* <p>
- * For time windows, these slide policy windows result in sliding time windows.
- *
- * @param window The policy that defines the window.
- * @param slide The additional policy defining the slide of the window.
- * @return The windows data stream.
+ * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+ * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
*/
- public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
- return new KeyedWindowDataStream<T, KEY>(this, window, slide);
+ public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+ AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+ AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+ if (actualSize instanceof EventTime) {
+ return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ } else {
+ return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ }
}
/**
- * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+ * Windows this data stream to a {@code KeyedWindowDataStream}, which evaluates windows
* over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
* grouping of elements is done both by key and by window.
*
@@ -131,7 +153,7 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
* @param assigner The {@code WindowAssigner} that assigns elements to windows.
* @return The trigger windows data stream.
*/
- public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
- return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner);
+ public <W extends Window> KeyedWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+ return new KeyedWindowDataStream<>(this, assigner);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
deleted file mode 100644
index 5b39775..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@code KeyedTriggerWindowDataStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code KeyedTriggerWindowDataStream} is purely and API construct, during runtime
- * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the
- * {@code KeyedDataStream} and the operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class KeyedTriggerWindowDataStream<T, K, W extends Window> {
-
- /** The keyed data stream that is windowed by this stream */
- private final KeyedDataStream<T, K> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger();
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public DataStream<T> reduceWindow(ReduceFunction<T> function) {
- String callLocation = Utils.getCallLocationName();
- String udfName = "Reduce at " + callLocation;
-
- DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, T> operator;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- new ReduceWindowFunction<K, W, T>(function),
- trigger,
- evictor);
-
- } else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
- operator = new WindowOperator<>(windowAssigner,
- keySel,
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
- new ReduceWindowFunction<K, W, T>(function),
- trigger);
- }
-
- return input.transform(opName, input.getType(), operator);
- }
-
- /**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * Not that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, KeyedWindowFunction.class, true, true, inType, null, false);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "MapWindow at " + callLocation;
-
- DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, R> operator;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor);
-
- } else {
- operator = new WindowOperator<>(windowAssigner,
- keySel,
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger);
- }
-
-
-
- return input.transform(opName, resultType, operator);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private <R> DataStream<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- WindowPolicy windowPolicy = null;
- WindowPolicy slidePolicy = null;
-
- if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
- windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
- slidePolicy = ProcessingTime.of(timeWindows.getSlide(), TimeUnit.MILLISECONDS);
- } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
- windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
- }
-
- if (windowPolicy == null) {
- return null;
- }
-
- String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, R> operator =
- PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
-
- return input.transform(opName, resultType, operator);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index e658bdd..ad7ca37 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -18,61 +18,98 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
/**
- * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and
- * for each key, the stream of elements is split into windows. The windows are conceptually
- * evaluated for each key individually, meaning windows can trigger at different points
- * for each key.
+ * A {@code KeyedWindowDataStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
* <p>
- * In many cases, however, the windows are "aligned", meaning they trigger at the
- * same time for all keys. The most common example for that are the regular time windows.
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
* <p>
- * Note that the KeyedWindowDataStream is purely and API construct, during runtime the
- * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
- * over the window into one single operation.
+ * Note that the {@code KeyedWindowDataStream} is purely and API construct, during runtime
+ * the {@code KeyedWindowDataStream} will be collapsed together with the
+ * {@code KeyedDataStream} and the operation over the window into one single operation.
*
* @param <T> The type of elements in the stream.
* @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
*/
-public class KeyedWindowDataStream<T, K> {
+public class KeyedWindowDataStream<T, K, W extends Window> {
/** The keyed data stream that is windowed by this stream */
private final KeyedDataStream<T, K> input;
- /** The core window policy */
- private final WindowPolicy windowPolicy;
+ /** The window assigner */
+ private final WindowAssigner<? super T, W> windowAssigner;
- /** The optional additional slide policy */
- private final WindowPolicy slidePolicy;
-
-
- public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) {
- this(input, windowPolicy, null);
- }
+ /** The trigger that is used for window evaluation/emission. */
+ private Trigger<? super T, ? super W> trigger;
+
+ /** The evictor that is used for evicting elements before window evaluation. */
+ private Evictor<? super T, ? super W> evictor;
- public KeyedWindowDataStream(KeyedDataStream<T, K> input,
- WindowPolicy windowPolicy, WindowPolicy slidePolicy)
- {
- TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
+ public KeyedWindowDataStream(KeyedDataStream<T, K> input,
+ WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
- this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
- this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
+ this.windowAssigner = windowAssigner;
+ this.trigger = windowAssigner.getDefaultTrigger();
}
-
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public KeyedWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+ this.trigger = trigger;
+ return this;
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public KeyedWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+ this.evictor = evictor;
+ return this;
+ }
+
+
// ------------------------------------------------------------------------
// Operations on the keyed windows
// ------------------------------------------------------------------------
@@ -94,7 +131,39 @@ public class KeyedWindowDataStream<T, K> {
*/
public DataStream<T> reduceWindow(ReduceFunction<T> function) {
String callLocation = Utils.getCallLocationName();
- return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
+ String udfName = "Reduce at " + callLocation;
+
+ DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+ if (result != null) {
+ return result;
+ }
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, T> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ new ReduceKeyedWindowFunction<K, W, T>(function),
+ trigger,
+ evictor);
+
+ } else {
+ // we need to copy because we need our own instance of the pre aggregator
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new ReduceKeyedWindowFunction<K, W, T>(function),
+ trigger);
+ }
+
+ return input.transform(opName, input.getType(), operator);
}
/**
@@ -108,29 +177,107 @@ public class KeyedWindowDataStream<T, K> {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K, Window> function) {
- String callLocation = Utils.getCallLocationName();
-
+ public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
TypeInformation<T> inType = input.getType();
- TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false);
- return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation);
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "MapWindow at " + callLocation;
+
+ DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+ if (result != null) {
+ return result;
+ }
+
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger,
+ evictor);
+
+ } else {
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger);
+ }
+
+
+
+ return input.transform(opName, resultType, operator);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
- private <Result> DataStream<Result> createWindowOperator(
- Function function, TypeInformation<Result> resultType, String functionName) {
- String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, Result> operator =
- PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
-
- return input.transform(opName, resultType, operator);
+ private <R> DataStream<R> createFastTimeOperatorIfValid(
+ Function function,
+ TypeInformation<R> resultType,
+ String functionName) {
+
+ if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSlide();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ else if (function instanceof KeyedWindowFunction) {
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ wf, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+ final long windowLength = timeWindows.getSize();
+ final long windowSlide = timeWindows.getSize();
+
+ String opName = "Fast " + timeWindows + " of " + functionName;
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+ new AggregatingProcessingTimeWindowOperator<>(
+ reducer, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ else if (function instanceof KeyedWindowFunction) {
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+
+ OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+ wf, input.getKeySelector(), windowLength, windowSlide);
+ return input.transform(opName, resultType, op);
+ }
+ }
+
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
new file mode 100644
index 0000000..70627f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceKeyedWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+
+ public ReduceKeyedWindowFunction(ReduceFunction<T> reduceFunction) {
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {
+ super.setRuntimeContext(ctx);
+ FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ FunctionUtils.openFunction(reduceFunction, parameters);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ FunctionUtils.closeFunction(reduceFunction);
+ }
+
+ @Override
+ public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
+ T result = null;
+
+ for (T v: values) {
+ if (result == null) {
+ result = v;
+ } else {
+ result = reduceFunction.reduce(result, v);
+ }
+ }
+
+ if (result != null) {
+ out.collect(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 450b132..c38100c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.evictors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,6 +49,11 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
return "TimeEvictor(" + windowSize + ")";
}
+ @VisibleForTesting
+ public long getWindowSize() {
+ return windowSize;
+ }
+
public static <W extends Window> TimeEvictor<W> of(long windowSize) {
return new TimeEvictor<>(windowSize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
new file mode 100644
index 0000000..1264c2a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public abstract class AbstractTime {
+
+ /** The time unit for this policy's time interval */
+ private final TimeUnit unit;
+
+ /** The size of the windows generated by this policy */
+ private final long size;
+
+
+ protected AbstractTime(long size, TimeUnit unit) {
+ this.unit = checkNotNull(unit, "time unit may not be null");
+ this.size = size;
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the time unit for this policy's time interval.
+ * @return The time unit for this policy's time interval.
+ */
+ public TimeUnit getUnit() {
+ return unit;
+ }
+
+ /**
+ * Gets the length of this policy's time interval.
+ * @return The length of this policy's time interval.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * Converts the time interval to milliseconds.
+ * @return The time interval in milliseconds.
+ */
+ public long toMilliseconds() {
+ return unit.toMillis(size);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
+
+ @Override
+ public int hashCode() {
+ return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == getClass()) {
+ AbstractTime that = (AbstractTime) obj;
+ return this.size == that.size && this.unit.equals(that.unit);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
new file mode 100644
index 0000000..6a4349c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of an event time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
+ * of event time.
+ */
+public final class EventTime extends AbstractTime {
+
+ /** Instantiation only via factory method */
+ private EventTime(long size, TimeUnit unit) {
+ super(size, unit);
+ }
+
+ @Override
+ public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+ if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
+ return this;
+ }
+ else {
+ throw new InvalidProgramException(
+ "Cannot use EventTime policy in a dataflow that runs on " + characteristic);
+ }
+ }
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an event time policy describing an event time interval.
+ *
+ * @param size The size of the generated windows.
+ * @param unit The init (seconds, milliseconds) of the time interval.
+ * @return The event time policy.
+ */
+ public static EventTime of(long size, TimeUnit unit) {
+ return new EventTime(size, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
new file mode 100644
index 0000000..4be6ed0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a processing time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
+ * of processing time.
+ */
+public final class ProcessingTime extends AbstractTime {
+
+ /** Instantiation only via factory method */
+ private ProcessingTime(long size, TimeUnit unit) {
+ super(size, unit);
+ }
+
+ @Override
+ public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+ if (characteristic == TimeCharacteristic.ProcessingTime) {
+ return this;
+ }
+ else {
+ throw new InvalidProgramException(
+ "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a processing time policy describing a processing time interval.
+ *
+ * @param size The size of the generated windows.
+ * @param unit The init (seconds, milliseconds) of the time interval.
+ * @return The processing time policy.
+ */
+ public static ProcessingTime of(long size, TimeUnit unit) {
+ return new ProcessingTime(size, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
new file mode 100644
index 0000000..d1b3fe3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a time interval for windowing. The time characteristic referred
+ * to is the default time characteristic set on the execution environment.
+ */
+public final class Time extends AbstractTime {
+
+ /** Instantiation only via factory method */
+ private Time(long size, TimeUnit unit) {
+ super(size, unit);
+ }
+
+ @Override
+ public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ switch (timeCharacteristic) {
+ case ProcessingTime:
+ return ProcessingTime.of(getSize(), getUnit());
+ case IngestionTime:
+ case EventTime:
+ return EventTime.of(getSize(), getUnit());
+ default:
+ throw new IllegalArgumentException("Unknown time characteristic");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a time policy describing a processing time interval. The policy refers to the
+ * time characteristic that is set on the dataflow via
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
+ * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
+ *
+ * @param size The size of the generated windows.
+ * @param unit The init (seconds, milliseconds) of the time interval.
+ * @return The time policy.
+ */
+ public static Time of(long size, TimeUnit unit) {
+ return new Time(size, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 64850a2..da198be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.triggers;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
@@ -68,6 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
return new ContinuousProcessingTimeTrigger<>(granularity);
}
+ @VisibleForTesting
+ public long getGranularity() {
+ return granularity;
+ }
+
@Override
public String toString() {
return "ContinuousProcessingTimeTrigger(" + granularity + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index b7f085a..3b6dc6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.triggers;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
@@ -59,6 +60,11 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
return "ContinuousProcessingTimeTrigger(" + granularity + ")";
}
+ @VisibleForTesting
+ public long getGranularity() {
+ return granularity;
+ }
+
public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
return new ContinuousWatermarkTrigger<>(granularity);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
deleted file mode 100644
index 6e382bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class AbstractTimePolicy extends WindowPolicy {
-
- private static final long serialVersionUID = 6593098375698927728L;
-
- /** The time unit for this policy's time interval */
- private final TimeUnit unit;
-
- /** The size of the windows generated by this policy */
- private final long size;
-
-
- protected AbstractTimePolicy(long size, TimeUnit unit) {
- this.unit = checkNotNull(unit, "time unit may not be null");
- this.size = size;
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the time unit for this policy's time interval.
- * @return The time unit for this policy's time interval.
- */
- public TimeUnit getUnit() {
- return unit;
- }
-
- /**
- * Gets the length of this policy's time interval.
- * @return The length of this policy's time interval.
- */
- public long getSize() {
- return size;
- }
-
- /**
- * Converts the time interval to milliseconds.
- * @return The time interval in milliseconds.
- */
- public long toMilliseconds() {
- return unit.toMillis(size);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString(WindowPolicy slidePolicy) {
- if (slidePolicy == null) {
- return "Tumbling Window (" + getClass().getSimpleName() + ") (" + size + ' ' + unit.name() + ')';
- }
- else if (slidePolicy.getClass() == getClass()) {
- AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy;
-
- return "Sliding Window (" + getClass().getSimpleName() + ") (length="
- + size + ' ' + unit.name() + ", slide=" + timeSlide.size + ' ' + timeSlide.unit.name() + ')';
- }
- else {
- return super.toString(slidePolicy);
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj.getClass() == getClass()) {
- AbstractTimePolicy that = (AbstractTimePolicy) obj;
- return this.size == that.size && this.unit.equals(that.unit);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
deleted file mode 100644
index 5fb7d58..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-
-/**
- * A windowing policy that generates windows based on element counts.
- */
-public final class Count extends WindowPolicy {
-
- private static final long serialVersionUID = 3197290738634320211L;
-
- private long size;
-
- /** Instantiation only via factory method */
- private Count(long size) {
- this.size = size;
- }
-
- public long getSize() {
- return size;
- }
-
- @Override
- public String toString() {
- return "Count Window (" + size + ')';
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a count based windowing policy
- *
- * @param size The size of the generated windows.
- * @return The time policy.
- */
- public static Count of(long size) {
- return new Count(size);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
deleted file mode 100644
index 4a3082c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-
-/**
- * A windowing policy that generates windows based on a delta between elements.
- */
-public final class Delta<T> extends WindowPolicy {
-
- private static final long serialVersionUID = 3197290738634320211L;
-
- private DeltaFunction<T> deltaFunction;
-
- private double threshold;
-
- /** Instantiation only via factory method */
- private Delta(DeltaFunction<T> deltaFunction, double threshold) {
- this.deltaFunction = deltaFunction;
- this.threshold = threshold;
- }
-
- public DeltaFunction<T> getDeltaFunction() {
- return deltaFunction;
- }
-
- public double getThreshold() {
- return threshold;
- }
-
- @Override
- public String toString() {
- return "Delta Window (" + deltaFunction + ", " + threshold + ')';
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a delta based windowing policy
- *
- * @param threshold The threshold for deltas at which to trigger windows
- * @param deltaFunction The delta function
- * @return The time policy.
- */
- public static <T> Delta<T> of(double threshold, DeltaFunction<T> deltaFunction) {
- return new Delta<T>(deltaFunction, threshold);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
deleted file mode 100644
index c32a0b0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time.
- */
-public final class EventTime extends AbstractTimePolicy {
-
- private static final long serialVersionUID = 8333566691833596747L;
-
- /** Instantiation only via factory method */
- private EventTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
- if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use EventTime policy in a dataflow that runs on " + characteristic);
- }
- }
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates an event time policy describing an event time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The event time policy.
- */
- public static EventTime of(long size, TimeUnit unit) {
- return new EventTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
deleted file mode 100644
index a71ba1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTimePolicy {
-
- private static final long serialVersionUID = 7546166721132583007L;
-
- /** Instantiation only via factory method */
- private ProcessingTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
- if (characteristic == TimeCharacteristic.ProcessingTime) {
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
- }
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a processing time policy describing a processing time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The processing time policy.
- */
- public static ProcessingTime of(long size, TimeUnit unit) {
- return new ProcessingTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
deleted file mode 100644
index efc9bf2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a time interval for windowing. The time characteristic referred
- * to is the default time characteristic set on the execution environment.
- */
-public final class Time extends AbstractTimePolicy {
-
- private static final long serialVersionUID = 3197290738634320211L;
-
- /** Instantiation only via factory method */
- private Time(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
- switch (timeCharacteristic) {
- case ProcessingTime:
- return ProcessingTime.of(getSize(), getUnit());
- case IngestionTime:
- case EventTime:
- return EventTime.of(getSize(), getUnit());
- default:
- throw new IllegalArgumentException("Unknown time characteristic");
- }
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a time policy describing a processing time interval. The policy refers to the
- * time characteristic that is set on the dataflow via
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
- * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The time policy.
- */
- public static Time of(long size, TimeUnit unit) {
- return new Time(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
deleted file mode 100644
index 2e1a387..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-/**
- * The base class of all window policies. Window policies define how windows
- * are formed over the data stream.
- */
-public abstract class WindowPolicy implements java.io.Serializable {
-
- private static final long serialVersionUID = -8696529489282723113L;
-
- /**
- * If the concrete instantiation of a window policy depends on the time characteristic of the
- * dataflow (processing time, event time), then this method must be overridden to convert this
- * policy to the respective specific instantiation.
- * <p>
- * The {@link Time} policy for example, will convert itself to an {@link ProcessingTime} policy,
- * if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}.
- * <p>
- * By default, this method does nothing and simply returns this object itself.
- *
- * @param characteristic The time characteristic of the dataflow.
- * @return The specific instantiation of this policy, or the policy itself.
- */
- public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
- return this;
- }
-
-
- public String toString(WindowPolicy slidePolicy) {
- if (slidePolicy != null) {
- return "Window [" + toString() + ", slide=" + slidePolicy + ']';
- }
- else {
- return "Window [" + toString() + ']';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
deleted file mode 100644
index 880c85c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
-import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
-import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
-import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-/**
- * This class implements the conversion from window policies to concrete operator
- * implementations.
- */
-public class PolicyToOperator {
-
- /**
- * Entry point to create an operator for the given window policies and the window function.
- */
- public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
- WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
- {
- if (window == null || function == null) {
- throw new NullPointerException();
- }
-
- // -- case 1: both policies are processing time policies
- if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
- final long windowLength = ((ProcessingTime) window).toMilliseconds();
- final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
-
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, keySelector, windowLength, windowSlide);
- return op;
- }
- else if (function instanceof KeyedWindowFunction) {
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
-
- return new AccumulatingProcessingTimeWindowOperator<>(
- wf, keySelector, windowLength, windowSlide);
- }
- }
-
- // -- case 2: both policies are event time policies
- if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
- final long windowLength = ((EventTime) window).toMilliseconds();
- final long windowSlide = slide == null ? windowLength : ((EventTime) slide).toMilliseconds();
-
- WindowAssigner<? super IN, TimeWindow> assigner;
- if (windowSlide == windowLength) {
- assigner = TumblingTimeWindows.of(windowLength);
- } else {
- assigner = SlidingTimeWindows.of(windowLength, windowSlide);
- }
- WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
- function = new ReduceWindowFunction<>(reducer);
- windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
- } else {
- windowBuffer = new HeapWindowBuffer.Factory<>();
- }
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY, TimeWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
-
- return new WindowOperator<>(
- assigner,
- keySelector,
- windowBuffer,
- windowFunction,
- WatermarkTrigger.create());
- }
-
- // -- case 3: arbitrary trigger, no eviction
- if (slide == null) {
- Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(window);
- // we need to make them purging triggers because the trigger/eviction policy model
- // expects that the window is purged when no slide is used
- Trigger<? super IN, GlobalWindow> purgingTrigger = PurgingTrigger.of(trigger);
-
- WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
- function = new ReduceWindowFunction<>(reducer);
- windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
- } else {
- windowBuffer = new HeapWindowBuffer.Factory<>();
- }
-
- if (!(function instanceof KeyedWindowFunction)) {
- throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
- }
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
-
- return new WindowOperator<>(
- GlobalWindows.<IN>create(),
- keySelector,
- windowBuffer,
- windowFunction,
- purgingTrigger);
- }
-
- // -- case 4: arbitrary trigger, arbitrary eviction
- Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(slide);
- Evictor<? super IN, GlobalWindow> evictor = policyToEvictor(window);
-
- WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> windowBuffer = new HeapWindowBuffer.Factory<>();
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
- function = new ReduceWindowFunction<>(reducer);
- }
-
- if (!(function instanceof KeyedWindowFunction)) {
- throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
- }
-
- @SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
-
- EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new EvictingWindowOperator<>(
- GlobalWindows.<IN>create(),
- keySelector,
- windowBuffer,
- windowFunction,
- trigger,
- evictor);
-
- if (window instanceof ProcessingTime) {
- // special case, we need to instruct the window operator to store the processing time in
- // the elements so that the evictor can work on that
- op.enableSetProcessingTime(true);
- }
-
- return op;
- }
-
- private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
- if (policy instanceof EventTime) {
- EventTime eventTime = (EventTime) policy;
- return ContinuousWatermarkTrigger.of(eventTime.getSize());
- } else if (policy instanceof ProcessingTime) {
- ProcessingTime processingTime = (ProcessingTime) policy;
- return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
- } else if (policy instanceof Count) {
- Count count = (Count) policy;
- return CountTrigger.of(count.getSize());
- } else if (policy instanceof Delta) {
- @SuppressWarnings("unchecked,rawtypes")
- Delta<IN> delta = (Delta) policy;
- return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
-
- }
-
- throw new UnsupportedOperationException("Unsupported policy " + policy);
- }
-
- private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
- if (policy instanceof EventTime) {
- EventTime eventTime = (EventTime) policy;
- return TimeEvictor.of(eventTime.getSize());
- } else if (policy instanceof ProcessingTime) {
- ProcessingTime processingTime = (ProcessingTime) policy;
- return TimeEvictor.of(processingTime.getSize());
- } else if (policy instanceof Count) {
- Count count = (Count) policy;
- return CountEvictor.of(count.getSize());
- } else if (policy instanceof Delta) {
- @SuppressWarnings("unchecked,rawtypes")
- Delta<IN> delta = (Delta) policy;
- return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
-
- }
-
-
- throw new UnsupportedOperationException("Unsupported policy " + policy);
- }
-
- // ------------------------------------------------------------------------
-
- /** Don't instantiate */
- private PolicyToOperator() {}
-}