You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:08 UTC
[12/12] flink git commit: [FLINK-2677] Add a general-purpose keyed-window operator
[FLINK-2677] Add a general-purpose keyed-window operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd51c977
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd51c977
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd51c977
Branch: refs/heads/master
Commit: dd51c97741b336d3a11e319183537eef864a84fd
Parents: 3be2dc1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 25 12:27:35 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:09:25 2015 +0200
----------------------------------------------------------------------
.../api/datastream/KeyedDataStream.java | 19 +
.../KeyedTriggerWindowDataStream.java | 255 +++++++++++
.../windowing/ReduceWindowFunction.java | 70 +++
.../ReduceWindowFunctionWithWindow.java | 71 +++
.../windowing/RichKeyedWindowFunction.java | 25 +
.../flink/streaming/api/graph/StreamGraph.java | 6 +
.../api/windowing/assigners/GlobalWindows.java | 55 +++
.../assigners/SlidingProcessingTimeWindows.java | 92 ++++
.../windowing/assigners/SlidingTimeWindows.java | 81 ++++
.../TumblingProcessingTimeWindows.java | 67 +++
.../assigners/TumblingTimeWindows.java | 67 +++
.../api/windowing/assigners/WindowAssigner.java | 32 ++
.../api/windowing/evictors/CountEvictor.java | 44 ++
.../api/windowing/evictors/DeltaEvictor.java | 58 +++
.../api/windowing/evictors/Evictor.java | 28 ++
.../api/windowing/evictors/TimeEvictor.java | 54 +++
.../ContinuousProcessingTimeTrigger.java | 79 ++++
.../triggers/ContinuousWatermarkTrigger.java | 65 +++
.../api/windowing/triggers/CountTrigger.java | 61 +++
.../api/windowing/triggers/DeltaTrigger.java | 66 +++
.../triggers/ProcessingTimeTrigger.java | 57 +++
.../api/windowing/triggers/PurgingTrigger.java | 76 +++
.../api/windowing/triggers/Trigger.java | 40 ++
.../windowing/triggers/WatermarkTrigger.java | 57 +++
.../api/windowing/windows/GlobalWindow.java | 65 +++
.../api/windowing/windows/TimeWindow.java | 75 +++
.../streaming/api/windowing/windows/Window.java | 27 ++
.../operators/BucketStreamSortOperator.java | 93 ++++
.../windowing/EvictingWindowOperator.java | 115 +++++
.../operators/windowing/PolicyToOperator.java | 166 ++++++-
.../operators/windowing/WindowOperator.java | 320 +++++++++++++
.../windowing/buffers/EvictingWindowBuffer.java | 22 +
.../windowing/buffers/HeapWindowBuffer.java | 88 ++++
.../buffers/PreAggregatingHeapWindowBuffer.java | 91 ++++
.../windowing/buffers/WindowBuffer.java | 34 ++
.../windowing/buffers/WindowBufferFactory.java | 30 ++
.../windowing/EvictingWindowOperatorTest.java | 179 ++++++++
.../windowing/PolicyWindowTranslationTest.java | 216 +++++++++
.../windowing/TriggerWindowTranslationTest.java | 201 ++++++++
.../operators/windowing/WindowOperatorTest.java | 459 +++++++++++++++++++
40 files changed, 3701 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 611953e..ce105e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,7 +23,9 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -115,4 +117,21 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
return new KeyedWindowDataStream<T, KEY>(this, window, slide);
}
+
+ /**
+ * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+ * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+ * grouping of elements is done both by key and by window.
+ *
+ * <p>
+ * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+ * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+ * that is used if a {@code Trigger} is not specified.
+ *
+ * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+ * @return The trigger windows data stream.
+ */
+ public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+ return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
new file mode 100644
index 0000000..5b39775
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@code KeyedTriggerWindowDataStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
+ * <p>
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code KeyedTriggerWindowDataStream} is purely an API construct; during runtime
+ * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the
+ * {@code KeyedDataStream} and the operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class KeyedTriggerWindowDataStream<T, K, W extends Window> {
+
+ /** The keyed data stream that is windowed by this stream */
+ private final KeyedDataStream<T, K> input;
+
+ /** The window assigner */
+ private final WindowAssigner<? super T, W> windowAssigner;
+
+ /** The trigger that is used for window evaluation/emission. */
+ private Trigger<? super T, ? super W> trigger;
+
+ /** The evictor that is used for evicting elements before window evaluation. */
+ private Evictor<? super T, ? super W> evictor;
+
+
+ public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
+ this.input = input;
+ this.windowAssigner = windowAssigner;
+ this.trigger = windowAssigner.getDefaultTrigger();
+ }
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+ this.trigger = trigger;
+ return this;
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor, window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+ this.evictor = evictor;
+ return this;
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Operations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The reduce function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+ * so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "Reduce at " + callLocation;
+
+ DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+ if (result != null) {
+ return result;
+ }
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, T> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ new ReduceWindowFunction<K, W, T>(function),
+ trigger,
+ evictor);
+
+ } else {
+ // we need to copy because we need our own instance of the pre aggregator
+ @SuppressWarnings("unchecked")
+ ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new ReduceWindowFunction<K, W, T>(function),
+ trigger);
+ }
+
+ return input.transform(opName, input.getType(), operator);
+ }
+
+ /**
+ * Applies a window function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the window function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
+ TypeInformation<T> inType = input.getType();
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, KeyedWindowFunction.class, true, true, inType, null, false);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "MapWindow at " + callLocation;
+
+ DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+ if (result != null) {
+ return result;
+ }
+
+
+ String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger,
+ evictor);
+
+ } else {
+ operator = new WindowOperator<>(windowAssigner,
+ keySel,
+ new HeapWindowBuffer.Factory<T>(),
+ function,
+ trigger);
+ }
+
+
+
+ return input.transform(opName, resultType, operator);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private <R> DataStream<R> createFastTimeOperatorIfValid(
+ Function function,
+ TypeInformation<R> resultType,
+ String functionName) {
+
+ WindowPolicy windowPolicy = null;
+ WindowPolicy slidePolicy = null;
+
+ if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+ windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
+ slidePolicy = ProcessingTime.of(timeWindows.getSlide(), TimeUnit.MILLISECONDS);
+ } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+ windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
+ }
+
+ if (windowPolicy == null) {
+ return null;
+ }
+
+ String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator =
+ PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
+
+ return input.transform(opName, resultType, operator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
new file mode 100644
index 0000000..1c9578a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+
+ public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {
+ super.setRuntimeContext(ctx);
+ FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ FunctionUtils.openFunction(reduceFunction, parameters);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ FunctionUtils.closeFunction(reduceFunction);
+ }
+
+ @Override
+ public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
+ T result = null;
+
+ for (T v: values) {
+ if (result == null) {
+ result = v;
+ } else {
+ result = reduceFunction.reduce(result, v);
+ }
+ }
+
+ if (result != null) {
+ out.collect(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
new file mode 100644
index 0000000..bceff82
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+
+ public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) {
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext ctx) {
+ super.setRuntimeContext(ctx);
+ FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ FunctionUtils.openFunction(reduceFunction, parameters);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ FunctionUtils.closeFunction(reduceFunction);
+ }
+
+ @Override
+ public void evaluate(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
+ T result = null;
+
+ for (T v: values) {
+ if (result == null) {
+ result = v;
+ } else {
+ result = reduceFunction.reduce(result, v);
+ }
+ }
+
+ if (result != null) {
+ out.collect(Tuple2.of(window, result));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
new file mode 100644
index 0000000..90ccb40
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> {
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cda5686..cfa6d93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -217,6 +218,11 @@ public class StreamGraph extends StreamingPlan {
outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
}
+ if (operatorObject instanceof InputTypeConfigurable) {
+ InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
+ inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
new file mode 100644
index 0000000..391a6a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
+ private static final long serialVersionUID = 1L;
+
+ private GlobalWindows() {}
+
+ @Override
+ public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
+ return Collections.singletonList(GlobalWindow.get());
+ }
+
+ @Override
+ public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "GlobalWindows()";
+ }
+
+ /**
+ * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
+ * all elements to the same {@link GlobalWindow}.
+ *
+ * @return The global windows assigner.
+ */
+ public static GlobalWindows create() {
+ return new GlobalWindows();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
new file mode 100644
index 0000000..a2d95c2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link WindowAssigner} that assigns elements to overlapping (sliding) {@link TimeWindow}s
+ * based on the wall-clock time ({@link System#currentTimeMillis()}) at the moment
+ * {@link #assignWindows(Object, long)} is called. The element timestamp is ignored.
+ */
+public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private final long size;
+
+    private final long slide;
+
+    // Reused across calls so that no list is allocated per element. It is transient and
+    // therefore re-created in readObject() after deserialization.
+    private transient List<TimeWindow> result;
+
+    private SlidingProcessingTimeWindows(long size, long slide) {
+        // Fail fast: slide == 0 would otherwise surface as a bare ArithmeticException from
+        // the division below, and negative values yield no or meaningless windows.
+        if (size <= 0 || slide <= 0) {
+            throw new IllegalArgumentException(
+                    "SlidingProcessingTimeWindows requires size > 0 and slide > 0, but got size="
+                            + size + ", slide=" + slide);
+        }
+        this.size = size;
+        this.slide = slide;
+        this.result = Lists.newArrayListWithCapacity((int) (size / slide));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        this.result = Lists.newArrayListWithCapacity((int) (size / slide));
+    }
+
+    /**
+     * Assigns the element to every sliding window that covers the current system time.
+     *
+     * <p>The returned collection is the same list instance on every call and is cleared at
+     * the start of the next call, so it must be consumed before this method is invoked again.
+     *
+     * @param element The element being assigned (not inspected).
+     * @param timestamp The element timestamp (ignored; the system clock is used instead).
+     * @return The windows whose start is a multiple of {@code slide} and lies in
+     *         {@code (now - size, now]}.
+     */
+    @Override
+    public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+        result.clear();
+        long time = System.currentTimeMillis();
+        long lastStart = time - time % slide;
+        for (long start = lastStart; start > time - size; start -= slide) {
+            result.add(new TimeWindow(start, size));
+        }
+        return result;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public long getSlide() {
+        return slide;
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger() {
+        return ProcessingTimeTrigger.create();
+    }
+
+    @Override
+    public String toString() {
+        return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
+    }
+
+    /**
+     * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+     * elements to sliding time windows based on the current processing time.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @return The time policy.
+     * @throws IllegalArgumentException If {@code size} or {@code slide} is not positive.
+     */
+    public static SlidingProcessingTimeWindows of(long size, long slide) {
+        return new SlidingProcessingTimeWindows(size, slide);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
new file mode 100644
index 0000000..cb5a7a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link WindowAssigner} that assigns elements to overlapping (sliding) {@link TimeWindow}s
+ * based on the timestamp passed to {@link #assignWindows(Object, long)}.
+ */
+public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private final long size;
+
+    private final long slide;
+
+    private SlidingTimeWindows(long size, long slide) {
+        // Fail fast: slide == 0 would otherwise only surface as an ArithmeticException when
+        // the first element is assigned, and negative values yield meaningless windows.
+        if (size <= 0 || slide <= 0) {
+            throw new IllegalArgumentException(
+                    "SlidingTimeWindows requires size > 0 and slide > 0, but got size="
+                            + size + ", slide=" + slide);
+        }
+        this.size = size;
+        this.slide = slide;
+    }
+
+    /**
+     * Assigns the element to every sliding window that covers {@code timestamp}.
+     *
+     * @param element The element being assigned (not inspected).
+     * @param timestamp The element timestamp.
+     * @return A freshly allocated list of the windows whose start is a multiple of
+     *         {@code slide} and lies in {@code (timestamp - size, timestamp]}.
+     */
+    @Override
+    public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
+
+        // Java's % truncates toward zero, so for a negative timestamp the plain
+        // "timestamp - timestamp % slide" lands on the boundary ABOVE the timestamp and the
+        // first window would not contain the element. Step down one slide in that case.
+        long remainder = timestamp % slide;
+        long lastStart = timestamp - remainder;
+        // The range check keeps the subtraction from underflowing close to Long.MIN_VALUE.
+        if (remainder < 0 && lastStart >= Long.MIN_VALUE + slide) {
+            lastStart -= slide;
+        }
+
+        for (long start = lastStart; start > timestamp - size; start -= slide) {
+            windows.add(new TimeWindow(start, size));
+        }
+        return windows;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public long getSlide() {
+        return slide;
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger() {
+        return WatermarkTrigger.create();
+    }
+
+    @Override
+    public String toString() {
+        return "SlidingTimeWindows(" + size + ", " + slide + ")";
+    }
+
+    /**
+     * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
+     * elements to sliding time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @return The time policy.
+     * @throws IllegalArgumentException If {@code size} or {@code slide} is not positive.
+     */
+    public static SlidingTimeWindows of(long size, long slide) {
+        return new SlidingTimeWindows(size, slide);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
new file mode 100644
index 0000000..b1ef857
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns each element to exactly one fixed-size
+ * {@link TimeWindow}, chosen from the wall-clock time ({@link System#currentTimeMillis()})
+ * at the moment {@link #assignWindows(Object, long)} is called. The element timestamp is
+ * ignored.
+ */
+public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private final long size;
+
+    private TumblingProcessingTimeWindows(long size) {
+        // Fail fast: size == 0 would otherwise only surface as an ArithmeticException from
+        // the modulo when the first element is assigned.
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    "TumblingProcessingTimeWindows requires size > 0, but got size=" + size);
+        }
+        this.size = size;
+    }
+
+    /**
+     * Assigns the element to the window that covers the current system time.
+     *
+     * @param element The element being assigned (not inspected).
+     * @param timestamp The element timestamp (ignored; the system clock is used instead).
+     * @return A singleton list with the window starting at the current time rounded down
+     *         to a multiple of {@code size}.
+     */
+    @Override
+    public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+        long time = System.currentTimeMillis();
+        long start = time - (time % size);
+        return Collections.singletonList(new TimeWindow(start, size));
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger() {
+        return ProcessingTimeTrigger.create();
+    }
+
+    @Override
+    public String toString() {
+        return "TumblingProcessingTimeWindows(" + size + ")";
+    }
+
+    /**
+     * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+     * elements to time windows based on the current processing time.
+     *
+     * @param size The size of the generated windows.
+     * @return The time policy.
+     * @throws IllegalArgumentException If {@code size} is not positive.
+     */
+    public static TumblingProcessingTimeWindows of(long size) {
+        return new TumblingProcessingTimeWindows(size);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
new file mode 100644
index 0000000..d19c97d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that assigns each element to exactly one fixed-size
+ * {@link TimeWindow}, chosen from the timestamp passed to
+ * {@link #assignWindows(Object, long)}.
+ */
+public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private final long size;
+
+    private TumblingTimeWindows(long size) {
+        // Fail fast: size == 0 would otherwise only surface as an ArithmeticException from
+        // the modulo when the first element is assigned.
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    "TumblingTimeWindows requires size > 0, but got size=" + size);
+        }
+        this.size = size;
+    }
+
+    /**
+     * Assigns the element to the window that covers {@code timestamp}.
+     *
+     * @param element The element being assigned (not inspected).
+     * @param timestamp The element timestamp.
+     * @return A singleton list with the window starting at {@code timestamp} rounded down
+     *         to a multiple of {@code size}.
+     */
+    @Override
+    public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+        // Java's % truncates toward zero, so for a negative timestamp the plain
+        // "timestamp - timestamp % size" lands on the boundary ABOVE the timestamp and the
+        // window would not contain the element. Step down one window in that case.
+        long remainder = timestamp % size;
+        long start = timestamp - remainder;
+        // The range check keeps the subtraction from underflowing close to Long.MIN_VALUE.
+        if (remainder < 0 && start >= Long.MIN_VALUE + size) {
+            start -= size;
+        }
+        return Collections.singletonList(new TimeWindow(start, size));
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger() {
+        return WatermarkTrigger.create();
+    }
+
+    @Override
+    public String toString() {
+        return "TumblingTimeWindows(" + size + ")";
+    }
+
+    /**
+     * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
+     * elements to time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @return The time policy.
+     * @throws IllegalArgumentException If {@code size} is not positive.
+     */
+    public static TumblingTimeWindows of(long size) {
+        return new TumblingTimeWindows(size);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
new file mode 100644
index 0000000..5996426
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+import java.util.Collection;
+
+/**
+ * Decides which windows an element belongs to and supplies the trigger that is used for
+ * those windows when none is specified explicitly.
+ *
+ * <p>Implements {@code java.io.Serializable} directly (spelled out in full because the
+ * simple name {@code Serializable} is imported from the Scala library in this file), so
+ * that this Java API does not depend on a Scala type.
+ *
+ * @param <T> The type of elements this assigner can handle.
+ * @param <W> The type of window this assigner produces.
+ */
+public abstract class WindowAssigner<T, W extends Window> implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Returns the windows the given element should be added to.
+     *
+     * @param element The element to assign.
+     * @param timestamp The timestamp associated with the element.
+     * @return The windows for the element.
+     */
+    public abstract Collection<W> assignWindows(T element, long timestamp);
+
+    /**
+     * Returns the trigger associated with this assigner by default.
+     *
+     * @return The default trigger; implementations may return {@code null} when they
+     *         have none.
+     */
+    public abstract Trigger<T, W> getDefaultTrigger();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
new file mode 100644
index 0000000..04636ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that keeps at most a fixed number of elements: whenever the buffer
+ * holds more than {@code maxCount} elements, the surplus is reported for eviction.
+ *
+ * @param <W> The type of window this evictor is used with.
+ */
+public class CountEvictor<W extends Window> implements Evictor<Object, W> {
+    private static final long serialVersionUID = 1L;
+
+    private final long maxCount;
+
+    private CountEvictor(long count) {
+        this.maxCount = count;
+    }
+
+    /**
+     * Computes how many elements exceed the configured maximum.
+     *
+     * @param elements The buffered elements (not inspected).
+     * @param size The current number of buffered elements.
+     * @param window The window being evaluated (not inspected).
+     * @return {@code size - maxCount} if {@code size} exceeds {@code maxCount}, else 0.
+     */
+    @Override
+    public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+        if (size > maxCount) {
+            return (int) (size - maxCount);
+        } else {
+            return 0;
+        }
+    }
+
+    // Added for consistency with the other evictors, which all describe themselves.
+    @Override
+    public String toString() {
+        return "CountEvictor(" + maxCount + ")";
+    }
+
+    /**
+     * Creates a {@code CountEvictor} that keeps at most the given number of elements.
+     *
+     * @param maxCount The number of elements to keep.
+     * @return The evictor.
+     */
+    public static <W extends Window> CountEvictor<W> of(long maxCount) {
+        return new CountEvictor<>(maxCount);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
new file mode 100644
index 0000000..c7872ce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that compares every buffered element with the last buffered element
+ * using a {@link DeltaFunction} and reports the leading run of elements whose delta is
+ * at or above the threshold.
+ *
+ * @param <T> The type of the buffered elements.
+ * @param <W> The type of window this evictor is used with.
+ */
+public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
+    private static final long serialVersionUID = 1L;
+
+    DeltaFunction<T> deltaFunction;
+    private double threshold;
+
+    private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
+        this.deltaFunction = deltaFunction;
+        this.threshold = threshold;
+    }
+
+    /**
+     * Counts, from the start of the iteration order, the elements whose delta to the last
+     * element is not below the threshold. Counting stops at the first element that is below.
+     *
+     * @param elements The buffered elements.
+     * @param size The current number of buffered elements (not used).
+     * @param window The window being evaluated (not inspected).
+     * @return The number of leading elements to evict; 0 for an empty buffer.
+     */
+    @Override
+    public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
+        // Iterables.getLast throws NoSuchElementException on an empty iterable; with
+        // nothing buffered there is nothing to evict.
+        if (Iterables.isEmpty(elements)) {
+            return 0;
+        }
+
+        StreamRecord<T> lastElement = Iterables.getLast(elements);
+        int toEvict = 0;
+        for (StreamRecord<T> element : elements) {
+            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
+                break;
+            }
+            toEvict++;
+        }
+
+        return toEvict;
+    }
+
+    @Override
+    public String toString() {
+        return "DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
+    }
+
+    /**
+     * Creates a {@code DeltaEvictor} from the given threshold and delta function.
+     *
+     * @param threshold The delta below which elements are kept.
+     * @param deltaFunction The function that computes the delta between two elements.
+     * @return The evictor.
+     */
+    public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+        return new DeltaEvictor<>(threshold, deltaFunction);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
new file mode 100644
index 0000000..db04ac4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import scala.Serializable;
+
+/**
+ * Decides how many buffered elements should be removed from a window.
+ *
+ * <p>Extends {@code java.io.Serializable} directly (spelled out in full because the simple
+ * name {@code Serializable} is imported from the Scala library in this file), so that
+ * this Java API does not depend on a Scala type.
+ *
+ * @param <T> The type of elements this evictor can inspect.
+ * @param <W> The type of window this evictor is used with.
+ */
+public interface Evictor<T, W extends Window> extends java.io.Serializable {
+
+    /**
+     * Computes the number of elements to evict.
+     *
+     * @param elements The elements currently buffered for the window.
+     * @param size The number of buffered elements.
+     * @param window The window the elements belong to.
+     * @return The number of elements to evict. NOTE(review): the implementations in this
+     *         package count from the start of the iteration order - confirm that the
+     *         caller removes from the beginning.
+     */
+    int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
new file mode 100644
index 0000000..450b132
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * An {@link Evictor} that reports the leading run of buffered elements whose timestamp
+ * is not newer than {@code System.currentTimeMillis() - windowSize}.
+ *
+ * @param <W> The type of window this evictor is used with.
+ */
+public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
+    private static final long serialVersionUID = 1L;
+
+    private final long windowSize;
+
+    public TimeEvictor(long windowSize) {
+        this.windowSize = windowSize;
+    }
+
+    /**
+     * Creates a {@code TimeEvictor} that keeps elements for the given amount of time.
+     *
+     * @param windowSize The age beyond which elements are evicted.
+     * @return The evictor.
+     */
+    public static <W extends Window> TimeEvictor<W> of(long windowSize) {
+        return new TimeEvictor<>(windowSize);
+    }
+
+    /**
+     * Counts, from the start of the iteration order, the elements whose timestamp is at or
+     * before the cutoff. Counting stops at the first element that is newer than the cutoff.
+     *
+     * @param elements The buffered elements.
+     * @param size The current number of buffered elements (not used).
+     * @param window The window being evaluated (not inspected).
+     * @return The number of leading elements to evict.
+     */
+    @Override
+    public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+        long evictCutoff = System.currentTimeMillis() - windowSize;
+        int expired = 0;
+        for (StreamRecord<Object> candidate : elements) {
+            if (candidate.getTimestamp() <= evictCutoff) {
+                expired++;
+            } else {
+                // First element that is still fresh: everything after it is kept as well.
+                break;
+            }
+        }
+        return expired;
+    }
+
+    @Override
+    public String toString() {
+        return "TimeEvictor(" + windowSize + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
new file mode 100644
index 0000000..64850a2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires based on the wall clock ({@link System#currentTimeMillis()}),
+ * aligned to multiples of a given granularity. The next firing time is tracked in the
+ * trigger instance itself.
+ *
+ * @param <W> The type of window this trigger is used with.
+ */
+public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
+    private static final long serialVersionUID = 1L;
+
+    private long granularity;
+
+    // 0 means that no element has been seen yet.
+    private long nextFireTimestamp = 0;
+
+    private ContinuousProcessingTimeTrigger(long granularity) {
+        this.granularity = granularity;
+    }
+
+    /** Creates a trigger that fires on processing-time boundaries of the given granularity. */
+    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
+        return new ContinuousProcessingTimeTrigger<>(granularity);
+    }
+
+    /** Returns the first multiple of the granularity that lies strictly after {@code now}. */
+    private long boundaryAfter(long now) {
+        return now - (now % granularity) + granularity;
+    }
+
+    /**
+     * On the first element only a timer for the next boundary is registered. On later
+     * elements, the trigger fires once the clock has passed the stored boundary and a timer
+     * for the following boundary is registered.
+     */
+    @Override
+    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+        long now = System.currentTimeMillis();
+        boolean firstElement = nextFireTimestamp == 0;
+
+        if (!firstElement && now <= nextFireTimestamp) {
+            return TriggerResult.CONTINUE;
+        }
+
+        nextFireTimestamp = boundaryAfter(now);
+        ctx.registerProcessingTimeTimer(nextFireTimestamp);
+
+        return firstElement ? TriggerResult.CONTINUE : TriggerResult.FIRE;
+    }
+
+    /**
+     * Fires if the clock has passed the stored boundary, i.e. if no element has fired for it
+     * already. Note that no follow-up timer is registered here; that only happens in
+     * {@link #onElement}.
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        long now = System.currentTimeMillis();
+        if (now <= nextFireTimestamp) {
+            return TriggerResult.CONTINUE;
+        }
+        nextFireTimestamp = boundaryAfter(now);
+        return TriggerResult.FIRE;
+    }
+
+    @Override
+    public Trigger<Object, W> duplicate() {
+        return new ContinuousProcessingTimeTrigger<>(granularity);
+    }
+
+    @Override
+    public String toString() {
+        return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
new file mode 100644
index 0000000..b7f085a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires repeatedly on watermark timers spaced {@code granularity}
+ * apart. The first timer is registered when the first element arrives; every timer
+ * callback registers the next one.
+ *
+ * @param <W> The type of window this trigger is used with.
+ */
+public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
+    private static final long serialVersionUID = 1L;
+
+    private long granularity;
+
+    // True until the first element has been seen by this trigger instance.
+    private boolean first = true;
+
+    private ContinuousWatermarkTrigger(long granularity) {
+        this.granularity = granularity;
+    }
+
+    /**
+     * Registers the initial watermark timer on the first element; never fires directly.
+     *
+     * @return Always {@link TriggerResult#CONTINUE}.
+     */
+    @Override
+    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+        if (first) {
+            long start = timestamp - (timestamp % granularity);
+            long nextFireTimestamp = start + granularity;
+
+            ctx.registerWatermarkTimer(nextFireTimestamp);
+            first = false;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    /**
+     * Registers the next timer one granularity after {@code time} and fires.
+     *
+     * @return Always {@link TriggerResult#FIRE}.
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        ctx.registerWatermarkTimer(time + granularity);
+        return TriggerResult.FIRE;
+    }
+
+    @Override
+    public Trigger<Object, W> duplicate() {
+        return new ContinuousWatermarkTrigger<>(granularity);
+    }
+
+    @Override
+    public String toString() {
+        // Previously reported the name of ContinuousProcessingTimeTrigger (copy-paste slip).
+        return "ContinuousWatermarkTrigger(" + granularity + ")";
+    }
+
+    /** Creates a trigger that fires on watermark boundaries of the given granularity. */
+    public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
+        return new ContinuousWatermarkTrigger<>(granularity);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
new file mode 100644
index 0000000..a51fae6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires once the number of elements seen since the last firing
+ * reaches a configured maximum, then starts counting again from zero.
+ *
+ * @param <W> the type of window this trigger is used with
+ */
+public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+    private static final long serialVersionUID = 1L;
+
+    /** Number of elements after which the trigger fires. */
+    private long maxCount;
+
+    /** Number of elements seen since the last firing. */
+    private long count;
+
+    private CountTrigger(long maxCount) {
+        this.maxCount = maxCount;
+        count = 0;
+    }
+
+    /**
+     * Counts the element and fires when the count reaches {@code maxCount}.
+     *
+     * @return {@code FIRE} (after resetting the count) when the limit is reached,
+     *         {@code CONTINUE} otherwise
+     */
+    @Override
+    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+        count++;
+        if (count >= maxCount) {
+            count = 0;
+            return TriggerResult.FIRE;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    /**
+     * This trigger registers no timers, so a timer callback requires no action.
+     *
+     * @return always {@code CONTINUE}
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        // Never return null here: callers such as PurgingTrigger switch on the result,
+        // and switching on a null enum value throws a NullPointerException.
+        return TriggerResult.CONTINUE;
+    }
+
+    /** Returns a fresh trigger with the same limit and a count of zero. */
+    @Override
+    public Trigger<Object, W> duplicate() {
+        return new CountTrigger<>(maxCount);
+    }
+
+    @Override
+    public String toString() {
+        return "CountTrigger(" + maxCount + ")";
+    }
+
+    /**
+     * Creates a trigger that fires once {@code maxCount} elements have been seen.
+     *
+     * @param maxCount the number of elements after which the trigger fires
+     * @param <W> the type of window this trigger is used with
+     */
+    public static <W extends Window> CountTrigger<W> of(long maxCount) {
+        return new CountTrigger<>(maxCount);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
new file mode 100644
index 0000000..ecd7ed0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that fires when the delta between the current element and a remembered
+ * reference element, as computed by a {@link DeltaFunction}, exceeds a threshold.
+ *
+ * <p>The first element seen becomes the reference element; after each firing the element
+ * that caused the firing becomes the new reference.
+ *
+ * @param <T> the type of elements this trigger inspects
+ * @param <W> the type of window this trigger is used with
+ */
+public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+    private static final long serialVersionUID = 1L;
+
+    DeltaFunction<T> deltaFunction;
+    private double threshold;
+
+    /** Reference element for the delta computation; {@code null} until the first element arrives. */
+    private transient T lastElement;
+
+    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
+        this.deltaFunction = deltaFunction;
+        this.threshold = threshold;
+    }
+
+    /**
+     * Compares the element against the reference element.
+     *
+     * @return {@code FIRE} when the delta is strictly greater than the threshold (the element
+     *         then becomes the new reference), {@code CONTINUE} otherwise
+     */
+    @Override
+    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+        if (lastElement == null) {
+            // Nothing to compare against yet; remember the first element as the reference.
+            lastElement = element;
+            return TriggerResult.CONTINUE;
+        }
+        if (deltaFunction.getDelta(lastElement, element) > this.threshold) {
+            lastElement = element;
+            return TriggerResult.FIRE;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    /**
+     * This trigger registers no timers, so a timer callback requires no action.
+     *
+     * @return always {@code CONTINUE}
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        // Never return null here: callers such as PurgingTrigger switch on the result,
+        // and switching on a null enum value throws a NullPointerException.
+        return TriggerResult.CONTINUE;
+    }
+
+    /** Returns a fresh trigger with the same threshold and delta function and no reference element. */
+    @Override
+    public Trigger<T, W> duplicate() {
+        return new DeltaTrigger<>(threshold, deltaFunction);
+    }
+
+    @Override
+    public String toString() {
+        return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
+    }
+
+    /**
+     * Creates a delta trigger.
+     *
+     * @param threshold the delta that must be exceeded for the trigger to fire
+     * @param deltaFunction the function computing the delta between two elements
+     * @param <T> the type of elements this trigger inspects
+     * @param <W> the type of window this trigger is used with
+     */
+    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+        return new DeltaTrigger<>(threshold, deltaFunction);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
new file mode 100644
index 0000000..f693a67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} for {@link TimeWindow}s that registers a single processing-time timer
+ * at the end of the window and fires and purges when that timer is reported.
+ */
+public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    /** True until the first element has caused the timer to be registered. */
+    boolean isFirst = true;
+
+    private ProcessingTimeTrigger() {}
+
+    /** Creates a new trigger instance. */
+    public static ProcessingTimeTrigger create() {
+        return new ProcessingTimeTrigger();
+    }
+
+    /**
+     * Registers the processing-time timer for {@code window.getEnd()} on the first element.
+     *
+     * @return always {@code CONTINUE}
+     */
+    @Override
+    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+        if (!isFirst) {
+            // The timer was already registered by an earlier element.
+            return TriggerResult.CONTINUE;
+        }
+        ctx.registerProcessingTimeTimer(window.getEnd());
+        isFirst = false;
+        return TriggerResult.CONTINUE;
+    }
+
+    /**
+     * Invoked for a timer callback.
+     *
+     * @return always {@code FIRE_AND_PURGE}
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+    /** Returns a fresh trigger that has not yet registered its timer. */
+    @Override
+    public Trigger<Object, TimeWindow> duplicate() {
+        return new ProcessingTimeTrigger();
+    }
+
+    @Override
+    public String toString() {
+        return "ProcessingTimeTrigger()";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
new file mode 100644
index 0000000..88e22cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} decorator that turns every {@code FIRE} of the wrapped trigger into
+ * {@code FIRE_AND_PURGE}; any other non-firing result is reported as {@code CONTINUE}.
+ *
+ * @param <T> the type of elements the wrapped trigger inspects
+ * @param <W> the type of window the wrapped trigger is used with
+ */
+public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+    private static final long serialVersionUID = 1L;
+
+    private Trigger<T, W> nestedTrigger;
+
+    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
+        this.nestedTrigger = nestedTrigger;
+    }
+
+    /**
+     * Wraps the given trigger.
+     *
+     * @param nestedTrigger the trigger whose firings should also purge
+     */
+    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
+        return new PurgingTrigger<>(nestedTrigger);
+    }
+
+    /** Delegates to the nested trigger and upgrades a firing result to {@code FIRE_AND_PURGE}. */
+    @Override
+    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+        return toPurging(nestedTrigger.onElement(element, timestamp, window, ctx));
+    }
+
+    /** Delegates to the nested trigger and upgrades a firing result to {@code FIRE_AND_PURGE}. */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        return toPurging(nestedTrigger.onTime(time, ctx));
+    }
+
+    /** Returns a purging trigger wrapping a duplicate of the nested trigger. */
+    @Override
+    public Trigger<T, W> duplicate() {
+        return new PurgingTrigger<>(nestedTrigger.duplicate());
+    }
+
+    @Override
+    public String toString() {
+        return "PurgingTrigger(" + nestedTrigger.toString() + ")";
+    }
+
+    /** Exposes the wrapped trigger. */
+    @VisibleForTesting
+    public Trigger<T, W> getNestedTrigger() {
+        return nestedTrigger;
+    }
+
+    /** Maps both firing results to {@code FIRE_AND_PURGE} and everything else to {@code CONTINUE}. */
+    private static TriggerResult toPurging(TriggerResult nestedResult) {
+        switch (nestedResult) {
+            case FIRE:
+            case FIRE_AND_PURGE:
+                return TriggerResult.FIRE_AND_PURGE;
+            default:
+                return TriggerResult.CONTINUE;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
new file mode 100644
index 0000000..b04aacf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+/**
+ * Decides, per element and per timer callback, what should happen to a window.
+ *
+ * @param <T> the type of elements the trigger inspects
+ * @param <W> the type of window the trigger is used with
+ */
+public interface Trigger<T, W extends Window> extends Serializable {
+
+    /**
+     * Called with an element, its timestamp and the window it is evaluated for.
+     *
+     * @param ctx context through which timers can be registered
+     * @return the action to take
+     */
+    TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+
+    /**
+     * Called with the time of a timer.
+     * NOTE(review): presumably one previously registered through the {@link TriggerContext} — confirm with the operator.
+     *
+     * @param ctx context through which further timers can be registered
+     * @return the action to take
+     */
+    TriggerResult onTime(long time, TriggerContext ctx);
+
+    /** Returns a new trigger instance; implementations in this package copy their configuration only. */
+    Trigger<T, W> duplicate();
+
+    /**
+     * Possible outcomes of a trigger invocation.
+     * NOTE(review): the exact handling of each constant is defined by the calling operator — confirm there.
+     */
+    enum TriggerResult {
+        CONTINUE, FIRE_AND_PURGE, FIRE
+    }
+
+    /** Callback interface a trigger uses to register timers. */
+    interface TriggerContext {
+        void registerProcessingTimeTimer(long time);
+
+        void registerWatermarkTimer(long time);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
new file mode 100644
index 0000000..6ba8890
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+/**
+ * A {@link Trigger} for {@link TimeWindow}s that registers a single watermark timer at the
+ * window's maximum timestamp and fires and purges when that timer is reported.
+ */
+public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    /** True until the first element has caused the timer to be registered. */
+    boolean isFirst = true;
+
+    private WatermarkTrigger() {}
+
+    /** Creates a new trigger instance. */
+    public static WatermarkTrigger create() {
+        return new WatermarkTrigger();
+    }
+
+    /**
+     * Registers the watermark timer for {@code window.maxTimestamp()} on the first element.
+     *
+     * @return always {@code CONTINUE}
+     */
+    @Override
+    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+        if (!isFirst) {
+            // The timer was already registered by an earlier element.
+            return TriggerResult.CONTINUE;
+        }
+        ctx.registerWatermarkTimer(window.maxTimestamp());
+        isFirst = false;
+        return TriggerResult.CONTINUE;
+    }
+
+    /**
+     * Invoked for a timer callback.
+     *
+     * @return always {@code FIRE_AND_PURGE}
+     */
+    @Override
+    public TriggerResult onTime(long time, TriggerContext ctx) {
+        return TriggerResult.FIRE_AND_PURGE;
+    }
+
+    /** Returns a fresh trigger that has not yet registered its timer. */
+    @Override
+    public Trigger<Object, TimeWindow> duplicate() {
+        return new WatermarkTrigger();
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkTrigger()";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
new file mode 100644
index 0000000..e0df19d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * The single window that spans the whole range of {@code long} timestamps.
+ *
+ * <p>Instances cannot be created directly; use {@link #get()} to obtain the shared instance.
+ */
+public class GlobalWindow extends Window {
+
+    // final: the shared singleton must never be reassigned.
+    private static final GlobalWindow INSTANCE = new GlobalWindow();
+
+    private GlobalWindow() { }
+
+    /** Returns the shared instance. */
+    public static GlobalWindow get() {
+        return INSTANCE;
+    }
+
+    /** @return {@link Long#MIN_VALUE} */
+    @Override
+    public long getStart() {
+        return Long.MIN_VALUE;
+    }
+
+    /** @return {@link Long#MAX_VALUE} */
+    @Override
+    public long getEnd() {
+        return Long.MAX_VALUE;
+    }
+
+    /** @return {@link Long#MAX_VALUE} */
+    @Override
+    public long maxTimestamp() {
+        return Long.MAX_VALUE;
+    }
+
+    /** Any two objects of exactly this class are equal, since the class carries no state. */
+    @Override
+    public boolean equals(Object o) {
+        return this == o || (o != null && getClass() == o.getClass());
+    }
+
+    /** Constant hash code, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public String toString() {
+        return "GlobalWindow";
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
new file mode 100644
index 0000000..20080c0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * A {@link Window} described by a start and an end timestamp.
+ *
+ * <p>When built from a start and a size, the stored end is {@code start + size - 1}.
+ */
+public class TimeWindow extends Window {
+    long start;
+    long end;
+
+    /** Creates a window whose start and end are both left at their default value of zero. */
+    public TimeWindow() {
+    }
+
+    /**
+     * Creates a window beginning at {@code start}.
+     *
+     * @param start the first timestamp of the window
+     * @param size the length of the window; the end is set to {@code start + size - 1}
+     */
+    public TimeWindow(long start, long size) {
+        this.start = start;
+        this.end = start + size - 1;
+    }
+
+    @Override
+    public long getStart() {
+        return this.start;
+    }
+
+    @Override
+    public long getEnd() {
+        return this.end;
+    }
+
+    /** Returns the same value as {@link #getEnd()}. */
+    @Override
+    public long maxTimestamp() {
+        return this.end;
+    }
+
+    /** Two windows are equal when they are of the same class and have the same start and end. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        TimeWindow other = (TimeWindow) o;
+        return this.start == other.start && this.end == other.end;
+    }
+
+    /** Combines the folded 64-bit start and end values into one hash. */
+    @Override
+    public int hashCode() {
+        int startHash = (int) (start ^ (start >>> 32));
+        int endHash = (int) (end ^ (end >>> 32));
+        return 31 * startHash + endHash;
+    }
+
+    @Override
+    public String toString() {
+        return "TimeWindow{" + "start=" + start + ", end=" + end + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
new file mode 100644
index 0000000..4e22c32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+/**
+ * Base class for the windows that elements are grouped into; exposes the window's
+ * boundaries as {@code long} timestamps.
+ */
+public abstract class Window {
+
+ /** Returns the start timestamp of this window. */
+ public abstract long getStart();
+
+ /** Returns the end timestamp of this window. NOTE(review): inclusiveness is decided by subclasses - confirm. */
+ public abstract long getEnd();
+
+ /** Returns the maximum timestamp of this window. NOTE(review): presumably the largest timestamp that still belongs to it - confirm. */
+ public abstract long maxTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
new file mode 100644
index 0000000..145ad25
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * An operator that buffers records in buckets of {@code granularity} timestamp units and,
+ * when a watermark arrives, emits the records of all buckets that lie before the
+ * watermark's bucket, ordered by timestamp, before forwarding the watermark.
+ *
+ * <p>Buckets are emitted in ascending bucket order and records inside a bucket are sorted
+ * by timestamp, so the records released by one watermark leave in timestamp order.
+ *
+ * @param <T> the type of the records passing through the operator
+ */
+public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+    private static final long serialVersionUID = 1L;
+
+    /** Width of one bucket, in the same unit as the record timestamps. */
+    private long granularity;
+
+    /** Buffered records, keyed by the bucket id (timestamp rounded to a multiple of the granularity). */
+    private transient Map<Long, List<StreamRecord<T>>> buckets;
+
+    /**
+     * @param granularity the width of one bucket
+     */
+    public BucketStreamSortOperator(long granularity) {
+        this.granularity = granularity;
+    }
+
+    /** Creates the empty bucket map; it is transient and therefore rebuilt on every open. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        buckets = Maps.newHashMap();
+    }
+
+    /** Buffers the record in the bucket derived from its timestamp; nothing is emitted here. */
+    @Override
+    public void processElement(StreamRecord<T> record) throws Exception {
+        long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
+        List<StreamRecord<T>> bucket = buckets.get(bucketId);
+        if (bucket == null) {
+            bucket = Lists.newArrayList();
+            buckets.put(bucketId, bucket);
+        }
+        bucket.add(record);
+    }
+
+    /**
+     * Emits and drops every bucket whose id is smaller than the bucket id of the watermark,
+     * then forwards the watermark.
+     */
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
+
+        // Collect the ids first: the map has no defined iteration order, and buckets must
+        // leave in ascending order for the output to be sorted across buckets.
+        List<Long> readyBucketIds = Lists.newArrayList();
+        for (Long bucketId : buckets.keySet()) {
+            if (bucketId < maxBucketId) {
+                readyBucketIds.add(bucketId);
+            }
+        }
+        Collections.sort(readyBucketIds);
+
+        Comparator<StreamRecord<T>> byTimestamp = new Comparator<StreamRecord<T>>() {
+            @Override
+            public int compare(StreamRecord<T> o1, StreamRecord<T> o2) {
+                // Compare explicitly: casting the difference of two longs to int can
+                // overflow or truncate and report the wrong order.
+                long t1 = o1.getTimestamp();
+                long t2 = o2.getTimestamp();
+                return t1 < t2 ? -1 : (t1 > t2 ? 1 : 0);
+            }
+        };
+
+        for (Long bucketId : readyBucketIds) {
+            List<StreamRecord<T>> bucket = buckets.remove(bucketId);
+            Collections.sort(bucket, byTimestamp);
+            for (StreamRecord<T> r : bucket) {
+                output.collect(r);
+            }
+        }
+
+        output.emitWatermark(mark);
+    }
+
+}