Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/01 10:15:58 UTC

[GitHub] [flink] fredia commented on a diff in pull request #21136: [FLINK-29095][state] Improve logging in SharedStateRegistry

fredia commented on code in PR #21136:
URL: https://github.com/apache/flink/pull/21136#discussion_r1010281489


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
             entry = registeredStates.get(registrationKey);
 
             if (entry == null) {
-                // Additional check that should never fail, because only state handles that are not
-                // placeholders should
-                // ever be inserted to the registry.
                 checkState(
-                        !isPlaceholder(state),
+                        !isPlaceholder(newHandle),
                         "Attempt to reference unknown state: " + registrationKey);
 
-                entry = new SharedStateEntry(state, checkpointID);
+                LOG.trace(
+                        "Registered new shared state {} under key {}.", newHandle, registrationKey);
+                entry = new SharedStateEntry(newHandle, checkpointID);
                 registeredStates.put(registrationKey, entry);
-                LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
 
-            } else {
-                // Delete if this is a real duplicate.
-                // Note that task (backend) is not required to re-upload state
-                // if the confirmation notification was missing.
-                // However, it's also not required to use exactly the same handle or placeholder
-                if (!Objects.equals(state, entry.stateHandle)) {
-                    if (entry.confirmed || isPlaceholder(state)) {
-                        scheduledStateDeletion = state;
-                    } else {
-                        // Old entry is not in a confirmed checkpoint yet, and the new one differs.
-                        // This might result from (omitted KG range here for simplicity):
-                        // 1. Flink recovers from a failure using a checkpoint 1
-                        // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-                        // 3. JM triggers checkpoint 2
-                        // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-                        // 5. TM crashes; everything is repeated from (2)
-                        // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-                        // 7. JM triggers checkpoint 3
-                        // 8. TM sends NEW state "xyz-002.sst"
-                        // 9. JM discards it as duplicate
-                        // 10. checkpoint completes, but a wrong SST file is used
-                        // So we use a new entry and discard the old one:
-                        scheduledStateDeletion = entry.stateHandle;
-                        entry.stateHandle = state;
-                    }
-                    LOG.trace(
-                            "Identified duplicate state registration under key {}. New state {} was determined to "
-                                    + "be an unnecessary copy of existing state {} and will be dropped.",
-                            registrationKey,
-                            state,
-                            entry.stateHandle);
-                }
+                // no further handling
+                return entry.stateHandle;
+
+            } else if (entry.stateHandle == newHandle) {
+                // might be a bug but state backend is not required to use a place-holder

Review Comment:
   > could you please share in which case we may enter this branch?
   
   I think this branch is just a safeguard reminding us not to register the same handle multiple times. If we **accidentally** register a handle more than once, the log printed here helps with the investigation.
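To illustrate the branch under discussion, here is a minimal sketch assuming a simplified registry keyed by strings. `Handle`, `SketchRegistry`, and the duplicate counter are hypothetical, and `java.util.logging` stands in for Flink's SLF4J logger. The sketch shows the reference-equality check (the analogue of `entry.stateHandle == newHandle`) logging at info level and returning the existing handle without registering anything further.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical stand-in for a shared state handle; not Flink's StreamStateHandle. */
final class Handle {
    final String path;

    Handle(String path) {
        this.path = path;
    }

    @Override
    public String toString() {
        return "Handle(" + path + ")";
    }
}

/** Hypothetical, simplified registry illustrating only the "same handle again" branch. */
final class SketchRegistry {
    private static final Logger LOG = Logger.getLogger(SketchRegistry.class.getName());
    private final Map<String, Handle> registered = new HashMap<>();
    /** Counts re-registrations of the identical handle instance, for illustration only. */
    int duplicateIdenticalRegistrations = 0;

    Handle register(String key, Handle newHandle) {
        Handle existing = registered.get(key);
        if (existing == null) {
            // First registration under this key: store the handle.
            registered.put(key, newHandle);
            return newHandle;
        } else if (existing == newHandle) {
            // Reference equality: the very same object was registered again.
            // Not expected, so log at info level for investigation and stop here.
            duplicateIdenticalRegistrations++;
            LOG.info("Same handle " + newHandle + " registered again under key " + key);
            return existing;
        }
        // A different handle under the same key is not modeled in this sketch;
        // it simply keeps the existing handle.
        return existing;
    }
}
```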
   
   > If we think this is a bug, do we need to continue the registry here
   
   With the current implementation, we don't need to continue the registration here.
   And [IncrementalRemoteKeyedStateHandle](https://github.com/apache/flink/blob/8e16cc8e424e352c5b45b46f1520ecf0edec70be/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java#L313) already enforces that it cannot be registered more than once.
   I think this restriction should be enforced by the xxxStateHandle itself rather than here.
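A minimal sketch of enforcing the limit in the handle rather than in the registry, assuming the handle remembers which registry instance it last registered with. This is similar in spirit to the check linked above, but the actual Flink code may differ; `OnceRegistry` and `RegisterOnceHandle` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal registry that just records registered keys. */
final class OnceRegistry {
    final Set<String> keys = new HashSet<>();

    void register(String key) {
        keys.add(key);
    }
}

/** Hypothetical handle that refuses to register twice with the same registry instance. */
final class RegisterOnceHandle {
    private final String key;
    // Registry instance this handle last registered with (null if never registered).
    private OnceRegistry registeredWith;

    RegisterOnceHandle(String key) {
        this.key = key;
    }

    void registerSharedStates(OnceRegistry registry) {
        // The guard lives in the handle, so the registry needs no special duplicate branch.
        if (registeredWith == registry) {
            throw new IllegalStateException(
                    "Handle " + key + " is already registered with this registry.");
        }
        registeredWith = registry;
        registry.register(key);
    }
}
```

In this sketch, registering the same handle with a new registry instance (for example, a fresh registry created after a restore) is still allowed; only a repeated registration against the same instance fails.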
   
   > Does this need to be info level so that we can easily know this?
   
   Nice suggestion, changed to info level.



-- 
This is an automated message from the Apache Git Service.