You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:47 UTC
[2/7] flink git commit: [FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
[FLINK-5199] [logging] Improve logging in ZooKeeperSubmittedJobGraphStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f91dd9fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f91dd9fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f91dd9fb
Branch: refs/heads/master
Commit: f91dd9fbaf392bc2968e974dddba4cda2a4f3be2
Parents: dc7d8ec
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:35:14 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../runtime/jobmanager/SubmittedJobGraph.java | 2 +-
.../ZooKeeperSubmittedJobGraphStore.java | 86 +++++++++++++-------
2 files changed, 56 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
index faacc93..e868da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java
@@ -72,6 +72,6 @@ public class SubmittedJobGraph implements Serializable {
@Override
public String toString() {
- return String.format("SubmittedJobGraph(%s, %s)", jobGraph, jobInfo);
+ return String.format("SubmittedJobGraph(%s, %s)", jobGraph.getJobID(), jobInfo);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f91dd9fb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index b241712..c1dc656 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -82,6 +82,9 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
*/
private final PathChildrenCache pathCache;
+ /** The full configured base path including the namespace. */
+ private final String zooKeeperFullBasePath;
+
/** The external listener to be notified on races. */
private SubmittedJobGraphListener jobGraphListener;
@@ -117,6 +120,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
// All operations will have the path as root
CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+ this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
this.pathCache = new PathChildrenCache(facade, "/", false);
@@ -156,6 +160,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
verifyIsRunning();
+ LOG.debug("Recovering all job graphs from ZooKeeper at {}.", zooKeeperFullBasePath);
List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
while (true) {
@@ -168,6 +173,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
+ LOG.info("Found {} job graphs.", submitted.size());
+
if (submitted.size() != 0) {
List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
@@ -193,6 +200,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
verifyIsRunning();
@@ -215,6 +224,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobGraph, "Job graph");
String path = getPathForJob(jobGraph.getJobId());
+ LOG.debug("Adding job graph {} to {}{}.", jobGraph.getJobId(), zooKeeperFullBasePath, path);
+
boolean success = false;
while (!success) {
@@ -229,8 +240,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
addedJobGraphs.add(jobGraph.getJobId());
- LOG.info("Added {} to ZooKeeper.", jobGraph);
-
success = true;
}
catch (KeeperException.NodeExistsException ignored) {
@@ -252,6 +261,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
}
+
+ LOG.info("Added {} to ZooKeeper.", jobGraph);
}
@Override
@@ -259,14 +270,17 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
checkNotNull(jobId, "Job ID");
String path = getPathForJob(jobId);
+ LOG.debug("Removing job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
synchronized (cacheLock) {
if (addedJobGraphs.contains(jobId)) {
jobGraphsInZooKeeper.removeAndDiscardState(path);
addedJobGraphs.remove(jobId);
- LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
}
+
+ LOG.info("Removed job graph {} from ZooKeeper.", jobId);
}
/**
@@ -291,70 +305,80 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
switch (event.getType()) {
- case CHILD_ADDED:
+ case CHILD_ADDED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_ADDED event notification for job {}", jobId);
+
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && !addedJobGraphs.contains(jobId)) {
try {
// Whoa! This has been added by someone else. Or we were fast
// to remove it (false positive).
jobGraphListener.onAddedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
+ }
+ break;
- break;
-
- case CHILD_UPDATED:
+ case CHILD_UPDATED: {
// Nothing to do
- break;
+ }
+ break;
+
+ case CHILD_REMOVED: {
+ JobID jobId = fromEvent(event);
+
+ LOG.debug("Received CHILD_REMOVED event notification for job {}", jobId);
- case CHILD_REMOVED:
synchronized (cacheLock) {
try {
- JobID jobId = fromEvent(event);
if (jobGraphListener != null && addedJobGraphs.contains(jobId)) {
try {
// Oh oh. Someone else removed one of our job graphs. Mean!
jobGraphListener.onRemovedJobGraph(jobId);
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
LOG.error("Error in callback", t);
}
}
break;
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
}
}
- break;
+ }
+ break;
- case CONNECTION_SUSPENDED:
+ case CONNECTION_SUSPENDED: {
LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job " +
- "graphs are not monitored (temporarily).");
- break;
- case CONNECTION_LOST:
+ "graphs are not monitored (temporarily).");
+ }
+ break;
+
+ case CONNECTION_LOST: {
LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
- "graphs are not monitored (permanently).");
- break;
+ "graphs are not monitored (permanently).");
+ }
+ break;
- case CONNECTION_RECONNECTED:
+ case CONNECTION_RECONNECTED: {
LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
- "graphs are monitored again.");
- break;
- case INITIALIZED:
+ "graphs are monitored again.");
+ }
+ break;
+
+ case INITIALIZED: {
LOG.info("SubmittedJobGraphsPathCacheListener initialized");
- break;
+ }
+ break;
}
}