You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/01/23 13:44:18 UTC
flink git commit: [FLINK-5450] Fix restore from legacy log message
Repository: flink
Updated Branches:
refs/heads/release-1.2 085288e7e -> f4cd7d0ec
[FLINK-5450] Fix restore from legacy log message
This closes #3134
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4cd7d0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4cd7d0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4cd7d0e
Branch: refs/heads/release-1.2
Commit: f4cd7d0ec3dd70e51e1db27c8336481838d50e63
Parents: 085288e
Author: kl0u <kk...@gmail.com>
Authored: Mon Jan 16 15:27:38 2017 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Jan 23 14:32:30 2017 +0100
----------------------------------------------------------------------
.../operators/windowing/WindowOperator.java | 27 +++++++++++++-------
1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f4cd7d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 628d663..5ed5a4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -804,9 +804,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void registerRestoredLegacyStateState() throws Exception {
- LOG.info("{} (taskIdx={}) re-registering state from an older Flink version.",
- getClass().getSimpleName(), legacyWindowOperatorType, getRuntimeContext().getIndexOfThisSubtask());
-
switch (legacyWindowOperatorType) {
case NONE:
reregisterStateFromLegacyWindowOperator();
@@ -983,14 +980,22 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// if we restore from an older version,
// we have to re-register the recovered state.
- if (restoredFromLegacyEventTimeTimers != null) {
+ if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
+
+ LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
for (Timer<K, W> timer : restoredFromLegacyEventTimeTimers) {
setCurrentKey(timer.key);
internalTimerService.registerEventTimeTimer(timer.window, timer.timestamp);
}
}
- if (restoredFromLegacyProcessingTimeTimers != null) {
+ if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
+
+ LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+
for (Timer<K, W> timer : restoredFromLegacyProcessingTimeTimers) {
setCurrentKey(timer.key);
internalTimerService.registerProcessingTimeTimer(timer.window, timer.timestamp);
@@ -1003,16 +1008,20 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
- if (restoredFromLegacyAlignedOpRecords != null) {
+ if (restoredFromLegacyAlignedOpRecords != null && !restoredFromLegacyAlignedOpRecords.isEmpty()) {
+
+ LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType);
+
while (!restoredFromLegacyAlignedOpRecords.isEmpty()) {
StreamRecord<IN> record = restoredFromLegacyAlignedOpRecords.poll();
setCurrentKey(keySelector.getKey(record.getValue()));
processElement(record);
}
-
- // gc friendliness
- restoredFromLegacyAlignedOpRecords = null;
}
+
+ // gc friendliness
+ restoredFromLegacyAlignedOpRecords = null;
}
// ------------------------------------------------------------------------