You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:22 UTC

[07/15] flink git commit: [FLINK-5294] Make WindowOperator backwards compatible with 1.1 snapshots

[FLINK-5294] Make WindowOperator backwards compatible with 1.1 snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0e2a2c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0e2a2c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0e2a2c1

Branch: refs/heads/master
Commit: b0e2a2c175d54346010ee4817d206bbeb4033ac9
Parents: f9b4f91
Author: kl0u <kk...@gmail.com>
Authored: Thu Dec 15 16:40:50 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../operators/windowing/WindowOperator.java     | 100 +++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0e2a2c1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index edcd833..1cfeba8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -52,8 +54,10 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.PriorityQueue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,6 +144,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected transient InternalTimerService<W> internalTimerService;
 
+	// ------------------------------------------------------------------------
+	// State restored in case of migration from an older version (backwards compatibility)
+	// ------------------------------------------------------------------------
+
+	/** The restored processing time timers. */
+	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
+
+	/** The restored event time timers. */
+	protected transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
+
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
@@ -196,6 +210,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				return internalTimerService.currentProcessingTime();
 			}
 		};
+
+		// if we restore from an older version,
+		// we have to re-register the timers.
+
+		if (restoredFromLegacyEventTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		if (restoredFromLegacyProcessingTimeTimers != null) {
+			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
+				setCurrentKey(timer.key);
+				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
+			}
+		}
+
+		// gc friendliness
+		this.restoredFromLegacyEventTimeTimers = null;
+		this.restoredFromLegacyProcessingTimeTimers = null;
 	}
 
 	@Override
@@ -707,6 +742,71 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Restoring / Migrating from an older Flink version.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void restoreState(FSDataInputStream in) throws Exception {
+		super.restoreState(in);
+
+		LOG.info("{} (taskIdx={}) restoring timers from an older Flink version.",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
+		restoreTimers(new DataInputViewStreamWrapper(in));
+	}
+
+	private void restoreTimers(DataInputViewStreamWrapper in) throws IOException {
+		int numWatermarkTimers = in.readInt();
+		this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+
+		for (int i = 0; i < numWatermarkTimers; i++) {
+			K key = keySerializer.deserialize(in);
+			W window = windowSerializer.deserialize(in);
+			long timestamp = in.readLong();
+
+			Timer<K, W> timer = new Timer<>(timestamp, key, window);
+			restoredFromLegacyEventTimeTimers.add(timer);
+		}
+
+		int numProcessingTimeTimers = in.readInt();
+		this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
+
+		for (int i = 0; i < numProcessingTimeTimers; i++) {
+			K key = keySerializer.deserialize(in);
+			W window = windowSerializer.deserialize(in);
+			long timestamp = in.readLong();
+
+			Timer<K, W> timer = new Timer<>(timestamp, key, window);
+			restoredFromLegacyProcessingTimeTimers.add(timer);
+		}
+
+		// just to read all the rest, although we do not really use this information.
+		int numProcessingTimeTimerTimestamp = in.readInt();
+		for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) {
+			in.readLong();
+			in.readInt();
+		}
+
+		if (LOG.isDebugEnabled()) {
+			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
+				LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}",
+					getClass().getSimpleName(), subtaskIdx,
+					restoredFromLegacyEventTimeTimers.size(),
+					restoredFromLegacyEventTimeTimers);
+			}
+
+			if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
+				LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}",
+					getClass().getSimpleName(), subtaskIdx,
+					restoredFromLegacyProcessingTimeTimers.size(),
+					restoredFromLegacyProcessingTimeTimers);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------