You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/09 10:10:47 UTC

[flink] branch master updated: [FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0315f20  [FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
0315f20 is described below

commit 0315f2066c0b56a7ebfe1e8cd8c8b2eca9acca9c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 16:13:55 2018 +0200

    [FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
    
    Move initialization logic out of the ZooKeeperSubmittedJobGraphStore constructor.
---
 .../ZooKeeperSubmittedJobGraphStore.java           | 36 +++++------------
 .../apache/flink/runtime/util/ZooKeeperUtils.java  | 21 ++++++++--
 .../ZooKeeperSubmittedJobGraphStoreTest.java       |  8 ++--
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java    | 47 +++++++++++++---------
 4 files changed, 61 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a030184..8158429 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -69,9 +68,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
 	private final Object cacheLock = new Object();
 
-	/** Client (not a namespace facade). */
-	private final CuratorFramework client;
-
 	/** The set of IDs of all added job graphs. */
 	private final Set<JobID> addedJobGraphs = new HashSet<>();
 
@@ -96,34 +92,20 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 	/**
 	 * Submitted job graph store backed by ZooKeeper.
 	 *
-	 * @param client ZooKeeper client
-	 * @param currentJobsPath ZooKeeper path for current job graphs
-	 * @param stateStorage State storage used to persist the submitted jobs
-	 * @throws Exception
+	 * @param zooKeeperFullBasePath ZooKeeper path for current job graphs
+	 * @param zooKeeperStateHandleStore State storage used to persist the submitted jobs
 	 */
 	public ZooKeeperSubmittedJobGraphStore(
-			CuratorFramework client,
-			String currentJobsPath,
-			RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
-
-		checkNotNull(currentJobsPath, "Current jobs path");
-		checkNotNull(stateStorage, "State storage");
-
-		// Keep a reference to the original client and not the namespace facade. The namespace
-		// facade cannot be closed.
-		this.client = checkNotNull(client, "Curator client");
-
-		// Ensure that the job graphs path exists
-		client.newNamespaceAwareEnsurePath(currentJobsPath)
-				.ensure(client.getZookeeperClient());
+			String zooKeeperFullBasePath,
+			ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore,
+			PathChildrenCache pathCache) {
 
-		// All operations will have the path as root
-		CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+		checkNotNull(zooKeeperFullBasePath, "Current jobs path");
 
-		this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
-		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+		this.zooKeeperFullBasePath = zooKeeperFullBasePath;
+		this.jobGraphsInZooKeeper = checkNotNull(zooKeeperStateHandleStore);
 
-		this.pathCache = new PathChildrenCache(facade, "/", false);
+		this.pathCache = checkNotNull(pathCache);
 		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index cc1ec70..039dcf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.Preconditions;
 
@@ -41,6 +42,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -244,10 +246,23 @@ public class ZooKeeperUtils {
 		// ZooKeeper submitted jobs root dir
 		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
+		// Ensure that the job graphs path exists
+		client.newNamespaceAwareEnsurePath(zooKeeperSubmittedJobsPath)
+			.ensure(client.getZookeeperClient());
+
+		// All operations will have the path as root
+		CuratorFramework facade = client.usingNamespace(client.getNamespace() + zooKeeperSubmittedJobsPath);
+
+		final String zooKeeperFullSubmittedJobsPath = client.getNamespace() + zooKeeperSubmittedJobsPath;
+
+		final ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+
+		final PathChildrenCache pathCache = new PathChildrenCache(facade, "/", false);
+
 		return new ZooKeeperSubmittedJobGraphStore(
-			client,
-			zooKeeperSubmittedJobsPath,
-			stateStorage);
+			zooKeeperFullSubmittedJobsPath,
+			zooKeeperStateHandleStore,
+			pathCache);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index fae8459..04b7792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -98,9 +100,9 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
 	@Nonnull
 	public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
 		return new ZooKeeperSubmittedJobGraphStore(
-			client,
-			"/foobar",
-			stateStorage);
+			client.getNamespace(),
+			new ZooKeeperStateHandleStore<>(client, stateStorage),
+			new PathChildrenCache(client, "/", false));
 	}
 
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index e9be145..23c2725 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -27,17 +27,22 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -59,9 +64,9 @@ import static org.mockito.Mockito.verify;
  */
 public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
-	private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+	private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
 
-	private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
+	private static final RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
 		@Override
 		public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException {
 			ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
@@ -71,7 +76,6 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		}
 	};
 
-
 	@AfterClass
 	public static void tearDown() throws Exception {
 		if (ZooKeeper != null) {
@@ -86,10 +90,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 	@Test
 	public void testPutAndRemoveJobGraph() throws Exception {
-		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-			ZooKeeper.createClient(),
-			"/testPutAndRemoveJobGraph",
-			localStateStorage);
+		ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testPutAndRemoveJobGraph");
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -142,10 +143,25 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		}
 	}
 
+	@Nonnull
+	private ZooKeeperSubmittedJobGraphStore createZooKeeperSubmittedJobGraphStore(String fullPath) throws Exception {
+		final CuratorFramework client = ZooKeeper.getClient();
+		// Ensure that the job graphs path exists
+		client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
+
+		// All operations will have the path as root
+		CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
+		return new ZooKeeperSubmittedJobGraphStore(
+			fullPath,
+			new ZooKeeperStateHandleStore<>(
+				facade,
+				localStateStorage),
+			new PathChildrenCache(facade, "/", false));
+	}
+
 	@Test
 	public void testRecoverJobGraphs() throws Exception {
-		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
+		ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testRecoverJobGraphs");
 
 		try {
 			SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -195,12 +211,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 		ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
 
 		try {
-			jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
-
-			otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-					ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+			jobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
 
+			otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
 
 			SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
 			SubmittedJobGraph otherJobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -254,11 +267,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
 	@Test(expected = IllegalStateException.class)
 	public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
-		ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+		ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
 
-		ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-				ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+		ZooKeeperSubmittedJobGraphStore otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
 
 		jobGraphs.start(null);
 		otherJobGraphs.start(null);