You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:47 UTC

[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore

[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f91dd9fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f91dd9fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f91dd9fb

Branch: refs/heads/master
Commit: f91dd9fbaf392bc2968e974dddba4cda2a4f3be2
Parents: dc7d8ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/SubmittedJobGraph.java   |  2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        | 86 +++++++++++++-------
 2 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
 
 	@Override
 	public String toString() {
-		return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+		return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index b241712..c1dc656 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	 */
 	private final PathChildrenCache pathCache;
 
+	/** The full configured base path including the namespace. */
+	private final String zooKeeperFullBasePath;
+
 	/** The external listener to be notified on races. */
 	private SubmittedJobGraphListener jobGraphListener;
 
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		// All operations will have the path as root
 		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
 
+		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
 		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
 
 		this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
+			LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
 			List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
 
 			while (true) {
@@ -168,6 +173,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 
+			LOG.info("Found {} job graphs.", submitted.size());
+
 			if (submitted.size() != 0) {
 				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
 
@@ -193,6 +200,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			verifyIsRunning();
 
@@ -215,6 +224,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobGraph, "Job graph");
 		String path = getPathForJob(jobGraph.getJobId());
 
+		LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
 		boolean success = false;
 
 		while (!success) {
@@ -229,8 +240,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 
 						addedJobGraphs.add(jobGraph.getJobId());
 
-						LOG.info("Added {} to ZooKeeper.", jobGraph);
-
 						success = true;
 					}
 					catch (KeeperException.NodeExistsException ignored) {
@@ -252,6 +261,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				}
 			}
 		}
+
+		LOG.info("Added {} to ZooKeeper.", jobGraph);
 	}
 
 	@Override
@@ -259,14 +270,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		checkNotNull(jobId, "Job ID");
 		String path = getPathForJob(jobId);
 
+		LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
 		synchronized (cacheLock) {
 			if (addedJobGraphs.contains(jobId)) {
 				jobGraphsInZooKeeper.removeAndDiscardState(path);
 
 				addedJobGraphs.remove(jobId);
-				LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 			}
 		}
+
+		LOG.info("Removed job graph {} from ZooKeeper.", jobId);
 	}
 
 	/**
@@ -291,70 +305,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 			}
 
 			switch (event.getType()) {
-				case CHILD_ADDED:
+				case CHILD_ADDED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
 								try {
 									// Whoa! This has been added by someone else. Or we were fast
 									// to remove it (false positive).
 									jobGraphListener.onAddedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
+				}
+				break;
 
-					break;
-
-				case CHILD_UPDATED:
+				case CHILD_UPDATED: {
 					// Nothing to do
-					break;
+				}
+				break;
+
+				case CHILD_REMOVED: {
+					JobID jobId = fromEvent(event);
+
+					LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
 
-				case CHILD_REMOVED:
 					synchronized (cacheLock) {
 						try {
-							JobID jobId = fromEvent(event);
 							if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
 								try {
 									// Oh oh. Someone else removed one of our job graphs. Mean!
 									jobGraphListener.onRemovedJobGraph(jobId);
-								}
-								catch (Throwable t) {
+								} catch (Throwable t) {
 									LOG.error("Error in callback", t);
 								}
 							}
 
 							break;
-						}
-						catch (Exception e) {
+						} catch (Exception e) {
 							LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
 						}
 					}
-					break;
+				}
+				break;
 
-				case CONNECTION_SUSPENDED:
+				case CONNECTION_SUSPENDED: {
 					LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
-							"graphs are not monitored (temporarily).");
-					break;
-				case CONNECTION_LOST:
+						"graphs are not monitored (temporarily).");
+				}
+				break;
+
+				case CONNECTION_LOST: {
 					LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
-							"graphs are not monitored (permanently).");
-					break;
+						"graphs are not monitored (permanently).");
+				}
+				break;
 
-				case CONNECTION_RECONNECTED:
+				case CONNECTION_RECONNECTED: {
 					LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
-							"graphs are monitored again.");
-					break;
-				case INITIALIZED:
+						"graphs are monitored again.");
+				}
+				break;
+
+				case INITIALIZED: {
 					LOG.info("SubmittedJobGraphsPathCacheListener initialized");
-					break;
+				}
+				break;
 			}
 		}