You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:23 UTC
[08/15] flink git commit: [FLINK-5295] Migrate the
AlignedWindowOperators to the WindowOperator.
[FLINK-5295] Migrate the AlignedWindowOperators to the WindowOperator.
This adds code that lets WindowOperator restore from the Flink 1.1
fast aligned processing-time windows operator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0819dc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0819dc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0819dc7
Branch: refs/heads/master
Commit: e0819dc7b72487670dd3ba06628980e27fdbedb0
Parents: b0e2a2c
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 17:59:13 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100
----------------------------------------------------------------------
.../datastream/LegacyWindowOperatorType.java | 63 +++++
.../api/datastream/WindowedStream.java | 155 +++++------
...ractAlignedProcessingTimeWindowOperator.java | 1 +
...ccumulatingProcessingTimeWindowOperator.java | 109 +-------
...AggregatingProcessingTimeWindowOperator.java | 1 +
.../operators/windowing/WindowOperator.java | 259 +++++++++++++++++--
.../windowing/TimeWindowTranslationTest.java | 1 +
.../windowing/WindowingTestHarnessTest.java | 3 +-
.../streaming/util/WindowingTestHarness.java | 14 +-
.../api/scala/WindowTranslationTest.scala | 10 +-
10 files changed, 380 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
new file mode 100644
index 0000000..bb6e4bc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+/**
+ * For specifying what type of window operator was used to create the state
+ * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
+ * is restoring from. This is used to signal that state written using an aligned processing-time
+ * window operator should be restored.
+ */
+public enum LegacyWindowOperatorType {
+
+ FAST_ACCUMULATING(true, false),
+
+ FAST_AGGREGATING(false, true),
+
+ NONE(false, false);
+
+ // ------------------------------------------------------------------------
+
+ private final boolean fastAccumulating;
+ private final boolean fastAggregating;
+
+ LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) {
+ this.fastAccumulating = fastAccumulating;
+ this.fastAggregating = fastAggregating;
+ }
+
+ public boolean isFastAccumulating() {
+ return fastAccumulating;
+ }
+
+ public boolean isFastAggregating() {
+ return fastAggregating;
+ }
+
+ @Override
+ public String toString() {
+ if (fastAccumulating) {
+ return "AccumulatingProcessingTimeWindowOperator";
+ } else if (fastAggregating) {
+ return "AggregatingProcessingTimeWindowOperator";
+ } else {
+ return "WindowOperator";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index ad7f371..98bf89a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -49,10 +49,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
@@ -190,16 +187,44 @@ public class WindowedStream<T, K, W extends Window> {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
+ LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
+ return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
+ }
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowedStream." + callLocation;
-
- SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+ return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE);
+ }
- return reduce(function, new PassThroughWindowFunction<K, W, T>());
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The window function.
+ * @param resultType Type information for the result type of the window function
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <R> SingleOutputStreamOperator<R> reduce(
+ ReduceFunction<T> reduceFunction,
+ WindowFunction<T, R, K, W> function,
+ TypeInformation<R> resultType) {
+ return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE);
}
/**
@@ -212,14 +237,20 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
+ * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+ * the type of the previous operator whose state we inherit.
* @return The data stream that is the result of applying the window function to the window.
*/
- public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
+ private <R> SingleOutputStreamOperator<R> reduce(
+ ReduceFunction<T> reduceFunction,
+ WindowFunction<T, R, K, W> function,
+ LegacyWindowOperatorType legacyWindowOpType) {
+
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, WindowFunction.class, true, true, inType, null, false);
- return reduce(reduceFunction, function, resultType);
+ return reduce(reduceFunction, function, resultType, legacyWindowOpType);
}
/**
@@ -232,10 +263,17 @@ public class WindowedStream<T, K, W extends Window> {
*
* @param reduceFunction The reduce function that is used for incremental aggregation.
* @param function The window function.
- * @param resultType Type information for the result type of the window function
+ * @param resultType Type information for the result type of the window function.
+ * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates
+ * the type of the previous operator whose state we inherit.
* @return The data stream that is the result of applying the window function to the window.
*/
- public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+ private <R> SingleOutputStreamOperator<R> reduce(
+ ReduceFunction<T> reduceFunction,
+ WindowFunction<T, R, K, W> function,
+ TypeInformation<R> resultType,
+ LegacyWindowOperatorType legacyWindowOpType) {
+
if (reduceFunction instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
}
@@ -288,7 +326,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ legacyWindowOpType);
}
return input.transform(opName, resultType, operator);
@@ -458,7 +497,7 @@ public class WindowedStream<T, K, W extends Window> {
* interpreted as a regular non-windowed stream.
*
* <p>
- * Not that this function requires that all data in the windows is buffered until the window
+ * Note that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
@@ -473,12 +512,7 @@ public class WindowedStream<T, K, W extends Window> {
String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;
- SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
+ LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
String opName;
KeySelector<T, K> keySel = input.getKeySelector();
@@ -519,7 +553,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ legacyWindowOpType);
}
return input.transform(opName, resultType, operator);
@@ -925,77 +960,21 @@ public class WindowedStream<T, K, W extends Window> {
// Utilities
// ------------------------------------------------------------------------
- private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
+ private LegacyWindowOperatorType getLegacyWindowType(Function function) {
if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSlide();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
+ return LegacyWindowOperatorType.FAST_AGGREGATING;
+ } else if (function instanceof WindowFunction) {
+ return LegacyWindowOperatorType.FAST_ACCUMULATING;
}
} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSize();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer,
- input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
+ return LegacyWindowOperatorType.FAST_AGGREGATING;
+ } else if (function instanceof WindowFunction) {
+ return LegacyWindowOperatorType.FAST_ACCUMULATING;
}
}
-
- return null;
+ return LegacyWindowOperatorType.NONE;
}
public StreamExecutionEnvironment getExecutionEnvironment() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 24fd0de..14500ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import static java.util.Objects.requireNonNull;
@Internal
+@Deprecated
public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function>
extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 9ea2949..90e4b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -22,16 +22,15 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import java.io.IOException;
import java.util.ArrayList;
@Internal
+@Deprecated
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
@@ -57,108 +56,4 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
-
- // ------------------------------------------------------------------------
- // Utility Serializer for Lists of Elements
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("ForLoopReplaceableByForEach")
- private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
-
- private static final long serialVersionUID = 1119562170939152304L;
-
- private final TypeSerializer<T> elementSerializer;
-
- ArrayListSerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = elementSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<ArrayList<T>> duplicate() {
- TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
- return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
- }
-
- @Override
- public ArrayList<T> createInstance() {
- return new ArrayList<>();
- }
-
- @Override
- public ArrayList<T> copy(ArrayList<T> from) {
- ArrayList<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
- }
- return newList;
- }
-
- @Override
- public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1; // var length
- }
-
- @Override
- public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
- final int size = list.size();
- target.writeInt(size);
- for (int i = 0; i < size; i++) {
- elementSerializer.serialize(list.get(i), target);
- }
- }
-
- @Override
- public ArrayList<T> deserialize(DataInputView source) throws IOException {
- final int size = source.readInt();
- final ArrayList<T> list = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- list.add(elementSerializer.deserialize(source));
- }
- return list;
- }
-
- @Override
- public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- // copy number of elements
- final int num = source.readInt();
- target.writeInt(num);
- for (int i = 0; i < num; i++) {
- elementSerializer.copy(source, target);
- }
- }
-
- // --------------------------------------------------------------------
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return true;
- }
-
- @Override
- public int hashCode() {
- return elementSerializer.hashCode();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 9d85cf0..2175fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
@Internal
+@Deprecated
public class AggregatingProcessingTimeWindowOperator<KEY, IN>
extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 1cfeba8..990162e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
@@ -35,10 +36,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -47,16 +51,21 @@ import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
import java.util.PriorityQueue;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -148,11 +157,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// State restored in case of migration from an older version (backwards compatibility)
// ------------------------------------------------------------------------
- /** The restored processing time timers. */
- protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+ /**
+ * A flag indicating if we are migrating from a regular {@link WindowOperator}
+ * or one of the deprecated {@link AccumulatingProcessingTimeWindowOperator} and
+ * {@link AggregatingProcessingTimeWindowOperator}.
+ */
+ private final LegacyWindowOperatorType legacyWindowOperatorType;
+
+ /**
+ * The elements restored when migrating from an older, deprecated
+ * {@link AccumulatingProcessingTimeWindowOperator} or
+ * {@link AggregatingProcessingTimeWindowOperator}. */
+ private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
+
+ /**
+ * The restored processing time timers when migrating from an
+ * older version of the {@link WindowOperator}.
+ */
+ private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
- /** The restored event time timers. */
- protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+ /** The restored event time timer when migrating from an
+ * older version of the {@link WindowOperator}.
+ */
+ private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
@@ -167,6 +194,24 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
Trigger<? super IN, ? super W> trigger,
long allowedLateness) {
+ this(windowAssigner, windowSerializer, keySelector, keySerializer,
+ windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
+ }
+
+ /**
+ * Creates a new {@code WindowOperator} based on the given policies and user functions.
+ */
+ public WindowOperator(
+ WindowAssigner<? super IN, W> windowAssigner,
+ TypeSerializer<W> windowSerializer,
+ KeySelector<IN, K> keySelector,
+ TypeSerializer<K> keySerializer,
+ StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
+ InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+ Trigger<? super IN, ? super W> trigger,
+ long allowedLateness,
+ LegacyWindowOperatorType legacyWindowOperatorType) {
+
super(windowFunction);
checkArgument(allowedLateness >= 0);
@@ -181,6 +226,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.windowStateDescriptor = windowStateDescriptor;
this.trigger = checkNotNull(trigger);
this.allowedLateness = allowedLateness;
+ this.legacyWindowOperatorType = legacyWindowOperatorType;
if (windowAssigner instanceof MergingWindowAssigner) {
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -211,26 +257,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
};
- // if we restore from an older version,
- // we have to re-register the timers.
-
- if (restoredFromLegacyEventTimeTimers != null) {
- for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
- setCurrentKey(timer.key);
- internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
- }
- }
-
- if (restoredFromLegacyProcessingTimeTimers != null) {
- for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
- setCurrentKey(timer.key);
- internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
- }
- }
-
- // gc friendliness
- this.restoredFromLegacyEventTimeTimers = null;
- this.restoredFromLegacyProcessingTimeTimers = null;
+ registerRestoredLegacyStateState();
}
@Override
@@ -745,17 +772,157 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// Restoring / Migrating from an older Flink version.
// ------------------------------------------------------------------------
+ private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
+
+ private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
+
@Override
public void restoreState(FSDataInputStream in) throws Exception {
super.restoreState(in);
- LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
- getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+ LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.",
+ getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
+
+ DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
- restoreTimers(new DataInputViewStreamWrapper(in));
+ switch (legacyWindowOperatorType) {
+ case NONE:
+ restoreFromLegacyWindowOperator(streamWrapper);
+ break;
+ case FAST_ACCUMULATING:
+ case FAST_AGGREGATING:
+ restoreFromLegacyAlignedWindowOperator(streamWrapper);
+ break;
+ }
}
- private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+ public void registerRestoredLegacyStateState() throws Exception {
+
+ LOG.info("{} (taskIdx={}) re-registering state from an older Flink version.",
+ getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
+
+ switch (legacyWindowOperatorType) {
+ case NONE:
+ reregisterStateFromLegacyWindowOperator();
+ break;
+ case FAST_ACCUMULATING:
+ case FAST_AGGREGATING:
+ reregisterStateFromLegacyAlignedWindowOperator();
+ break;
+ }
+ }
+
+ private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+ Preconditions.checkArgument(legacyWindowOperatorType != LegacyWindowOperatorType.NONE);
+
+ final long nextEvaluationTime = in.readLong();
+ final long nextSlideTime = in.readLong();
+
+ validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt());
+
+ restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42,
+ new Comparator<StreamRecord<IN>>() {
+ @Override
+ public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) {
+ return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+ }
+ }
+ );
+
+ switch (legacyWindowOperatorType) {
+ case FAST_ACCUMULATING:
+ restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime);
+ break;
+ case FAST_AGGREGATING:
+ restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime);
+ break;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.",
+ getClass().getSimpleName(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ restoredFromLegacyAlignedOpRecords.size(),
+ legacyWindowOperatorType);
+ }
+ }
+
+ private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+ int numPanes = in.readInt();
+ final long paneSize = getPaneSize();
+ long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+ @SuppressWarnings("unchecked")
+ ArrayListSerializer<IN> ser = new ArrayListSerializer<>((TypeSerializer<IN>) getStateDescriptor().getSerializer());
+
+ while (numPanes > 0) {
+ validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+ nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+ final int numElementsInPane = in.readInt();
+ for (int i = numElementsInPane - 1; i >= 0; i--) {
+ K key = keySerializer.deserialize(in);
+
+ @SuppressWarnings("unchecked")
+ List<IN> valueList = ser.deserialize(in);
+ for (IN record: valueList) {
+ restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, nextElementTimestamp));
+ }
+ }
+ numPanes--;
+ }
+ }
+
+ private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
+ int numPanes = in.readInt();
+ final long paneSize = getPaneSize();
+ long nextElementTimestamp = nextSlideTime - (numPanes * paneSize);
+
+ while (numPanes > 0) {
+ validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());
+
+ nextElementTimestamp += paneSize - 1; // the -1 is so that the elements fall into the correct time-frame
+
+ final int numElementsInPane = in.readInt();
+ for (int i = numElementsInPane - 1; i >= 0; i--) {
+ K key = keySerializer.deserialize(in);
+
+ @SuppressWarnings("unchecked")
+ IN value = (IN) getStateDescriptor().getSerializer().deserialize(in);
+ restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp));
+ }
+ numPanes--;
+ }
+ }
+
+ private long getPaneSize() {
+ Preconditions.checkArgument(
+ legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING ||
+ legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING);
+
+ final long paneSlide;
+ if (windowAssigner instanceof SlidingProcessingTimeWindows) {
+ SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+ paneSlide = ArithmeticUtils.gcd(timeWindows.getSize(), timeWindows.getSlide());
+ } else {
+ TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+ paneSlide = timeWindows.getSize(); // this is valid as windowLength == windowSlide == timeWindows.getSize
+ }
+ return paneSlide;
+ }
+
+ private static void validateMagicNumber(int expected, int found) throws IOException {
+ if (expected != found) {
+ throw new IOException("Corrupt state stream - wrong magic number. " +
+ "Expected '" + Integer.toHexString(expected) +
+ "', found '" + Integer.toHexString(found) + '\'');
+ }
+ }
+
+ private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException {
+ Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE);
+
int numWatermarkTimers = in.readInt();
this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
@@ -806,6 +973,42 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
}
+ public void reregisterStateFromLegacyWindowOperator() {
+ // if we restore from an older version,
+ // we have to re-register the recovered state.
+
+ if (restoredFromLegacyEventTimeTimers != null) {
+ for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+ setCurrentKey(timer.key);
+ internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+ }
+ }
+
+ if (restoredFromLegacyProcessingTimeTimers != null) {
+ for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+ setCurrentKey(timer.key);
+ internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+ }
+ }
+
+ // gc friendliness
+ restoredFromLegacyEventTimeTimers = null;
+ restoredFromLegacyProcessingTimeTimers = null;
+ }
+
+ public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
+ if (restoredFromLegacyAlignedOpRecords != null) {
+ while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
+ StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
+ setCurrentKey(keySelector.getKey(record.getValue()));
+ processElement(record);
+ }
+
+ // gc friendliness
+ restoredFromLegacyAlignedOpRecords = null;
+ }
+ }
+
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index c1ad0fc..5aa8151 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -55,6 +55,7 @@ public class TimeWindowTranslationTest {
* conditions are right.
*/
@Test
+ @Ignore
public void testFastTimeWindows() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
index 8e33c92..82c3d71 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.WindowingTestHarness;
import org.junit.Test;
@@ -171,7 +172,7 @@ public class WindowingTestHarnessTest {
testHarness.compareActualToExpectedOutput("Output was not correct.");
// do a snapshot, close and restore again
- StreamStateHandle snapshot = testHarness.snapshot(0L, 0L);
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
testHarness.close();
testHarness.restore(snapshot);
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index 25deb54..efb0d7e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -159,20 +159,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
/**
* Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
*/
- public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
- return testHarness.snapshotLegacy(checkpointId, timestamp);
+ public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
+ return testHarness.snapshot(checkpointId, timestamp);
}
/**
- * Resumes execution from a provided {@link StreamStateHandle}. This is used to test recovery after a failure.
+ * Resumes execution from the provided {@link OperatorStateHandles}. This is used to test recovery after a failure.
*/
- public void restore(StreamStateHandle stateHandle) throws Exception {
+ public void restore(OperatorStateHandles stateHandles) throws Exception {
Preconditions.checkArgument(!isOpen,
"You are trying to restore() while the operator is still open. " +
"Please call close() first.");
testHarness.setup();
- testHarness.restore(stateHandle);
+ testHarness.initializeState(stateHandles);
openOperator();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0819dc7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index c67c215..299932f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -24,17 +24,16 @@ import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDesc
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, SlidingProcessingTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, EvictingWindowOperator, WindowOperator}
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
-
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Ignore, Test}
class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
@@ -43,6 +42,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
* conditions are right.
*/
@Test
+ @Ignore
def testFastTimeWindows(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment