You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:22 UTC
[07/15] flink git commit: [FLINK-5294] Make WindowOperator backwards
compatible with 1.1 snapshots
[FLINK-5294] Make WindowOperator backwards compatible with 1.1 snapshots
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0e2a2c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0e2a2c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0e2a2c1
Branch: refs/heads/master
Commit: b0e2a2c175d54346010ee4817d206bbeb4033ac9
Parents: f9b4f91
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:40:50 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100
----------------------------------------------------------------------
.../operators/windowing/WindowOperator.java | 100 +++++++++++++++++++
1 file changed, 100 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0e2a2c1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index edcd833..1cfeba8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -52,8 +54,10 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
+import java.util.PriorityQueue;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +144,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected transient InternalTimerService<W> internalTimerService;
+ // ------------------------------------------------------------------------
+ // State restored in case of migration from an older version (backwards compatibility)
+ // ------------------------------------------------------------------------
+
+ /** The restored processing time timers. */
+ protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+
+ /** The restored event time timers. */
+ protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
*/
@@ -196,6 +210,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
return internalTimerService.currentProcessingTime();
}
};
+
+ // if we restore from an older version,
+ // we have to re-register the timers.
+
+ if (restoredFromLegacyEventTimeTimers != null) {
+ for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+ setCurrentKey(timer.key);
+ internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+ }
+ }
+
+ if (restoredFromLegacyProcessingTimeTimers != null) {
+ for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+ setCurrentKey(timer.key);
+ internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+ }
+ }
+
+ // gc friendliness
+ this.restoredFromLegacyEventTimeTimers = null;
+ this.restoredFromLegacyProcessingTimeTimers = null;
}
@Override
@@ -707,6 +742,71 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
// ------------------------------------------------------------------------
+ // Restoring / Migrating from an older Flink version.
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void restoreState(FSDataInputStream in) throws Exception {
+ super.restoreState(in);
+
+ LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+ restoreTimers(new DataInputViewStreamWrapper(in));
+ }
+
+ private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+ int numWatermarkTimers = in.readInt();
+ this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+ for (int i = 0; i < numWatermarkTimers; i++) {
+ K key = keySerializer.deserialize(in);
+ W window = windowSerializer.deserialize(in);
+ long timestamp = in.readLong();
+
+ Timer<K, W> timer = new Timer<>(timestamp, key, window);
+ restoredFromLegacyEventTimeTimers.add(timer);
+ }
+
+ int numProcessingTimeTimers = in.readInt();
+ this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
+
+ for (int i = 0; i < numProcessingTimeTimers; i++) {
+ K key = keySerializer.deserialize(in);
+ W window = windowSerializer.deserialize(in);
+ long timestamp = in.readLong();
+
+ Timer<K, W> timer = new Timer<>(timestamp, key, window);
+ restoredFromLegacyProcessingTimeTimers.add(timer);
+ }
+
+ // just to read all the rest, although we do not really use this information.
+ int numProcessingTimeTimerTimestamp = in.readInt();
+ for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) {
+ in.readLong();
+ in.readInt();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
+ LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}",
+ getClass().getSimpleName(), subtaskIdx,
+ restoredFromLegacyEventTimeTimers.size(),
+ restoredFromLegacyEventTimeTimers);
+ }
+
+ if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
+ LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}",
+ getClass().getSimpleName(), subtaskIdx,
+ restoredFromLegacyProcessingTimeTimers.size(),
+ restoredFromLegacyProcessingTimeTimers);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------