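You are viewing a plain-text version of this content. The canonical version is the original message in the Apache commits@flink.apache.org mailing-list archive (the hyperlink to it was lost in this plain-text rendering).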
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:13 UTC

[3/6] flink git commit: [FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry

[FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry

This closes #2903


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544f5346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544f5346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544f5346

Branch: refs/heads/master
Commit: 544f53467b901e6e891a23fc4f2ef3a6be229718
Parents: 8780cb6
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Feb 13 18:33:14 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:47 2017 +0100

----------------------------------------------------------------------
 .../HighAvailabilityServicesUtils.java          | 10 ++-
 .../highavailability/ZookeeperHaServices.java   |  6 +-
 .../highavailability/ZookeeperRegistry.java     | 94 ++++++++++++++++++++
 .../highavailability/ZooKeeperRegistryTest.java | 78 ++++++++++++++++
 4 files changed, 183 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 9113309..fe180de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 public class HighAvailabilityServicesUtils {
 
@@ -32,8 +34,8 @@ public class HighAvailabilityServicesUtils {
 				return new EmbeddedNonHaServices();
 
 			case ZOOKEEPER:
-				throw new UnsupportedOperationException("ZooKeeper high availability services " +
-						"have not been implemented yet.");
+				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), 
+						Executors.directExecutor(), config);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
@@ -49,8 +51,8 @@ public class HighAvailabilityServicesUtils {
 				final String resourceManagerAddress = null;
 				return new NonHaServices(resourceManagerAddress);
 			case ZOOKEEPER:
-				throw new UnsupportedOperationException("ZooKeeper high availability services " +
-					"have not been implemented yet.");
+				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), 
+						Executors.directExecutor(), configuration);
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index ed0ad17..741f9e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -97,10 +97,14 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	/** The runtime configuration */
 	private final Configuration configuration;
 
+	/** The zookeeper based running jobs registry */
+	private final RunningJobsRegistry runningJobsRegistry;
+
 	public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
 		this.client = checkNotNull(client);
 		this.executor = checkNotNull(executor);
 		this.configuration = checkNotNull(configuration);
+		this.runningJobsRegistry = new ZookeeperRegistry(client, configuration);
 	}
 
 	// ------------------------------------------------------------------------
@@ -149,7 +153,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public RunningJobsRegistry getRunningJobsRegistry() {
-		throw new UnsupportedOperationException("not yet implemented");
+		return runningJobsRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
new file mode 100644
index 0000000..c0621af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A zookeeper based registry for running jobs, highly available.
+ */
+public class ZookeeperRegistry implements RunningJobsRegistry {
+	
+	private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/";
+
+	/** The ZooKeeper client to use */
+	private final CuratorFramework client;
+
+	private final String runningJobPath;
+
+	private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry";
+
+	public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
+		this.client = client;
+		runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + 
+			configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH);
+	}
+
+	@Override
+	public void setJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			String zkPath = runningJobPath + jobID.toString();
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.setData().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
+		}
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			String zkPath = runningJobPath + jobID.toString();
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.delete().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
+		}
+	}
+
+	@Override
+	public boolean isJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
+			if (stat != null) {
+				return true;
+			}
+			return false;
+		}
+		catch (Exception e) {
+			throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
new file mode 100644
index 0000000..72982c8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class ZooKeeperRegistryTest extends TestLogger {
+	private TestingServer testingServer;
+
+	private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+
+	@Before
+	public void before() throws Exception {
+		testingServer = new TestingServer();
+	}
+
+	@After
+	public void after() throws Exception {
+		testingServer.stop();
+		testingServer = null;
+	}
+
+	/**
+	 * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning()
+	 */
+	@Test
+	public void testZooKeeperRegistry() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+		RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+
+		try {
+			JobID jobID = JobID.generate();
+			assertTrue(!zkRegistry.isJobRunning(jobID));
+
+			zkRegistry.setJobRunning(jobID);
+			assertTrue(zkRegistry.isJobRunning(jobID));
+
+			zkRegistry.setJobFinished(jobID);
+			assertTrue(!zkRegistry.isJobRunning(jobID));
+
+		} finally {
+			if (zkHaService != null) {
+				zkHaService.close();
+			}
+		}
+	}
+}