You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kl...@apache.org on 2022/11/08 06:53:42 UTC
[flink] branch master updated: [FLINK-29095][state] Improve logging in SharedStateRegistry
This is an automated email from the ASF dual-hosted git repository.
klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9a4250d248e [FLINK-29095][state] Improve logging in SharedStateRegistry
9a4250d248e is described below
commit 9a4250d248e93f3e87b211df98ce3d3c66aabca0
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Aug 30 18:04:32 2022 +0200
[FLINK-29095][state] Improve logging in SharedStateRegistry
---
.../runtime/state/SharedStateRegistryImpl.java | 122 ++++++++++++---------
1 file changed, 70 insertions(+), 52 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index 2e16c528997..3a710428509 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -79,12 +79,12 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
@Override
public StreamStateHandle registerReference(
- SharedStateRegistryKey registrationKey,
- StreamStateHandle state,
- long checkpointID,
- boolean preventDiscardingCreatedCheckpoint) {
+ final SharedStateRegistryKey registrationKey,
+ final StreamStateHandle newHandle,
+ final long checkpointID,
+ final boolean preventDiscardingCreatedCheckpoint) {
- checkNotNull(state);
+ checkNotNull(newHandle, "State handle should not be null.");
StreamStateHandle scheduledStateDeletion = null;
SharedStateEntry entry;
@@ -95,60 +95,78 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
entry = registeredStates.get(registrationKey);
if (entry == null) {
- // Additional check that should never fail, because only state handles that are not
- // placeholders should
- // ever be inserted to the registry.
checkState(
- !isPlaceholder(state),
+ !isPlaceholder(newHandle),
"Attempt to reference unknown state: " + registrationKey);
- entry = new SharedStateEntry(state, checkpointID);
+ LOG.trace(
+ "Registered new shared state {} under key {}.", newHandle, registrationKey);
+ entry = new SharedStateEntry(newHandle, checkpointID);
registeredStates.put(registrationKey, entry);
- LOG.trace("Registered new shared state {} under key {}.", entry, registrationKey);
- } else {
- // Delete if this is a real duplicate.
- // Note that task (backend) is not required to re-upload state
- // if the confirmation notification was missing.
- // However, it's also not required to use exactly the same handle or placeholder
- if (!Objects.equals(state, entry.stateHandle)) {
- if (entry.confirmed || isPlaceholder(state)) {
- scheduledStateDeletion = state;
- } else {
- // Old entry is not in a confirmed checkpoint yet, and the new one differs.
- // This might result from (omitted KG range here for simplicity):
- // 1. Flink recovers from a failure using a checkpoint 1
- // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
- // 3. JM triggers checkpoint 2
- // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
- // 5. TM crashes; everything is repeated from (2)
- // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
- // 7. JM triggers checkpoint 3
- // 8. TM sends NEW state "xyz-002.sst"
- // 9. JM discards it as duplicate
- // 10. checkpoint completes, but a wrong SST file is used
- // So we use a new entry and discard the old one:
- scheduledStateDeletion = entry.stateHandle;
- entry.stateHandle = state;
- }
- LOG.trace(
- "Identified duplicate state registration under key {}. New state {} was determined to "
- + "be an unnecessary copy of existing state {} and will be dropped.",
- registrationKey,
- state,
- entry.stateHandle);
- }
+ // no further handling
+ return entry.stateHandle;
+
+ } else if (entry.stateHandle == newHandle) {
+ // might be a bug but state backend is not required to use a place-holder
+ LOG.info(
+ "Duplicated registration under key {} with the same object: {}",
+ registrationKey,
+ newHandle);
+ } else if (Objects.equals(entry.stateHandle, newHandle)) {
+ // might be a bug but state backend is not required to use a place-holder
+ LOG.info(
+ "Duplicated registration under key {} with the new object: {}.",
+ registrationKey,
+ newHandle);
+ } else if (isPlaceholder(newHandle)) {
LOG.trace(
- "Updating last checkpoint for {} from {} to {}",
+ "Duplicated registration under key {} with a placeholder (normal case)",
+ registrationKey);
+ scheduledStateDeletion = newHandle;
+ } else if (entry.confirmed) {
+ LOG.info(
+ "Duplicated registration under key {} of a new state: {}. "
+ + "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
+ + "Discarding the new state and keeping the old one which is included into a completed checkpoint",
registrationKey,
- entry.lastUsedCheckpointID,
- checkpointID);
- entry.advanceLastUsingCheckpointID(checkpointID);
- if (preventDiscardingCreatedCheckpoint) {
- entry.preventDiscardingCreatedCheckpoint();
- }
+ newHandle);
+ scheduledStateDeletion = newHandle;
+ } else {
+ // Old entry is not in a confirmed checkpoint yet, and the new one differs.
+ // This might result from (omitted KG range here for simplicity):
+ // 1. Flink recovers from a failure using a checkpoint 1
+ // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
+ // 3. JM triggers checkpoint 2
+ // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
+ // 5. TM crashes; everything is repeated from (2)
+ // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
+ // 7. JM triggers checkpoint 3
+ // 8. TM sends NEW state "xyz-002.sst"
+ // 9. JM discards it as duplicate
+ // 10. checkpoint completes, but a wrong SST file is used
+ // So we use a new entry and discard the old one:
+ LOG.info(
+ "Duplicated registration under key {} of a new state: {}. "
+ + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
+ + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+ registrationKey,
+ newHandle);
+ scheduledStateDeletion = entry.stateHandle;
+ entry.stateHandle = newHandle;
}
- }
+
+ LOG.trace(
+ "Updating last checkpoint for {} from {} to {}",
+ registrationKey,
+ entry.lastUsedCheckpointID,
+ checkpointID);
+ entry.advanceLastUsingCheckpointID(checkpointID);
+
+ if (preventDiscardingCreatedCheckpoint) {
+ entry.preventDiscardingCreatedCheckpoint();
+ }
+ } // end of synchronized (registeredStates)
scheduleAsyncDelete(scheduledStateDeletion);
return entry.stateHandle;
@@ -246,7 +264,7 @@ public class SharedStateRegistryImpl implements SharedStateRegistry {
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
// We do the small optimization to not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
- LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
+ LOG.debug("Scheduled delete of state handle {}.", streamStateHandle);
AsyncDisposalRunnable asyncDisposalRunnable =
new AsyncDisposalRunnable(streamStateHandle);
try {