You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:56 UTC
[5/7] flink git commit: [hotfix] Small improvements in logging for
local recovery
[hotfix] Small improvements in logging for local recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89935997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89935997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89935997
Branch: refs/heads/master
Commit: 89935997de03b0f6db89d111a087b0f0f210695d
Parents: 2bc1eaa
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 16 17:09:28 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200
----------------------------------------------------------------------
.../TaskExecutorLocalStateStoresManager.java | 14 ++++++--------
.../runtime/state/TaskLocalStateStoreImpl.java | 20 ++++++++++++--------
.../runtime/state/TaskStateManagerImpl.java | 7 ++-----
3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index cb3b680..6826fbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -158,15 +158,13 @@ public class TaskExecutorLocalStateStoresManager {
taskStateManagers.put(taskKey, taskLocalStateStore);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.",
- localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
- }
+
+ LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation " +
+ "id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
+
} else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.",
- jobId, jobVertexID, subtaskIndex, allocationID);
- }
+ LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}",
+ jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore);
}
return taskLocalStateStore;
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 9d105e6..df9147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -190,17 +190,20 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
snapshot = storedTaskStateByCheckpointID.get(checkpointID);
}
- snapshot = (snapshot != NULL_DUMMY) ? snapshot : null;
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}",
- checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})",
+ if (snapshot != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}",
+ checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Found registered local state for checkpoint {} in subtask ({} - {} - {})",
+ checkpointID, jobID, jobVertexID, subtaskIndex);
+ }
+ } else {
+ LOG.debug("Did not find registered local state for checkpoint {} in subtask ({} - {} - {})",
checkpointID, jobID, jobVertexID, subtaskIndex);
}
- return snapshot;
+ return (snapshot != NULL_DUMMY) ? snapshot : null;
}
@Override
@@ -357,6 +360,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
", allocationID=" + allocationID +
", subtaskIndex=" + subtaskIndex +
", localRecoveryConfig=" + localRecoveryConfig +
+ ", storedCheckpointIDs=" + storedTaskStateByCheckpointID.keySet() +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index e542ba1..a0aeb3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -136,11 +136,8 @@ public class TaskStateManagerImpl implements TaskStateManager {
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
- "state store {}.",
- operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
- }
+ LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
+ "state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
jobManagerSubtaskState,