You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/05 17:19:16 UTC
[2/4] flink git commit: [FLINK-3614] Remove Non-Keyed Window Operator
[FLINK-3614] Remove Non-Keyed Window Operator
Instead we use a dummy KeySelector and the regular WindowOperator now.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/505512db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/505512db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/505512db
Branch: refs/heads/master
Commit: 505512dbe461b9840dde6197c71dbb90b49c0495
Parents: b1e5086
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 16 15:59:05 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 5 16:26:54 2016 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 264 ++++----
.../EvictingNonKeyedWindowOperator.java | 88 ---
.../windowing/NonKeyedWindowOperator.java | 624 -------------------
.../operators/windowing/WindowOperator.java | 5 -
.../windowing/buffers/EvictingWindowBuffer.java | 38 --
.../windowing/buffers/FoldingWindowBuffer.java | 163 -----
.../windowing/buffers/ListWindowBuffer.java | 127 ----
.../windowing/buffers/ReducingWindowBuffer.java | 121 ----
.../windowing/buffers/WindowBuffer.java | 73 ---
.../windowing/buffers/WindowBufferFactory.java | 47 --
.../InternalIterableAllWindowFunction.java | 75 +++
.../InternalIterableWindowFunction.java | 5 +-
.../InternalSingleValueAllWindowFunction.java | 77 +++
.../InternalSingleValueWindowFunction.java | 5 +-
.../windowing/AllWindowTranslationTest.java | 54 +-
.../EvictingNonKeyedWindowOperatorTest.java | 147 -----
.../windowing/NonKeyedWindowOperatorTest.java | 406 ------------
.../api/scala/AllWindowTranslationTest.scala | 27 +-
.../StreamingScalaAPICompletenessTest.scala | 2 -
19 files changed, 345 insertions(+), 2003 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 54c0b86..1a59bf1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -21,11 +21,14 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -35,17 +38,17 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
/**
* A {@code AllWindowedStream} represents a data stream where the stream of
@@ -71,8 +74,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWi
@Public
public class AllWindowedStream<T, W extends Window> {
- /** The data stream that is windowed by this stream */
- private final DataStream<T> input;
+ /** The keyed data stream that is windowed by this stream */
+ private final KeyedStream<T, Byte> input;
/** The window assigner */
private final WindowAssigner<? super T, W> windowAssigner;
@@ -87,7 +90,7 @@ public class AllWindowedStream<T, W extends Window> {
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
+ this.input = input.keyBy(new NullByteKeySelector<T>());
this.windowAssigner = windowAssigner;
this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
@@ -106,7 +109,7 @@ public class AllWindowedStream<T, W extends Window> {
*
* <p>
* Note: When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
+ * incremental aggregation of window results cannot be used.
*/
@PublicEvolving
public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
@@ -123,55 +126,32 @@ public class AllWindowedStream<T, W extends Window> {
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
* as a regular non-windowed stream.
+ *
* <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+ * This window will try and incrementally aggregate data as much as the window policies permit.
+ * For example, tumbling time windows can aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will aggregate on the granularity of the slide interval,
* so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
+ * Custom windows may not be able to incrementally aggregate, or may need to store extra values
+ * in an aggregation tree.
+ *
* @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
+ * @return The data stream that is the result of applying the reduce function to the window.
*/
+ @SuppressWarnings("unchecked")
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
- "Please use apply(ReduceFunction, WindowFunction) instead.");
+ "Please use apply(ReduceFunction, WindowFunction) instead.");
}
//clean the closure
function = input.getExecutionEnvironment().clean(function);
String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
-
- SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
- OneInputStreamOperator<T, T> operator;
-
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceIterableAllWindowFunction<W, T>(function),
- trigger,
- evictor);
-
- } else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ReducingWindowBuffer.Factory<>(function, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceIterableAllWindowFunction<W, T>(function),
- trigger);
- }
+ String udfName = "WindowedStream." + callLocation;
- return input.transform(opName, input.getType(), operator).setParallelism(1);
+ return apply(function, new PassThroughAllWindowFunction<W, T>());
}
/**
@@ -185,11 +165,11 @@ public class AllWindowedStream<T, W extends Window> {
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
- "Please use apply(FoldFunction, WindowFunction) instead.");
+ "Please use apply(FoldFunction, WindowFunction) instead.");
}
TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
- Utils.getCallLocationName(), true);
+ Utils.getCallLocationName(), true);
return fold(initialValue, function, resultType);
}
@@ -205,25 +185,25 @@ public class AllWindowedStream<T, W extends Window> {
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
- "Please use apply(FoldFunction, WindowFunction) instead.");
+ "Please use apply(FoldFunction, WindowFunction) instead.");
}
return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType);
}
/**
- * Applies a window function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
* <p>
* Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
- @SuppressWarnings("unchecked, rawtypes")
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, getInputType(), null, false);
@@ -231,46 +211,58 @@ public class AllWindowedStream<T, W extends Window> {
}
/**
- * Applies the given window function to each window. The window function is called for each evaluation
- * of the window for each key individually. The output of the window function is interpreted
- * as a regular non-windowed stream.
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
* <p>
* Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
+ * is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
+ * @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+
//clean the closure
function = input.getExecutionEnvironment().clean(function);
String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
+ String udfName = "WindowedStream." + callLocation;
- SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
+ String opName;
+ KeySelector<T, Byte> keySel = input.getKeySelector();
+ WindowOperator<Byte, T, Iterable<T>, R, W> operator;
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ if (evictor != null) {
+ ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+ new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
- NonKeyedWindowOperator<T, T, R, W> operator;
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+ operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableAllWindowFunction<>(function),
trigger,
evictor);
} else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
+ ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableAllWindowFunction<>(function),
trigger);
}
@@ -283,19 +275,19 @@ public class AllWindowedStream<T, W extends Window> {
* interpreted as a regular non-windowed stream.
*
* <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ * Arriving data is incrementally aggregated using the given reducer.
*
- * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
- public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, inType, null, false);
- return apply(preAggregator, function, resultType);
+ return apply(reduceFunction, function, resultType);
}
/**
@@ -304,43 +296,59 @@ public class AllWindowedStream<T, W extends Window> {
* interpreted as a regular non-windowed stream.
*
* <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ * Arriving data is incrementally aggregated using the given reducer.
*
- * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
- public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
- if (preAggregator instanceof RichFunction) {
- throw new UnsupportedOperationException("Pre-aggregator of apply can not be a RichFunction.");
+ public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ if (reduceFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
}
//clean the closures
function = input.getExecutionEnvironment().clean(function);
- preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+ reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
+ String udfName = "WindowedStream." + callLocation;
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ String opName;
+ KeySelector<T, Byte> keySel = input.getKeySelector();
OneInputStreamOperator<T, R> operator;
if (evictor != null) {
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+ ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+ new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator = new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new ReduceApplyAllWindowFunction<>(preAggregator, function),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor);
} else {
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ReducingWindowBuffer.Factory<>(preAggregator, getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger);
+ ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+ reduceFunction,
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueAllWindowFunction<>(function),
+ trigger);
}
return input.transform(opName, resultType, operator).setParallelism(1);
@@ -360,8 +368,9 @@ public class AllWindowedStream<T, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
+
TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
- Utils.getCallLocationName(), true);
+ Utils.getCallLocationName(), true);
return apply(initialValue, foldFunction, function, resultType);
}
@@ -390,37 +399,51 @@ public class AllWindowedStream<T, W extends Window> {
foldFunction = input.getExecutionEnvironment().clean(foldFunction);
String callLocation = Utils.getCallLocationName();
- String udfName = "AllWindowedStream." + callLocation;
+ String udfName = "WindowedStream." + callLocation;
String opName;
+ KeySelector<T, Byte> keySel = input.getKeySelector();
OneInputStreamOperator<T, R> operator;
if (evictor != null) {
- opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")";
- operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
- new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function),
- trigger,
- evictor);
+ ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents",
+ new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
+ trigger,
+ evictor);
} else {
- opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+ FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
+ initialValue,
+ foldFunction,
+ resultType);
- operator = new NonKeyedWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new FoldingWindowBuffer.Factory<>(foldFunction, initialValue, resultType.createSerializer(getExecutionEnvironment().getConfig())),
- function,
- trigger);
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueAllWindowFunction<>(function),
+ trigger);
}
return input.transform(opName, resultType, operator).setParallelism(1);
}
// ------------------------------------------------------------------------
- // Aggregations on the windows
+ // Aggregations on the keyed windows
// ------------------------------------------------------------------------
/**
@@ -621,16 +644,6 @@ public class AllWindowedStream<T, W extends Window> {
// Utilities
// ------------------------------------------------------------------------
-
- private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- // TODO: add once non-parallel fast aligned time windows operator is ready
- return null;
- }
-
public StreamExecutionEnvironment getExecutionEnvironment() {
return input.getExecutionEnvironment();
}
@@ -638,4 +651,17 @@ public class AllWindowedStream<T, W extends Window> {
public TypeInformation<T> getInputType() {
return input.getType();
}
+
+ /**
+ * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
+ * @param <T>
+ */
+ private static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Byte getKey(T value) throws Exception {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
deleted file mode 100644
index 22d207d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Evicting window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <ACC> The type of elements stored in the window buffers.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-@Internal
-public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> extends NonKeyedWindowOperator<IN, ACC, OUT, W> {
-
- private static final long serialVersionUID = 1L;
-
- private final Evictor<? super IN, ? super W> evictor;
-
- public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- WindowBufferFactory<? super IN, ACC, ? extends EvictingWindowBuffer<IN, ACC>> windowBufferFactory,
- AllWindowFunction<ACC, OUT, W> windowFunction,
- Trigger<? super IN, ? super W> trigger,
- Evictor<? super IN, ? super W> evictor) {
- super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
- this.evictor = requireNonNull(evictor);
- }
-
- @Override
- @SuppressWarnings("unchecked, rawtypes")
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
- EvictingWindowBuffer<IN, ACC> windowBuffer = (EvictingWindowBuffer<IN, ACC>) context.windowBuffer;
-
- int toEvict = 0;
- if (windowBuffer.size() > 0) {
- // need some type trickery here...
- toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
- }
-
- windowBuffer.removeElements(toEvict);
-
- userFunction.apply(
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public Evictor<? super IN, ? super W> getEvictor() {
- return evictor;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
deleted file mode 100644
index 6bd5c7d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <ACC> The type of elements stored in the window buffers.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-@Internal
-public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
- extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, OUT, W>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
- // ------------------------------------------------------------------------
- // Configuration values and stuff from the user
- // ------------------------------------------------------------------------
-
- private final WindowAssigner<? super IN, W> windowAssigner;
-
- private final Trigger<? super IN, ? super W> trigger;
-
- private final WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory;
-
- /**
- * This is used to copy the incoming element because it can be put into several window
- * buffers.
- */
- private TypeSerializer<IN> inputSerializer;
-
- /**
- * For serializing the window in checkpoints.
- */
- private final TypeSerializer<W> windowSerializer;
-
- // ------------------------------------------------------------------------
- // State that is not checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * Processing time timers that are currently in-flight.
- */
- private transient Map<Long, Set<Context>> processingTimeTimers;
-
- /**
- * Current waiting watermark callbacks.
- */
- private transient Map<Long, Set<Context>> watermarkTimers;
-
- /**
- * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
- */
- protected transient TimestampedCollector<OUT> timestampedCollector;
-
- /**
- * To keep track of the current watermark so that we can immediately fire if a trigger
- * registers an event time callback for a timestamp that lies in the past.
- */
- protected transient long currentWatermark = -1L;
-
- // ------------------------------------------------------------------------
- // State that needs to be checkpointed
- // ------------------------------------------------------------------------
-
- /**
- * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
- * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
- */
- protected transient Map<W, Context> windows;
-
- /**
- * Creates a new {@code WindowOperator} based on the given policies and user functions.
- */
- public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> windowBufferFactory,
- AllWindowFunction<ACC, OUT, W> windowFunction,
- Trigger<? super IN, ? super W> trigger) {
-
- super(windowFunction);
-
- this.windowAssigner = requireNonNull(windowAssigner);
- this.windowSerializer = windowSerializer;
-
- this.windowBufferFactory = requireNonNull(windowBufferFactory);
- this.trigger = requireNonNull(trigger);
-
- setChainingStrategy(ChainingStrategy.ALWAYS);
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- currentWatermark = -1;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
- }
-
- @Override
- public final void open() throws Exception {
- super.open();
- timestampedCollector = new TimestampedCollector<>(output);
-
- if (inputSerializer == null) {
- throw new IllegalStateException("Input serializer was not set.");
- }
-
- // these could already be initialized from restoreState()
- if (watermarkTimers == null) {
- watermarkTimers = new HashMap<>();
- }
- if (processingTimeTimers == null) {
- processingTimeTimers = new HashMap<>();
- }
- if (windows == null) {
- windows = new HashMap<>();
- }
-
- // re-register timers that this window context had set
- for (Context context: windows.values()) {
- if (context.processingTimeTimer > 0) {
- Set<Context> triggers = processingTimeTimers.get(context.processingTimeTimer);
- if (triggers == null) {
- getRuntimeContext().registerTimer(context.processingTimeTimer, NonKeyedWindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(context.processingTimeTimer, triggers);
- }
- triggers.add(context);
- }
- if (context.watermarkTimer > 0) {
- Set<Context> triggers = watermarkTimers.get(context.watermarkTimer);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(context.watermarkTimer, triggers);
- }
- triggers.add(context);
- }
-
- }
- }
-
- @Override
- public final void dispose() {
- super.dispose();
- windows.clear();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public final void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
-
- for (W window: elementWindows) {
- Context context = windows.get(window);
- if (context == null) {
- WindowBuffer<IN, ACC> windowBuffer = windowBufferFactory.create();
- context = new Context(window, windowBuffer);
- windows.put(window, context);
- }
- context.windowBuffer.storeElement(element);
- TriggerResult triggerResult = context.onElement(element);
- processTriggerResult(triggerResult, window);
- }
- }
-
- protected void emitWindow(Context context) throws Exception {
- timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
-
- if (context.windowBuffer.size() > 0) {
- userFunction.apply(
- context.window,
- context.windowBuffer.getUnpackedElements(),
- timestampedCollector);
- }
- }
-
- private void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
- if (!triggerResult.isFire() && !triggerResult.isPurge()) {
- // do nothing
- return;
- }
- Context context;
-
- if (triggerResult.isPurge()) {
- context = windows.remove(window);
- } else {
- context = windows.get(window);
- }
- if (context == null) {
- LOG.debug("Window {} already gone.", window);
- return;
- }
-
- if (triggerResult.isFire()) {
- emitWindow(context);
- }
-
- if (triggerResult.isPurge()) {
- context.clear();
- }
- }
-
- @Override
- public final void processWatermark(Watermark mark) throws Exception {
- List<Set<Context>> toTrigger = new ArrayList<>();
-
- Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<Long, Set<Context>> triggers = it.next();
- if (triggers.getKey() <= mark.getTimestamp()) {
- toTrigger.add(triggers.getValue());
- it.remove();
- }
- }
-
- for (Set<Context> ctxs: toTrigger) {
- for (Context ctx: ctxs) {
- // double check the time. it can happen that the trigger registers a new timer,
- // in that case the entry is left in the watermarkTimers set for performance reasons.
- // We have to check here whether the entry in the set still reflects the
- // currently set timer in the Context.
- if (ctx.watermarkTimer <= mark.getTimestamp()) {
- TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
- processTriggerResult(triggerResult, ctx.window);
- }
- }
- }
-
- output.emitWatermark(mark);
-
- this.currentWatermark = mark.getTimestamp();
- }
-
- @Override
- public final void trigger(long time) throws Exception {
- List<Set<Context>> toTrigger = new ArrayList<>();
-
- Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<Long, Set<Context>> triggers = it.next();
- if (triggers.getKey() <= time) {
- toTrigger.add(triggers.getValue());
- it.remove();
- }
- }
-
- for (Set<Context> ctxs: toTrigger) {
- for (Context ctx: ctxs) {
- // double check the time. it can happen that the trigger registers a new timer,
- // in that case the entry is left in the processingTimeTimers set for
- // performance reasons. We have to check here whether the entry in the set still
- // reflects the currently set timer in the Context.
- if (ctx.processingTimeTimer <= time) {
- TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
- processTriggerResult(triggerResult, ctx.window);
- }
- }
- }
- }
-
- /**
- * The {@code Context} is responsible for keeping track of the state of one pane.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Trigger}.
- */
- protected class Context implements TriggerContext {
- protected W window;
-
- protected WindowBuffer<IN, ACC> windowBuffer;
-
- protected HashMap<String, Serializable> state;
-
- // use these to only allow one timer in flight at a time of each type
- // if the trigger registers another timer this value here will be overwritten,
- // the timer is not removed from the set of in-flight timers to improve performance.
- // When a trigger fires it is just checked against the last timer that was set.
- protected long watermarkTimer;
- protected long processingTimeTimer;
-
- public Context(
- W window,
- WindowBuffer<IN, ACC> windowBuffer) {
- this.window = window;
- this.windowBuffer = windowBuffer;
- state = new HashMap<>();
-
- this.watermarkTimer = -1;
- this.processingTimeTimer = -1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return currentWatermark;
- }
-
- @SuppressWarnings("unchecked")
- protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
- this.window = windowSerializer.deserialize(in);
- this.watermarkTimer = in.readLong();
- this.processingTimeTimer = in.readLong();
-
- int stateSize = in.readInt();
- byte[] stateData = new byte[stateSize];
- in.read(stateData);
- state = InstantiationUtil.deserializeObject(stateData, userClassloader);
-
- this.windowBuffer = windowBufferFactory.restoreFromSnapshot(in);
- }
-
- protected void writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
- windowSerializer.serialize(window, out);
- out.writeLong(watermarkTimer);
- out.writeLong(processingTimeTimer);
-
- byte[] serializedState = InstantiationUtil.serializeObject(state);
- out.writeInt(serializedState.length);
- out.write(serializedState, 0, serializedState.length);
-
- windowBuffer.snapshot(out);
- }
-
- @Override
- public <S extends Serializable> ValueState<S> getKeyValueState(String name,
- Class<S> stateType,
- S defaultState) {
- requireNonNull(stateType, "The state type class must not be null");
-
- TypeInformation<S> typeInfo;
- try {
- typeInfo = TypeExtractor.getForClass(stateType);
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot analyze type '" + stateType.getName() +
- "' from the class alone, due to generic type parameters. " +
- "Please specify the TypeInformation directly.", e);
- }
-
- return getKeyValueState(name, typeInfo, defaultState);
- }
-
- @Override
- public <S extends Serializable> ValueState<S> getKeyValueState(String name,
- TypeInformation<S> stateType,
- S defaultState) {
-
- requireNonNull(name, "The name of the state must not be null");
- requireNonNull(stateType, "The state type information must not be null");
-
- ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(
- name, stateType.createSerializer(getExecutionConfig()), defaultState);
- return getPartitionedState(stateDesc);
- }
-
- @Override
- @SuppressWarnings("rawtypes, unchecked")
- public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) {
- if (!(stateDescriptor instanceof ValueStateDescriptor)) {
- throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only " +
- "support ValueState.");
- }
- @SuppressWarnings("unchecked")
- final ValueStateDescriptor<?> valueStateDescriptor = (ValueStateDescriptor<?>) stateDescriptor;
- ValueState valueState = new ValueState() {
- @Override
- public Object value() throws IOException {
- Object value = state.get(stateDescriptor.getName());
- if (value == null) {
- value = valueStateDescriptor.getDefaultValue();
- state.put(stateDescriptor.getName(), (Serializable) value);
- }
- return value;
- }
-
- @Override
- public void update(Object value) throws IOException {
- if (!(value instanceof Serializable)) {
- throw new UnsupportedOperationException(
- "Value state of NonKeyedWindowOperator must be serializable.");
- }
- state.put(stateDescriptor.getName(), (Serializable) value);
- }
-
- @Override
- public void clear() {
- state.remove(stateDescriptor.getName());
- }
- };
- return (S) valueState;
- }
-
- @Override
- public void registerProcessingTimeTimer(long time) {
- if (this.processingTimeTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = processingTimeTimers.get(time);
- if (triggers == null) {
- getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
- triggers = new HashSet<>();
- processingTimeTimers.put(time, triggers);
- }
- this.processingTimeTimer = time;
- triggers.add(this);
- }
-
- @Override
- public void registerEventTimeTimer(long time) {
- if (watermarkTimer == time) {
- // we already have set a trigger for that time
- return;
- }
- Set<Context> triggers = watermarkTimers.get(time);
- if (triggers == null) {
- triggers = new HashSet<>();
- watermarkTimers.put(time, triggers);
- }
- this.watermarkTimer = time;
- triggers.add(this);
- }
-
- @Override
- public void deleteProcessingTimeTimer(long time) {
- Set<Context> triggers = processingTimeTimers.get(time);
- if (triggers != null) {
- triggers.remove(this);
- }
- }
-
- @Override
- public void deleteEventTimeTimer(long time) {
- Set<Context> triggers = watermarkTimers.get(time);
- if (triggers != null) {
- triggers.remove(this);
- }
-
- }
-
- public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
- TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
- if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
- // fire now and don't wait for the next watermark update
- TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
- return TriggerResult.merge(onElementResult, onEventTimeResult);
- } else {
- return onElementResult;
- }
- }
-
- public TriggerResult onProcessingTime(long time) throws Exception {
- if (time == processingTimeTimer) {
- processingTimeTimer = -1;
- return trigger.onProcessingTime(time, window, this);
- } else {
- return TriggerResult.CONTINUE;
- }
- }
-
- public TriggerResult onEventTime(long time) throws Exception {
- if (time == watermarkTimer) {
- watermarkTimer = -1;
- TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
-
- if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
- // fire now and don't wait for the next watermark update
- TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
- return TriggerResult.merge(firstTriggerResult, secondTriggerResult);
- } else {
- return firstTriggerResult;
- }
-
- } else {
- return TriggerResult.CONTINUE;
- }
- }
-
- public void clear() throws Exception {
- trigger.clear(window, this);
- }
- }
-
- // ------------------------------------------------------------------------
- // Checkpointing
- // ------------------------------------------------------------------------
-
- @Override
- public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
- // we write the panes with the key/value maps into the stream
- AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
- int numWindows = windows.size();
- out.writeInt(numWindows);
- for (Context context: windows.values()) {
- context.writeToState(out);
- }
-
- taskState.setOperatorState(out.closeAndGetHandle());
- return taskState;
- }
-
- @Override
- public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
- super.restoreState(taskState, recoveryTimestamp);
-
- final ClassLoader userClassloader = getUserCodeClassloader();
- @SuppressWarnings("unchecked")
- StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(userClassloader);
-
- int numWindows = in.readInt();
- this.windows = new HashMap<>(numWindows);
- this.processingTimeTimers = new HashMap<>();
- this.watermarkTimers = new HashMap<>();
-
- for (int j = 0; j < numWindows; j++) {
- Context context = new Context(in, userClassloader);
- windows.put(context.window, context);
- }
- }
-
-
- // ------------------------------------------------------------------------
- // Getters for testing
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- public Trigger<? super IN, ? super W> getTrigger() {
- return trigger;
- }
-
- @VisibleForTesting
- public WindowAssigner<? super IN, W> getWindowAssigner() {
- return windowAssigner;
- }
-
- @VisibleForTesting
- public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, ACC>> getWindowBufferFactory() {
- return windowBufferFactory;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9b7b347..ecad9b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
@@ -77,10 +76,6 @@ import static java.util.Objects.requireNonNull;
* the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
* the pane to which the {@code Trigger} belongs.
*
- * <p>
- * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
- * elements of each pane.
- *
* @param <K> The type of key returned by the {@code KeySelector}.
* @param <IN> The type of the incoming elements.
* @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
deleted file mode 100644
index 75f646d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
- * the elements are added is preserved. Elements can only be evicted started from the beginning of
- * the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- * @param <O> The type of elements that this window buffer will return when asked for its contents.
- */
-@Internal
-public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> {
-
- /**
- * Removes the given number of elements, starting from the beginning.
- * @param count The number of elements to remove.
- */
- void removeElements(int count);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
deleted file mode 100644
index f6c2319..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link FoldFunction} to incrementally aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that can be added to this {@code WindowBuffer}.
- * @param <ACC> The type of the accumulator that this {@code WindowBuffer} can store.
- */
-@Internal
-public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
-
- private final FoldFunction<T, ACC> foldFunction;
- private final TypeSerializer<ACC> accSerializer;
- private StreamRecord<ACC> data;
-
- protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC initialAccumulator, TypeSerializer<ACC> accSerializer) {
- this.foldFunction = foldFunction;
- this.accSerializer = accSerializer;
- this.data = new StreamRecord<>(initialAccumulator);
- }
-
- protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) {
- this.foldFunction = foldFunction;
- this.accSerializer = accSerializer;
- this.data = initialAccumulator;
- }
-
- @Override
- public void storeElement(StreamRecord<T> element) throws Exception {
- data.replace(foldFunction.fold(data.getValue(), element.getValue()));
- }
-
- @Override
- public Iterable<StreamRecord<ACC>> getElements() {
- return Collections.singleton(data);
- }
-
- @Override
- public Iterable<ACC> getUnpackedElements() {
- return Collections.singleton(data.getValue());
- }
-
- @Override
- public int size() {
- return 1;
- }
-
- @Override
- public void snapshot(DataOutputView out) throws IOException {
- MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
- recordSerializer.serialize(data, out);
- }
-
- public static class Factory<T, ACC> implements WindowBufferFactory<T, ACC, FoldingWindowBuffer<T, ACC>> {
- private static final long serialVersionUID = 1L;
-
- private final FoldFunction<T, ACC> foldFunction;
-
- private final TypeSerializer<ACC> accSerializer;
-
- private transient ACC initialAccumulator;
-
- public Factory(FoldFunction<T, ACC> foldFunction, ACC initialValue, TypeSerializer<ACC> accSerializer) {
- this.foldFunction = foldFunction;
- this.accSerializer = accSerializer;
- this.initialAccumulator = initialValue;
- }
-
- @Override
- public FoldingWindowBuffer<T, ACC> create() {
- return new FoldingWindowBuffer<>(foldFunction, accSerializer.copy(initialAccumulator), accSerializer);
- }
-
- @Override
- public FoldingWindowBuffer<T, ACC> restoreFromSnapshot(DataInputView in) throws IOException {
- MultiplexingStreamRecordSerializer<ACC> recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
- StreamElement element = recordSerializer.deserialize(in);
- return new FoldingWindowBuffer<>(foldFunction, element.<ACC>asRecord(), accSerializer);
- }
-
- private void writeObject(final ObjectOutputStream out) throws IOException {
- // write all the non-transient fields
- out.defaultWriteObject();
-
-
- byte[] serializedDefaultValue;
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
- {
- accSerializer.serialize(initialAccumulator, outView);
-
- outView.flush();
- serializedDefaultValue = baos.toByteArray();
- }
- catch (Exception e) {
- throw new IOException("Unable to serialize initial accumulator of type " +
- initialAccumulator.getClass().getSimpleName() + ".", e);
- }
-
- out.writeInt(serializedDefaultValue.length);
- out.write(serializedDefaultValue);
- }
-
- private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
- // read the non-transient fields
- in.defaultReadObject();
-
- // read the default value field
- int size = in.readInt();
- byte[] buffer = new byte[size];
- int bytesRead = in.read(buffer);
-
- if (bytesRead != size) {
- throw new RuntimeException("Read size does not match expected size.");
- }
-
- try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
- DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
- {
- initialAccumulator = accSerializer.deserialize(inView);
- }
- catch (Exception e) {
- throw new IOException("Unable to deserialize initial accumulator.", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
deleted file mode 100644
index 5b9dd3c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class ListWindowBuffer<T> implements EvictingWindowBuffer<T, T> {
-
- private final TypeSerializer<T> serializer;
-
- private ArrayDeque<StreamRecord<T>> elements;
-
- protected ListWindowBuffer(TypeSerializer<T> serializer) {
- this.serializer = serializer;
- this.elements = new ArrayDeque<>();
- }
-
- protected ListWindowBuffer(ArrayDeque<StreamRecord<T>> elements, TypeSerializer<T> serializer) {
- this.serializer = serializer;
- this.elements = elements;
- }
-
- @Override
- public void storeElement(StreamRecord<T> element) {
- elements.add(element);
- }
-
- @Override
- public void removeElements(int count) {
- // TODO determine if this can be done in a better way
- for (int i = 0; i < count; i++) {
- elements.removeFirst();
- }
- }
-
- @Override
- public Iterable<StreamRecord<T>> getElements() {
- return elements;
- }
-
- @Override
- public Iterable<T> getUnpackedElements() {
- return FluentIterable.from(elements).transform(new Function<StreamRecord<T>, T>() {
- @Override
- public T apply(StreamRecord<T> record) {
- return record.getValue();
- }
- });
- }
-
- @Override
- public int size() {
- return elements.size();
- }
-
- @Override
- public void snapshot(DataOutputView out) throws IOException {
- out.writeInt(elements.size());
-
- MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-
- for (StreamRecord<T> e: elements) {
- recordSerializer.serialize(e, out);
- }
- }
-
- public static class Factory<T> implements WindowBufferFactory<T, T, ListWindowBuffer<T>> {
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<T> serializer;
-
- public Factory(TypeSerializer<T> serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public ListWindowBuffer<T> create() {
- return new ListWindowBuffer<>(serializer);
- }
-
- @Override
- public ListWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
- int size = in.readInt();
-
- MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
-
- ArrayDeque<StreamRecord<T>> elements = new ArrayDeque<>();
-
- for (int i = 0; i < size; i++) {
- elements.add(recordSerializer.deserialize(in).<T>asRecord());
- }
-
- return new ListWindowBuffer<>(elements, serializer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
deleted file mode 100644
index d3bf4b4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
- * {@link ReduceFunction} to incrementally aggregate elements that are added to the buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
-
- private final ReduceFunction<T> reduceFunction;
- private final TypeSerializer<T> serializer;
- private StreamRecord<T> data;
-
- protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
- this.reduceFunction = reduceFunction;
- this.serializer = serializer;
- this.data = null;
- }
-
- protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, StreamRecord<T> data, TypeSerializer<T> serializer) {
- this.reduceFunction = reduceFunction;
- this.serializer = serializer;
- this.data = data;
- }
-
- @Override
- public void storeElement(StreamRecord<T> element) throws Exception {
- if (data == null) {
- data = element.copy(element.getValue());
- } else {
- data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
- }
- }
-
- @Override
- public Iterable<StreamRecord<T>> getElements() {
- return Collections.singleton(data);
- }
-
- @Override
- public Iterable<T> getUnpackedElements() {
- return Collections.singleton(data.getValue());
- }
-
- @Override
- public int size() {
- return 1;
- }
-
- @Override
- public void snapshot(DataOutputView out) throws IOException {
- if (data != null) {
- out.writeBoolean(true);
- MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
- recordSerializer.serialize(data, out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- public static class Factory<T> implements WindowBufferFactory<T, T, ReducingWindowBuffer<T>> {
- private static final long serialVersionUID = 1L;
-
- private final ReduceFunction<T> reduceFunction;
-
- private final TypeSerializer<T> serializer;
-
- public Factory(ReduceFunction<T> reduceFunction, TypeSerializer<T> serializer) {
- this.reduceFunction = reduceFunction;
- this.serializer = serializer;
- }
-
- @Override
- public ReducingWindowBuffer<T> create() {
- return new ReducingWindowBuffer<>(reduceFunction, serializer);
- }
-
- @Override
- public ReducingWindowBuffer<T> restoreFromSnapshot(DataInputView in) throws IOException {
- boolean hasValue = in.readBoolean();
- if (hasValue) {
- MultiplexingStreamRecordSerializer<T> recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
- StreamElement element = recordSerializer.deserialize(in);
- return new ReducingWindowBuffer<>(reduceFunction, element.<T>asRecord(), serializer);
- } else {
- return new ReducingWindowBuffer<>(reduceFunction, serializer);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
deleted file mode 100644
index 16be0f3..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-
-/**
- * A {@code WindowBuffer} is used by
- * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
- * the elements of one pane.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- * @param <O> The type of elements that this window buffer will return when asked for its contents.
- */
-@Internal
-public interface WindowBuffer<T, O> {
-
- /**
- * Adds the element to the buffer.
- *
- * @param element The element to add.
- */
- void storeElement(StreamRecord<T> element) throws Exception;
-
- /**
- * Returns all elements that are currently in the buffer.
- */
- Iterable<StreamRecord<O>> getElements();
-
- /**
- * Returns all elements that are currently in the buffer. This will unwrap the contained
- * elements from their {@link StreamRecord}.
- */
- Iterable<O> getUnpackedElements();
-
- /**
- * Returns the number of elements that are currently in the buffer.
- */
- int size();
-
- /**
- * Writes the contents of the window buffer to a {@link DataOutputView} for checkpointing.
- */
- void snapshot(DataOutputView out) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
deleted file mode 100644
index 1ca6350..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A factory for {@link WindowBuffer WindowBuffers}.
- *
- * @param <T> The type of elements that the created {@code WindowBuffer} can store.
- * @param <O> The type of elements that the created buffer will return when asked for its contents.
- * @param <B> The type of the created {@code WindowBuffer}
- */
-@Internal
-public interface WindowBufferFactory<T, O, B extends WindowBuffer<T, O>> extends Serializable {
-
- /**
- * Creates a new {@code WindowBuffer}.
- */
- B create();
-
- /**
- * Restores a {@code WindowBuffer} from a previous snapshot written using
- * {@link WindowBuffer#snapshot(DataOutputView)}.
- */
- B restoreFromSnapshot(DataInputView in) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
new file mode 100644
index 0000000..3a4be91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Adapter that lets an {@link AllWindowFunction} expecting an {@code Iterable} be used as an
+ * {@code InternalWindowFunction} whose window state is itself an {@code Iterable}.
+ *
+ * <p>The {@code Byte} key handed to {@code apply} is not used; the window, the state contents
+ * and the collector are forwarded to the wrapped function unchanged.
+ *
+ * @param <IN> The type of the elements contained in the window state.
+ * @param <OUT> The type of the elements emitted by the wrapped function.
+ * @param <W> The type of {@code Window} the wrapped function is applied to.
+ */
+public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
+        extends InternalWindowFunction<Iterable<IN>, OUT, Byte, W>
+        implements RichFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The user function that every call is delegated to. */
+    protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
+
+    /**
+     * Creates an adapter around the given function.
+     *
+     * @param wrappedFunction The function to delegate to.
+     */
+    public InternalIterableAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
+        this.wrappedFunction = wrappedFunction;
+    }
+
+    /** Invokes the wrapped function with the window and its contents; {@code key} is ignored. */
+    @Override
+    public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+        wrappedFunction.apply(window, input, out);
+    }
+
+    /** Forwards the open call to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void open(Configuration config) throws Exception {
+        FunctionUtils.openFunction(wrappedFunction, config);
+    }
+
+    /** Forwards the close call to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void close() throws Exception {
+        FunctionUtils.closeFunction(wrappedFunction);
+    }
+
+    /** Hands the runtime context to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void setRuntimeContext(RuntimeContext context) {
+        FunctionUtils.setFunctionRuntimeContext(wrappedFunction, context);
+    }
+
+    /** Not supported on this adapter; always throws. */
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        throw new RuntimeException("This should never be called.");
+    }
+
+    /** Not supported on this adapter; always throws. */
+    @Override
+    public IterationRuntimeContext getIterationRuntimeContext() {
+        throw new RuntimeException("This should never be called.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 7b441fb..822a57c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -30,7 +30,10 @@ import org.apache.flink.util.Collector;
* Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
* when the window state also is an {@code Iterable}.
*/
-public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
+public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window>
+ extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W>
+ implements RichFunction {
+
private static final long serialVersionUID = 1L;
protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/505512db/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
new file mode 100644
index 0000000..aa6e196
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping an {@link AllWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ *
+ * <p>The {@code Byte} key handed to {@code apply} is not used; the single state value is passed
+ * to the wrapped function as a one-element list.
+ *
+ * @param <IN> The type of the single window state value.
+ * @param <OUT> The type of the elements emitted by the wrapped function.
+ * @param <W> The type of {@code Window} the wrapped function is applied to.
+ */
+public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Window>
+        extends InternalWindowFunction<IN, OUT, Byte, W>
+        implements RichFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The user function that every call is delegated to. Assigned once in the constructor and
+     * therefore declared {@code final}, matching {@code InternalIterableAllWindowFunction}.
+     */
+    protected final AllWindowFunction<IN, OUT, W> wrappedFunction;
+
+    /**
+     * Creates an adapter around the given function.
+     *
+     * @param wrappedFunction The function to delegate to.
+     */
+    public InternalSingleValueAllWindowFunction(AllWindowFunction<IN, OUT, W> wrappedFunction) {
+        this.wrappedFunction = wrappedFunction;
+    }
+
+    /**
+     * Invokes the wrapped function with the window and a singleton list holding {@code input};
+     * {@code key} is ignored.
+     */
+    @Override
+    public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+        wrappedFunction.apply(window, Collections.singletonList(input), out);
+    }
+
+    /** Forwards the open call to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        FunctionUtils.openFunction(this.wrappedFunction, parameters);
+    }
+
+    /** Forwards the close call to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void close() throws Exception {
+        FunctionUtils.closeFunction(this.wrappedFunction);
+    }
+
+    /** Hands the runtime context to the wrapped function via {@link FunctionUtils}. */
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
+    }
+
+    /** Not supported on this adapter; always throws. */
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        throw new RuntimeException("This should never be called.");
+    }
+
+    /** Not supported on this adapter; always throws. */
+    @Override
+    public IterationRuntimeContext getIterationRuntimeContext() {
+        throw new RuntimeException("This should never be called.");
+    }
+}