You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kl...@apache.org on 2022/11/08 06:53:42 UTC

[flink] branch master updated: [FLINK-29095][state] Improve logging in SharedStateRegistry

This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a4250d248e [FLINK-29095][state] Improve logging in SharedStateRegistry
9a4250d248e is described below

commit 9a4250d248e93f3e87b211df98ce3d3c66aabca0
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Aug 30 18:04:32 2022 +0200

    [FLINK-29095][state] Improve logging in SharedStateRegistry
---
 .../runtime/state/SharedStateRegistryImpl.java     | 122 ++++++++++++---------
 1 file changed, 70 insertions(+), 52 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 2e16c528997..3a710428509 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -79,12 +79,12 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
 
     @Override
     public StreamStateHandle registerReference(
-            SharedStateRegistryKey registrationKey,
-            StreamStateHandle state,
-            long checkpointID,
-            boolean preventDiscardingCreatedCheckpoint) {
+            final SharedStateRegistryKey registrationKey,
+            final StreamStateHandle newHandle,
+            final long checkpointID,
+            final boolean preventDiscardingCreatedCheckpoint) {
 
-        checkNotNull(state);
+        checkNotNull(newHandle, "State handle should not be null.");
 
         StreamStateHandle scheduledStateDeletion = null;
         SharedStateEntry entry;
@@ -95,60 +95,78 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
             entry = registeredStates.get(registrationKey);
 
             if (entry == null) {
-                // Additional check that should never fail, because only state handles that are not
-                // placeholders should
-                // ever be inserted to the registry.
                 checkState(
-                        !isPlaceholder(state),
+                        !isPlaceholder(newHandle),
                         "Attempt to reference unknown state: " + registrationKey);
 
-                entry = new SharedStateEntry(state, checkpointID);
+                LOG.trace(
+                        "Registered new shared state {} under key {}.", newHandle, registrationKey);
+                entry = new SharedStateEntry(newHandle, checkpointID);
                 registeredStates.put(registrationKey, entry);
-                LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
 
-            } else {
-                // Delete if this is a real duplicate.
-                // Note that task (backend) is not required to re-upload state
-                // if the confirmation notification was missing.
-                // However, it's also not required to use exactly the same handle or placeholder
-                if (!Objects.equals(state, entry.stateHandle)) {
-                    if (entry.confirmed || isPlaceholder(state)) {
-                        scheduledStateDeletion = state;
-                    } else {
-                        // Old entry is not in a confirmed checkpoint yet, and the new one differs.
-                        // This might result from (omitted KG range here for simplicity):
-                        // 1. Flink recovers from a failure using a checkpoint 1
-                        // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-                        // 3. JM triggers checkpoint 2
-                        // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-                        // 5. TM crashes; everything is repeated from (2)
-                        // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-                        // 7. JM triggers checkpoint 3
-                        // 8. TM sends NEW state "xyz-002.sst"
-                        // 9. JM discards it as duplicate
-                        // 10. checkpoint completes, but a wrong SST file is used
-                        // So we use a new entry and discard the old one:
-                        scheduledStateDeletion = entry.stateHandle;
-                        entry.stateHandle = state;
-                    }
-                    LOG.trace(
-                            "Identified duplicate state registration under key {}. New state {} was determined to "
-                                    + "be an unnecessary copy of existing state {} and will be dropped.",
-                            registrationKey,
-                            state,
-                            entry.stateHandle);
-                }
+                // no further handling
+                return entry.stateHandle;
+
+            } else if (entry.stateHandle == newHandle) {
+                // might be a bug but state backend is not required to use a place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the same object: {}",
+                        registrationKey,
+                        newHandle);
+            } else if (Objects.equals(entry.stateHandle, newHandle)) {
+                // might be a bug but state backend is not required to use a place-holder
+                LOG.info(
+                        "Duplicated registration under key {} with the new object: {}.",
+                        registrationKey,
+                        newHandle);
+            } else if (isPlaceholder(newHandle)) {
                 LOG.trace(
-                        "Updating last checkpoint for {} from {} to {}",
+                        "Duplicated registration under key {} with a placeholder (normal case)",
+                        registrationKey);
+                scheduledStateDeletion = newHandle;
+            } else if (entry.confirmed) {
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: {}. "
+                                + "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
+                                + "Discarding the new state and keeping the old one which is included into a completed checkpoint",
                         registrationKey,
-                        entry.lastUsedCheckpointID,
-                        checkpointID);
-                entry.advanceLastUsingCheckpointID(checkpointID);
-                if (preventDiscardingCreatedCheckpoint) {
-                    entry.preventDiscardingCreatedCheckpoint();
-                }
+                        newHandle);
+                scheduledStateDeletion = newHandle;
+            } else {
+                // Old entry is not in a confirmed checkpoint yet, and the new one differs.
+                // This might result from (omitted KG range here for simplicity):
+                // 1. Flink recovers from a failure using a checkpoint 1
+                // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
+                // 3. JM triggers checkpoint 2
+                // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
+                // 5. TM crashes; everything is repeated from (2)
+                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
+                // 7. JM triggers checkpoint 3
+                // 8. TM sends NEW state "xyz-002.sst"
+                // 9. JM discards it as duplicate
+                // 10. checkpoint completes, but a wrong SST file is used
+                // So we use a new entry and discard the old one:
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: {}. "
+                                + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
+                                + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+                        registrationKey,
+                        newHandle);
+                scheduledStateDeletion = entry.stateHandle;
+                entry.stateHandle = newHandle;
             }
-        }
+
+            LOG.trace(
+                    "Updating last checkpoint for {} from {} to {}",
+                    registrationKey,
+                    entry.lastUsedCheckpointID,
+                    checkpointID);
+            entry.advanceLastUsingCheckpointID(checkpointID);
+
+            if (preventDiscardingCreatedCheckpoint) {
+                entry.preventDiscardingCreatedCheckpoint();
+            }
+        } // end of synchronized (registeredStates)
 
         scheduleAsyncDelete(scheduledStateDeletion);
         return entry.stateHandle;
@@ -246,7 +264,7 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
     private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
         // We do the small optimization to not issue discards for placeholders, which are NOPs.
         if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-            LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
+            LOG.debug("Scheduled delete of state handle {}.", streamStateHandle);
             AsyncDisposalRunnable asyncDisposalRunnable =
                     new AsyncDisposalRunnable(streamStateHandle);
             try {