You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:28 UTC
[flink] 05/08: [FLINK-10011] Introduce
SubmittedJobGraphStore#releaseJobGraph
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cb69af18c5545b7218ea16d25d0b910f767ccf49
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 09:42:28 2018 +0200
[FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
SubmittedJobGraphStore#releaseJobGraph removes a potentially existing lock
from the specified JobGraph. This allows other SubmittedJobGraphStores to
remove the JobGraph given that it is no longer locked.
---
.../SingleJobSubmittedJobGraphStore.java | 9 ++++++--
.../StandaloneSubmittedJobGraphStore.java | 13 ++++++++----
.../runtime/jobmanager/SubmittedJobGraphStore.java | 12 +++++++++++
.../ZooKeeperSubmittedJobGraphStore.java | 24 +++++++++++++++++++---
.../flink/runtime/dispatcher/DispatcherHATest.java | 5 +++++
.../testutils/InMemorySubmittedJobGraphStore.java | 5 +++++
6 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc..fe7f5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
@Override
- public void removeJobGraph(JobID jobId) throws Exception {
+ public void removeJobGraph(JobID jobId) {
// ignore
}
@Override
- public Collection<JobID> getJobIds() throws Exception {
+ public void releaseJobGraph(JobID jobId) {
+ // ignore
+ }
+
+ @Override
+ public Collection<JobID> getJobIds() {
return Collections.singleton(jobGraph.getJobID());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a3..f28621f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
}
@Override
- public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+ public void putJobGraph(SubmittedJobGraph jobGraph) {
// Nothing to do
}
@Override
- public void removeJobGraph(JobID jobId) throws Exception {
+ public void removeJobGraph(JobID jobId) {
// Nothing to do
}
@Override
- public Collection<JobID> getJobIds() throws Exception {
+ public void releaseJobGraph(JobID jobId) {
+ // nothing to do
+ }
+
+ @Override
+ public Collection<JobID> getJobIds() {
return Collections.emptyList();
}
@Override
- public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) {
return null;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec..b40a4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import javax.annotation.Nullable;
@@ -59,6 +60,17 @@ public interface SubmittedJobGraphStore {
void removeJobGraph(JobID jobId) throws Exception;
/**
+ * Releases the locks on the specified {@link JobGraph}.
+ *
+ * <p>Releasing the locks allows another instance to delete the job from
+ * the {@link SubmittedJobGraphStore}.
+ *
+ * @param jobId specifying the job to release the locks for
+ * @throws Exception if the locks cannot be released
+ */
+ void releaseJobGraph(JobID jobId) throws Exception;
+
+ /**
* Get all job ids of submitted job graphs to the submitted job graph store.
*
* @return Collection of submitted job ids
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 0510815..2b935af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -68,13 +68,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
private final Object cacheLock = new Object();
- /** Client (not a namespace facade) */
+ /** Client (not a namespace facade). */
private final CuratorFramework client;
/** The set of IDs of all added job graphs. */
private final Set<JobID> addedJobGraphs = new HashSet<>();
- /** Completed checkpoints in ZooKeeper */
+ /** Submitted job graphs in ZooKeeper. */
private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
/**
@@ -93,7 +93,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
private boolean isRunning;
/**
- * Submitted job graph store backed by ZooKeeper
+ * Submitted job graph store backed by ZooKeeper.
*
* @param client ZooKeeper client
* @param currentJobsPath ZooKeeper path for current job graphs
@@ -274,6 +274,24 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
@Override
+ public void releaseJobGraph(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ final String path = getPathForJob(jobId);
+
+ LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
+ synchronized (cacheLock) {
+ if (addedJobGraphs.contains(jobId)) {
+ jobGraphsInZooKeeper.release(path);
+
+ addedJobGraphs.remove(jobId);
+ }
+ }
+
+ LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
+ }
+
+ @Override
public Collection<JobID> getJobIds() throws Exception {
Collection<String> paths;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d2..5876c5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -244,6 +244,11 @@ public class DispatcherHATest extends TestLogger {
}
@Override
+ public void releaseJobGraph(JobID jobId) throws Exception {
+ throw new UnsupportedOperationException("Should not be called.");
+ }
+
+ @Override
public Collection<JobID> getJobIds() throws Exception {
enterGetJobIdsLatch.trigger();
proceedGetJobIdsLatch.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80..3b9c578 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -97,6 +97,11 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
}
@Override
+ public void releaseJobGraph(JobID jobId) {
+ verifyIsStarted();
+ }
+
+ @Override
public synchronized Collection<JobID> getJobIds() throws Exception {
verifyIsStarted();