You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/12 14:43:28 UTC

[flink] 05/08: [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cb69af18c5545b7218ea16d25d0b910f767ccf49
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Aug 20 09:42:28 2018 +0200

    [FLINK-10011] Introduce SubmittedJobGraphStore#releaseJobGraph
    
    SubmitedJobGraphStore#releaseJobGraph removes a potentially existing lock
    from the specified JobGraph. This allows other SubmittedJobGraphStores to
    remove the JobGraph given that it is no longer locked.
---
 .../SingleJobSubmittedJobGraphStore.java           |  9 ++++++--
 .../StandaloneSubmittedJobGraphStore.java          | 13 ++++++++----
 .../runtime/jobmanager/SubmittedJobGraphStore.java | 12 +++++++++++
 .../ZooKeeperSubmittedJobGraphStore.java           | 24 +++++++++++++++++++---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  5 +++++
 .../testutils/InMemorySubmittedJobGraphStore.java  |  5 +++++
 6 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc..fe7f5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// ignore
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// ignore
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.singleton(jobGraph.getJobID());
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a3..f28621f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	}
 
 	@Override
-	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+	public void putJobGraph(SubmittedJobGraph jobGraph) {
 		// Nothing to do
 	}
 
 	@Override
-	public void removeJobGraph(JobID jobId) throws Exception {
+	public void removeJobGraph(JobID jobId) {
 		// Nothing to do
 	}
 
 	@Override
-	public Collection<JobID> getJobIds() throws Exception {
+	public void releaseJobGraph(JobID jobId) {
+		// nothing to do
+	}
+
+	@Override
+	public Collection<JobID> getJobIds() {
 		return Collections.emptyList();
 	}
 
 	@Override
-	public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+	public SubmittedJobGraph recoverJobGraph(JobID jobId) {
 		return null;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec..b40a4a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
@@ -59,6 +60,17 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Releases the locks on the specified {@link JobGraph}.
+	 *
+	 * Releasing the locks allows that another instance can delete the job from
+	 * the {@link SubmittedJobGraphStore}.
+	 *
+	 * @param jobId specifying the job to release the locks for
+	 * @throws Exception if the locks cannot be released
+	 */
+	void releaseJobGraph(JobID jobId) throws Exception;
+
+	/**
 	 * Get all job ids of submitted job graphs to the submitted job graph store.
 	 *
 	 * @return Collection of submitted job ids
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 0510815..2b935af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -68,13 +68,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
 	private final Object cacheLock = new Object();
 
-	/** Client (not a namespace facade) */
+	/** Client (not a namespace facade). */
 	private final CuratorFramework client;
 
 	/** The set of IDs of all added job graphs. */
 	private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-	/** Completed checkpoints in ZooKeeper */
+	/** Completed checkpoints in ZooKeeper. */
 	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
 
 	/**
@@ -93,7 +93,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	private boolean isRunning;
 
 	/**
-	 * Submitted job graph store backed by ZooKeeper
+	 * Submitted job graph store backed by ZooKeeper.
 	 *
 	 * @param client ZooKeeper client
 	 * @param currentJobsPath ZooKeeper path for current job graphs
@@ -274,6 +274,24 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
+	public void releaseJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		final String path = getPathForJob(jobId);
+
+		LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path);
+
+		synchronized (cacheLock) {
+			if (addedJobGraphs.contains(jobId)) {
+				jobGraphsInZooKeeper.release(path);
+
+				addedJobGraphs.remove(jobId);
+			}
+		}
+
+		LOG.info("Released locks of job graph {} from ZooKeeper.", jobId);
+	}
+
+	@Override
 	public Collection<JobID> getJobIds() throws Exception {
 		Collection<String> paths;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d2..5876c5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -244,6 +244,11 @@ public class DispatcherHATest extends TestLogger {
 		}
 
 		@Override
+		public void releaseJobGraph(JobID jobId) throws Exception {
+			throw new UnsupportedOperationException("Should not be called.");
+		}
+
+		@Override
 		public Collection<JobID> getJobIds() throws Exception {
 			enterGetJobIdsLatch.trigger();
 			proceedGetJobIdsLatch.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80..3b9c578 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -97,6 +97,11 @@ public class InMemorySubmittedJobGraphStore implements SubmittedJobGraphStore {
 	}
 
 	@Override
+	public void releaseJobGraph(JobID jobId) {
+		verifyIsStarted();
+	}
+
+	@Override
 	public synchronized Collection<JobID> getJobIds() throws Exception {
 		verifyIsStarted();