You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:12 UTC

[flink] 09/09: [FLINK-10328] Release all locks when stopping the ZooKeeperSubmittedJobGraphStore

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4fdba85036b8d3fb743e36cb696842e808e3342f
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 17:47:59 2018 +0200

    [FLINK-10328] Release all locks when stopping the ZooKeeperSubmittedJobGraphStore
    
    When the ZooKeeperSubmittedJobGraphStore is stopped, it now releases all currently held
    locks so that other instances can remove entries from the store. This is necessary
    if the CuratorFramework/ZooKeeper client in use is not closed immediately afterwards.
    
    This closes #6686.
---
 .../jobmanager/ZooKeeperSubmittedJobGraphStore.java | 21 ++++++++++++++++++---
 .../ZooKeeperSubmittedJobGraphStoreTest.java        | 12 +++++++++++-
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 343b22a..2fd19fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.curator.framework.CuratorFramework;
@@ -146,9 +147,23 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 				jobGraphListener = null;
 
 				try {
-					pathCache.close();
-				} catch (Exception e) {
-					throw new Exception("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", e);
+					Exception exception = null;
+
+					try {
+						jobGraphsInZooKeeper.releaseAll();
+					} catch (Exception e) {
+						exception = e;
+					}
+
+					try {
+						pathCache.close();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+
+					if (exception != null) {
+						throw new FlinkException("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", exception);
+					}
 				} finally {
 					isRunning = false;
 				}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index dde3b7a..fae8459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -35,6 +35,7 @@ import javax.annotation.Nonnull;
 
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -57,9 +58,11 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
 	/**
 	 * Tests that we fail with an exception if the job cannot be removed from the
 	 * ZooKeeperSubmittedJobGraphStore.
+	 *
+	 * <p>Tests that a stopped ZooKeeperSubmittedJobGraphStore no longer holds any locks.
 	 */
 	@Test
-	public void testJobGraphRemovalFailure() throws Exception {
+	public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
 		try (final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
 			final TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = new TestingRetrievableStateStorageHelper<>();
 			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore = createSubmittedJobGraphStore(client, stateStorage);
@@ -81,6 +84,13 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
 				// expected
 			}
 
+			submittedJobGraphStore.stop();
+
+			// now we should be able to delete the job graph
+			otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobId());
+
+			assertThat(otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobId()), is(nullValue()));
+
 			otherSubmittedJobGraphStore.stop();
 		}
 	}