You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:52 UTC

[7/7] flink git commit: [FLINK-5198] [logging] Improve TaskState toString

[FLINK-5198] [logging] Improve TaskState toString


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc7d8ec2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc7d8ec2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc7d8ec2

Branch: refs/heads/master
Commit: dc7d8ec2c4d03c42e3d582947a3fe39a274d7f4b
Parents: 67bd827
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:15:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java    |  8 ++++++--
 .../apache/flink/runtime/checkpoint/TaskState.java   | 15 +++++++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 638e0a7..2242c14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -653,9 +653,13 @@ public class CheckpointCoordinator {
 
 							if (LOG.isDebugEnabled()) {
 								StringBuilder builder = new StringBuilder();
-								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								builder.append("Checkpoint state: ");
+								for (TaskState state : completed.getTaskStates().values()) {
+									builder.append(state);
+									builder.append(", ");
 								}
+								// Remove last two chars ", "
+								builder.delete(builder.length() - 2, builder.length());
 
 								LOG.debug(builder.toString());
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/dc7d8ec2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 3cdc5e9..76f1c51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -130,7 +129,7 @@ public class TaskState implements StateObject {
 
 
 	@Override
-	public long getStateSize() throws IOException {
+	public long getStateSize() {
 		long result = 0L;
 
 		for (int i = 0; i < parallelism; i++) {
@@ -164,4 +163,16 @@ public class TaskState implements StateObject {
 	public Map<Integer, SubtaskState> getSubtaskStates() {
 		return Collections.unmodifiableMap(subtaskStates);
 	}
+
+	@Override
+	public String toString() {
+		// KvStates are always null in 1.1. Don't print this as it might
+		// confuse users that don't care about how we store it internally.
+		return "TaskState(" +
+			"jobVertexID: " + jobVertexID +
+			", parallelism: " + parallelism +
+			", sub task states: " + subtaskStates.size() +
+			", total size (bytes): " + getStateSize() +
+			')';
+	}
 }