You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 08:06:56 UTC

[5/7] flink git commit: [hotfix] Small improvements in logging for local recovery

[hotfix] Small improvements in logging for local recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89935997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89935997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89935997

Branch: refs/heads/master
Commit: 89935997de03b0f6db89d111a087b0f0f210695d
Parents: 2bc1eaa
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 16 17:09:28 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200

----------------------------------------------------------------------
 .../TaskExecutorLocalStateStoresManager.java    | 14 ++++++--------
 .../runtime/state/TaskLocalStateStoreImpl.java  | 20 ++++++++++++--------
 .../runtime/state/TaskStateManagerImpl.java     |  7 ++-----
 3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index cb3b680..6826fbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -158,15 +158,13 @@ public class TaskExecutorLocalStateStoresManager {
 
 				taskStateManagers.put(taskKey, taskLocalStateStore);
 
-				if (LOG.isTraceEnabled()) {
-					LOG.trace("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.",
-						localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
-				}
+
+				LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation " +
+						"id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
+
 			} else {
-				if (LOG.isTraceEnabled()) {
-					LOG.trace("Found existing local state store for {} - {} - {} under allocation id {}.",
-						jobId, jobVertexID, subtaskIndex, allocationID);
-				}
+				LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}",
+					jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore);
 			}
 
 			return taskLocalStateStore;

http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 9d105e6..df9147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -190,17 +190,20 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
 			snapshot = storedTaskStateByCheckpointID.get(checkpointID);
 		}
 
-		snapshot = (snapshot != NULL_DUMMY) ? snapshot : null;
-
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Found entry for local state for checkpoint {} in subtask ({} - {} - {}) : {}",
-				checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
-		} else if (LOG.isDebugEnabled()) {
-			LOG.debug("Found entry for local state for checkpoint {} in subtask ({} - {} - {})",
+		if (snapshot != null) {
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("Found registered local state for checkpoint {} in subtask ({} - {} - {}) : {}",
+					checkpointID, jobID, jobVertexID, subtaskIndex, snapshot);
+			} else if (LOG.isDebugEnabled()) {
+				LOG.debug("Found registered local state for checkpoint {} in subtask ({} - {} - {})",
+					checkpointID, jobID, jobVertexID, subtaskIndex);
+			}
+		} else {
+			LOG.debug("Did not find registered local state for checkpoint {} in subtask ({} - {} - {})",
 				checkpointID, jobID, jobVertexID, subtaskIndex);
 		}
 
-		return snapshot;
+		return (snapshot != NULL_DUMMY) ? snapshot : null;
 	}
 
 	@Override
@@ -357,6 +360,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
 			", allocationID=" + allocationID +
 			", subtaskIndex=" + subtaskIndex +
 			", localRecoveryConfig=" + localRecoveryConfig +
+			", storedCheckpointIDs=" + storedTaskStateByCheckpointID.keySet() +
 			'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/89935997/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index e542ba1..a0aeb3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -136,11 +136,8 @@ public class TaskStateManagerImpl implements TaskStateManager {
 			}
 		}
 
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
-					"state store {}.",
-				operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
-		}
+		LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local " +
+				"state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, localStateStore);
 
 		PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(
 			jobManagerSubtaskState,