You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/09 10:10:47 UTC
[flink] branch master updated: [FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0315f20 [FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
0315f20 is described below
commit 0315f2066c0b56a7ebfe1e8cd8c8b2eca9acca9c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Sep 12 16:13:55 2018 +0200
[FLINK-10326] Simplify ZooKeeperSubmittedJobGraphStore#constructor
Move initialization logic out of the ZooKeeperSubmittedJobGraphStore constructor.
---
.../ZooKeeperSubmittedJobGraphStore.java | 36 +++++------------
.../apache/flink/runtime/util/ZooKeeperUtils.java | 21 ++++++++--
.../ZooKeeperSubmittedJobGraphStoreTest.java | 8 ++--
.../ZooKeeperSubmittedJobGraphsStoreITCase.java | 47 +++++++++++++---------
4 files changed, 61 insertions(+), 51 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a030184..8158429 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -69,9 +68,6 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
private final Object cacheLock = new Object();
- /** Client (not a namespace facade). */
- private final CuratorFramework client;
-
/** The set of IDs of all added job graphs. */
private final Set<JobID> addedJobGraphs = new HashSet<>();
@@ -96,34 +92,20 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
/**
* Submitted job graph store backed by ZooKeeper.
*
- * @param client ZooKeeper client
- * @param currentJobsPath ZooKeeper path for current job graphs
- * @param stateStorage State storage used to persist the submitted jobs
- * @throws Exception
+ * @param zooKeeperFullBasePath ZooKeeper path for current job graphs
+ * @param zooKeeperStateHandleStore State storage used to persist the submitted jobs
*/
public ZooKeeperSubmittedJobGraphStore(
- CuratorFramework client,
- String currentJobsPath,
- RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
-
- checkNotNull(currentJobsPath, "Current jobs path");
- checkNotNull(stateStorage, "State storage");
-
- // Keep a reference to the original client and not the namespace facade. The namespace
- // facade cannot be closed.
- this.client = checkNotNull(client, "Curator client");
-
- // Ensure that the job graphs path exists
- client.newNamespaceAwareEnsurePath(currentJobsPath)
- .ensure(client.getZookeeperClient());
+ String zooKeeperFullBasePath,
+ ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore,
+ PathChildrenCache pathCache) {
- // All operations will have the path as root
- CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath);
+ checkNotNull(zooKeeperFullBasePath, "Current jobs path");
- this.zooKeeperFullBasePath = client.getNamespace() + currentJobsPath;
- this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+ this.zooKeeperFullBasePath = zooKeeperFullBasePath;
+ this.jobGraphsInZooKeeper = checkNotNull(zooKeeperStateHandleStore);
- this.pathCache = new PathChildrenCache(facade, "/", false);
+ this.pathCache = checkNotNull(pathCache);
pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index cc1ec70..039dcf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.Preconditions;
@@ -41,6 +42,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -244,10 +246,23 @@ public class ZooKeeperUtils {
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+ // Ensure that the job graphs path exists
+ client.newNamespaceAwareEnsurePath(zooKeeperSubmittedJobsPath)
+ .ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ CuratorFramework facade = client.usingNamespace(client.getNamespace() + zooKeeperSubmittedJobsPath);
+
+ final String zooKeeperFullSubmittedJobsPath = client.getNamespace() + zooKeeperSubmittedJobsPath;
+
+ final ZooKeeperStateHandleStore<SubmittedJobGraph> zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(facade, stateStorage);
+
+ final PathChildrenCache pathCache = new PathChildrenCache(facade, "/", false);
+
return new ZooKeeperSubmittedJobGraphStore(
- client,
- zooKeeperSubmittedJobsPath,
- stateStorage);
+ zooKeeperFullSubmittedJobsPath,
+ zooKeeperStateHandleStore,
+ pathCache);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index fae8459..04b7792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -98,9 +100,9 @@ public class ZooKeeperSubmittedJobGraphStoreTest extends TestLogger {
@Nonnull
public ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphStore(CuratorFramework client, TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
return new ZooKeeperSubmittedJobGraphStore(
- client,
- "/foobar",
- stateStorage);
+ client.getNamespace(),
+ new ZooKeeperStateHandleStore<>(client, stateStorage),
+ new PathChildrenCache(client, "/", false));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index e9be145..23c2725 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -27,17 +27,22 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -59,9 +64,9 @@ import static org.mockito.Mockito.verify;
*/
public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
- private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+ private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
- private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
+ private static final RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
@Override
public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException {
ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
@@ -71,7 +76,6 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
}
};
-
@AfterClass
public static void tearDown() throws Exception {
if (ZooKeeper != null) {
@@ -86,10 +90,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test
public void testPutAndRemoveJobGraph() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
- "/testPutAndRemoveJobGraph",
- localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testPutAndRemoveJobGraph");
try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -142,10 +143,25 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
}
}
+ @Nonnull
+ private ZooKeeperSubmittedJobGraphStore createZooKeeperSubmittedJobGraphStore(String fullPath) throws Exception {
+ final CuratorFramework client = ZooKeeper.getClient();
+ // Ensure that the job graphs path exists
+ client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
+ return new ZooKeeperSubmittedJobGraphStore(
+ fullPath,
+ new ZooKeeperStateHandleStore<>(
+ facade,
+ localStateStorage),
+ new PathChildrenCache(facade, "/", false));
+ }
+
@Test
public void testRecoverJobGraphs() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testRecoverJobGraphs");
try {
SubmittedJobGraphListener listener = mock(SubmittedJobGraphListener.class);
@@ -195,12 +211,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
try {
- jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
-
- otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
+ jobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
+ otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
SubmittedJobGraph jobGraph = createSubmittedJobGraph(new JobID(), 0);
SubmittedJobGraph otherJobGraph = createSubmittedJobGraph(new JobID(), 0);
@@ -254,11 +267,9 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
@Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
- ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore otherJobGraphs = createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
jobGraphs.start(null);
otherJobGraphs.start(null);