You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/01/23 13:44:18 UTC

flink git commit: [FLINK-5450] Fix restore from legacy log message

Repository: flink
Updated Branches:
  refs/heads/release-1.2 085288e7e -> f4cd7d0ec


[FLINK-5450] Fix restore from legacy log message

This closes #3134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4cd7d0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4cd7d0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4cd7d0e

Branch: refs/heads/release-1.2
Commit: f4cd7d0ec3dd70e51e1db27c8336481838d50e63
Parents: 085288e
Author: kl0u <kk...@gmail.com>
Authored: Mon Jan 16 15:27:38 2017 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Jan 23 14:32:30 2017 +0100

----------------------------------------------------------------------
 .../operators/windowing/WindowOperator.java     | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4cd7d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 628d663..5ed5a4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -804,9 +804,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	public void registerRestoredLegacyStateState() throws Exception {
 
-		LOG.info("{} (taskIdx={}) re-registering state from an older Flink version.",
-			getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
-
 		switch (legacyWindowOperatorType) {
 			case NONE:
 				reregisterStateFromLegacyWindowOperator();
@@ -983,14 +980,22 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		// if we restore from an older version,
 		// we have to re-register the recovered state.
 
-		if (restoredFromLegacyEventTimeTimers != null) {
+		if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
+
+			LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
 			for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
 				setCurrentKey(timer.key);
 				internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
 			}
 		}
 
-		if (restoredFromLegacyProcessingTimeTimers != null) {
+		if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
+
+			LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
 			for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
 				setCurrentKey(timer.key);
 				internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
@@ -1003,16 +1008,20 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
-		if (restoredFromLegacyAlignedOpRecords != null) {
+		if (restoredFromLegacyAlignedOpRecords != null && !restoredFromLegacyAlignedOpRecords.isEmpty()) {
+
+			LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.",
+				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType);
+
 			while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
 				StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
 				setCurrentKey(keySelector.getKey(record.getValue()));
 				processElement(record);
 			}
-
-			// gc friendliness
-			restoredFromLegacyAlignedOpRecords = null;
 		}
+
+		// gc friendliness
+		restoredFromLegacyAlignedOpRecords = null;
 	}
 
 	// ------------------------------------------------------------------------