You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/08/14 18:04:00 UTC
[2/2] hadoop git commit: HADOOP-14741. Refactor curator based ZooKeeper communication into common library. (Íñigo Goiri via Subru).
HADOOP-14741. Refactor curator based ZooKeeper communication into common library. (Íñigo Goiri via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a70efb61
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a70efb61
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a70efb61
Branch: refs/heads/branch-2
Commit: a70efb61381eeb33ea7c1d3df6405cce42f69587
Parents: 3945737
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Aug 14 11:03:50 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Aug 14 11:03:50 2017 -0700
----------------------------------------------------------------------
.../hadoop/fs/CommonConfigurationKeys.java | 21 ++
.../hadoop/util/curator/ZKCuratorManager.java | 294 +++++++++++++++++++
.../hadoop/util/curator/package-info.java | 27 ++
.../src/main/resources/core-default.xml | 46 +++
.../util/curator/TestZKCuratorManager.java | 95 ++++++
.../hadoop/yarn/conf/YarnConfiguration.java | 13 +-
.../yarn/conf/TestYarnConfigurationFields.java | 9 +
.../src/main/resources/yarn-default.xml | 53 ----
...ActiveStandbyElectorBasedElectorService.java | 5 +-
.../yarn/server/resourcemanager/RMZKUtils.java | 81 -----
.../server/resourcemanager/ResourceManager.java | 83 +++---
.../recovery/ZKRMStateStore.java | 38 ++-
.../server/resourcemanager/RMHATestBase.java | 5 +-
13 files changed, 567 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 49062ea..c4eb795 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -364,4 +364,25 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
// HDFS client HTrace configuration.
public static final String FS_CLIENT_HTRACE_PREFIX = "fs.client.htrace.";
+
+ // Global ZooKeeper configuration keys
+ public static final String ZK_PREFIX = "hadoop.zk.";
+ /** ACL for the ZooKeeper ensemble. */
+ public static final String ZK_ACL = ZK_PREFIX + "acl";
+ public static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+ /** Authentication for the ZooKeeper ensemble. */
+ public static final String ZK_AUTH = ZK_PREFIX + "auth";
+
+ /** Address of the ZooKeeper ensemble. */
+ public static final String ZK_ADDRESS = ZK_PREFIX + "address";
+ /** Maximum number of retries for a ZooKeeper operation. */
+ public static final String ZK_NUM_RETRIES = ZK_PREFIX + "num-retries";
+ public static final int ZK_NUM_RETRIES_DEFAULT = 1000;
+ /** Timeout for a ZooKeeper operation in ZooKeeper in milliseconds. */
+ public static final String ZK_TIMEOUT_MS = ZK_PREFIX + "timeout-ms";
+ public static final int ZK_TIMEOUT_MS_DEFAULT = 10000;
+ /** How often to retry a ZooKeeper operation in milliseconds. */
+ public static final String ZK_RETRY_INTERVAL_MS =
+ ZK_PREFIX + "retry-interval-ms";
+ public static final int ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
new file mode 100644
index 0000000..b980c41
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.util.curator;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class that provides utility methods specific to ZK operations.
+ */
+@InterfaceAudience.Private
+public final class ZKCuratorManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZKCuratorManager.class);
+
+ /** Configuration for the ZooKeeper connection. */
+ private final Configuration conf;
+
+ /** Curator for ZooKeeper. */
+ private CuratorFramework curator;
+
+
+ public ZKCuratorManager(Configuration config) throws IOException {
+ this.conf = config;
+ }
+
+ /**
+ * Get the curator framework managing the ZooKeeper connection.
+ * @return Curator framework.
+ */
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
+ /**
+ * Close the connection with ZooKeeper.
+ */
+ public void close() {
+ if (curator != null) {
+ curator.close();
+ }
+ }
+
+ /**
+ * Utility method to fetch the ZK ACLs from the configuration.
+ * @throws java.io.IOException if the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ public static List<ACL> getZKAcls(Configuration conf) throws IOException {
+ // Parse authentication from configuration.
+ String zkAclConf = conf.get(CommonConfigurationKeys.ZK_ACL,
+ CommonConfigurationKeys.ZK_ACL_DEFAULT);
+ try {
+ zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+ return ZKUtil.parseACLs(zkAclConf);
+ } catch (IOException | ZKUtil.BadAclFormatException e) {
+ LOG.error("Couldn't read ACLs based on {}",
+ CommonConfigurationKeys.ZK_ACL);
+ throw e;
+ }
+ }
+
+ /**
+ * Utility method to fetch ZK auth info from the configuration.
+ * @throws java.io.IOException if the Zookeeper ACLs configuration file
+ * cannot be read
+ */
+ public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
+ throws IOException {
+ String zkAuthConf = conf.get(CommonConfigurationKeys.ZK_AUTH);
+ try {
+ zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+ if (zkAuthConf != null) {
+ return ZKUtil.parseAuth(zkAuthConf);
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (IOException | ZKUtil.BadAuthFormatException e) {
+ LOG.error("Couldn't read Auth based on {}",
+ CommonConfigurationKeys.ZK_AUTH);
+ throw e;
+ }
+ }
+
+ /**
+ * Start the connection to the ZooKeeper ensemble.
+ * @param conf Configuration for the connection.
+ * @throws IOException If the connection cannot be started.
+ */
+ public void start() throws IOException {
+ this.start(new ArrayList<AuthInfo>());
+ }
+
+ /**
+ * Start the connection to the ZooKeeper ensemble.
+ * @param conf Configuration for the connection.
+ * @param authInfos List of authentication keys.
+ * @throws IOException If the connection cannot be started.
+ */
+ public void start(List<AuthInfo> authInfos) throws IOException {
+
+ // Connect to the ZooKeeper ensemble
+ String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
+ if (zkHostPort == null) {
+ throw new IOException(
+ CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
+ }
+ int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
+ CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
+ int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
+ CommonConfigurationKeys.ZK_TIMEOUT_MS_DEFAULT);
+ int zkRetryInterval = conf.getInt(
+ CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS,
+ CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS_DEFAULT);
+ RetryNTimes retryPolicy = new RetryNTimes(numRetries, zkRetryInterval);
+
+ // Set up ZK auths
+ List<ZKUtil.ZKAuthInfo> zkAuths = getZKAuths(conf);
+ if (authInfos == null) {
+ authInfos = new ArrayList<>();
+ }
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ }
+
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(zkHostPort)
+ .sessionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(retryPolicy)
+ .authorization(authInfos)
+ .build();
+ client.start();
+
+ this.curator = client;
+ }
+
+ /**
+ * Get ACLs for a ZNode.
+ * @param path Path of the ZNode.
+ * @return The list of ACLs.
+ * @throws Exception
+ */
+ public List<ACL> getACL(final String path) throws Exception {
+ return curator.getACL().forPath(path);
+ }
+
+ /**
+ * Get the data in a ZNode.
+ * @param path Path of the ZNode.
+ * @param stat Output statistics of the ZNode.
+ * @return The data in the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public byte[] getData(final String path) throws Exception {
+ return curator.getData().forPath(path);
+ }
+
+ /**
+ * Get the data in a ZNode.
+ * @param path Path of the ZNode.
+ * @param stat Output statistics of the ZNode.
+ * @return The data in the ZNode.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public String getSringData(final String path) throws Exception {
+ byte[] bytes = getData(path);
+ return new String(bytes, Charset.forName("UTF-8"));
+ }
+
+ /**
+ * Set data into a ZNode.
+ * @param path Path of the ZNode.
+ * @param data Data to set.
+ * @param version Version of the data to store.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public void setData(String path, byte[] data, int version) throws Exception {
+ curator.setData().withVersion(version).forPath(path, data);
+ }
+
+ /**
+ * Set data into a ZNode.
+ * @param path Path of the ZNode.
+ * @param data Data to set as String.
+ * @param version Version of the data to store.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public void setData(String path, String data, int version) throws Exception {
+ byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
+ setData(path, bytes, version);
+ }
+
+ /**
+ * Get children of a ZNode.
+ * @param path Path of the ZNode.
+ * @return The list of children.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public List<String> getChildren(final String path) throws Exception {
+ return curator.getChildren().forPath(path);
+ }
+
+ /**
+ * Check if a ZNode exists.
+ * @param path Path of the ZNode.
+ * @return If the ZNode exists.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public boolean exists(final String path) throws Exception {
+ return curator.checkExists().forPath(path) != null;
+ }
+
+ /**
+ * Create a ZNode.
+ * @param path Path of the ZNode.
+ * @return If the ZNode was created.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public boolean create(final String path) throws Exception {
+ return create(path, null);
+ }
+
+ /**
+ * Create a ZNode.
+ * @param path Path of the ZNode.
+ * @param zkAcl ACL for the node.
+ * @return If the ZNode was created.
+ * @throws Exception If it cannot contact Zookeeper.
+ */
+ public boolean create(final String path, List<ACL> zkAcl) throws Exception {
+ boolean created = false;
+ if (!exists(path)) {
+ curator.create()
+ .withMode(CreateMode.PERSISTENT)
+ .withACL(zkAcl)
+ .forPath(path, null);
+ created = true;
+ }
+ return created;
+ }
+
+ /**
+ * Delete a ZNode.
+ * @param path Path of the ZNode.
+ * @throws Exception If it cannot contact ZooKeeper.
+ */
+ public void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curator.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ }
+
+ /**
+ * Get the path for a ZNode.
+ * @param root Root of the ZNode.
+ * @param nodeName Name of the ZNode.
+ * @return Path for the ZNode.
+ */
+ public static String getNodePath(String root, String nodeName) {
+ return root + "/" + nodeName;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
new file mode 100644
index 0000000..7b93f5a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides utilities to interact with Curator ZooKeeper.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.util.curator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 4ec5867..c2c6db5 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2764,4 +2764,50 @@
This determines the number of open file handles.
</description>
</property>
+
+ <property>
+ <description>Host:Port of the ZooKeeper server to be used.
+ </description>
+ <name>hadoop.zk.address</name>
+ <!--value>127.0.0.1:2181</value-->
+ </property>
+
+ <property>
+ <description>Number of tries to connect to ZooKeeper.</description>
+ <name>hadoop.zk.num-retries</name>
+ <value>1000</value>
+ </property>
+
+ <property>
+ <description>Retry interval in milliseconds when connecting to ZooKeeper.
+ </description>
+ <name>hadoop.zk.retry-interval-ms</name>
+ <value>1000</value>
+ </property>
+
+ <property>
+ <description>ZooKeeper session timeout in milliseconds. Session expiration
+ is managed by the ZooKeeper cluster itself, not by the client. This value is
+ used by the cluster to determine when the client's session expires.
+ Expirations happens when the cluster does not hear from the client within
+ the specified session timeout period (i.e. no heartbeat).</description>
+ <name>hadoop.zk.timeout-ms</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <description>ACL's to be used for ZooKeeper znodes.</description>
+ <name>hadoop.zk.acl</name>
+ <value>world:anyone:rwcda</value>
+ </property>
+
+ <property>
+ <description>
+ Specify the auths to be used for the ACL's specified in hadoop.zk.acl.
+ This takes a comma-separated list of authentication mechanisms, each of the
+ form 'scheme:auth' (the same syntax used for the 'addAuth' command in
+ the ZK CLI).
+ </description>
+ <name>hadoop.zk.auth</name>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
new file mode 100644
index 0000000..2bcf508
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the manager for ZooKeeper Curator.
+ */
+public class TestZKCuratorManager {
+
+ private TestingServer server;
+ private ZKCuratorManager curator;
+
+ @Before
+ public void setup() throws Exception {
+ this.server = new TestingServer();
+
+ Configuration conf = new Configuration();
+ conf.set(
+ CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+
+ this.curator = new ZKCuratorManager(conf);
+ this.curator.start();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ this.curator.close();
+ if (this.server != null) {
+ this.server.close();
+ this.server = null;
+ }
+ }
+
+ @Test
+ public void testReadWriteData() throws Exception {
+ String testZNode = "/test";
+ String expectedString = "testString";
+ assertFalse(curator.exists(testZNode));
+ curator.create(testZNode);
+ assertTrue(curator.exists(testZNode));
+ curator.setData(testZNode, expectedString, -1);
+ String testString = curator.getSringData("/test");
+ assertEquals(expectedString, testString);
+ }
+
+ @Test
+ public void testChildren() throws Exception {
+ List<String> children = curator.getChildren("/");
+ assertEquals(1, children.size());
+
+ assertFalse(curator.exists("/node1"));
+ curator.create("/node1");
+ assertTrue(curator.exists("/node1"));
+
+ assertFalse(curator.exists("/node2"));
+ curator.create("/node2");
+ assertTrue(curator.exists("/node2"));
+
+ children = curator.getChildren("/");
+ assertEquals(3, children.size());
+
+ curator.delete("/node2");
+ assertFalse(curator.exists("/node2"));
+ children = curator.getChildren("/");
+ assertEquals(2, children.size());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3e389a4..e67caea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
@@ -83,7 +84,17 @@ public class YarnConfiguration extends Configuration {
private static void addDeprecatedKeys() {
Configuration.addDeprecations(new DeprecationDelta[] {
new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
- NM_CLIENT_MAX_NM_PROXIES)
+ NM_CLIENT_MAX_NM_PROXIES),
+ new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
+ new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
+ new DeprecationDelta(RM_ZK_ADDRESS,
+ CommonConfigurationKeys.ZK_ADDRESS),
+ new DeprecationDelta(RM_ZK_NUM_RETRIES,
+ CommonConfigurationKeys.ZK_NUM_RETRIES),
+ new DeprecationDelta(RM_ZK_TIMEOUT_MS,
+ CommonConfigurationKeys.ZK_TIMEOUT_MS),
+ new DeprecationDelta(RM_ZK_RETRY_INTERVAL_MS,
+ CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS),
});
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 6a31102..3145ee0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -78,6 +78,15 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
+ // skip deprecated ZooKeeper settings
+ configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ADDRESS);
+ configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
+ configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
+ configurationPropsToSkipCompare.add(
+ YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS);
+ configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_AUTH);
+ configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ACL);
+
// Used as Java command line properties, not XML
configurationPrefixToSkipCompare.add("yarn.app.container");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e984a33..88f12ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -438,31 +438,6 @@
</property>
<property>
- <description>Host:Port of the ZooKeeper server to be used by the RM. This
- must be supplied when using the ZooKeeper based implementation of the
- RM state store and/or embedded automatic failover in an HA setting.
- </description>
- <name>yarn.resourcemanager.zk-address</name>
- <!--value>127.0.0.1:2181</value-->
- </property>
-
- <property>
- <description>Number of times RM tries to connect to ZooKeeper.</description>
- <name>yarn.resourcemanager.zk-num-retries</name>
- <value>1000</value>
- </property>
-
- <property>
- <description>Retry interval in milliseconds when connecting to ZooKeeper.
- When HA is enabled, the value here is NOT used. It is generated
- automatically from yarn.resourcemanager.zk-timeout-ms and
- yarn.resourcemanager.zk-num-retries.
- </description>
- <name>yarn.resourcemanager.zk-retry-interval-ms</name>
- <value>1000</value>
- </property>
-
- <property>
<description>Full path of the ZooKeeper znode where RM state will be
stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
@@ -472,22 +447,6 @@
</property>
<property>
- <description>ZooKeeper session timeout in milliseconds. Session expiration
- is managed by the ZooKeeper cluster itself, not by the client. This value is
- used by the cluster to determine when the client's session expires.
- Expirations happens when the cluster does not hear from the client within
- the specified session timeout period (i.e. no heartbeat).</description>
- <name>yarn.resourcemanager.zk-timeout-ms</name>
- <value>10000</value>
- </property>
-
- <property>
- <description>ACL's to be used for ZooKeeper znodes.</description>
- <name>yarn.resourcemanager.zk-acl</name>
- <value>world:anyone:rwcda</value>
- </property>
-
- <property>
<description>
ACLs to be used for the root znode when using ZKRMStateStore in an HA
scenario for fencing.
@@ -513,18 +472,6 @@
</property>
<property>
- <description>
- Specify the auths to be used for the ACL's specified in both the
- yarn.resourcemanager.zk-acl and
- yarn.resourcemanager.zk-state-store.root-node.acl properties. This
- takes a comma-separated list of authentication mechanisms, each of the
- form 'scheme:auth' (the same syntax used for the 'addAuth' command in
- the ZK CLI).
- </description>
- <name>yarn.resourcemanager.zk-auth</name>
- </property>
-
- <property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index a8dcda4..c5c9211 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -96,8 +97,8 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
- List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+ List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
+ List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
int maxRetryNum =
conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
deleted file mode 100644
index 4b8561d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.data.ACL;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helper class that provides utility methods specific to ZK operations
- */
-@InterfaceAudience.Private
-public class RMZKUtils {
- private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
-
- /**
- * Utility method to fetch the ZK ACLs from the configuration.
- *
- * @throws java.io.IOException if the Zookeeper ACLs configuration file
- * cannot be read
- */
- public static List<ACL> getZKAcls(Configuration conf) throws IOException {
- // Parse authentication from configuration.
- String zkAclConf =
- conf.get(YarnConfiguration.RM_ZK_ACL,
- YarnConfiguration.DEFAULT_RM_ZK_ACL);
- try {
- zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
- return ZKUtil.parseACLs(zkAclConf);
- } catch (IOException | ZKUtil.BadAclFormatException e) {
- LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
- throw e;
- }
- }
-
- /**
- * Utility method to fetch ZK auth info from the configuration.
- *
- * @throws java.io.IOException if the Zookeeper ACLs configuration file
- * cannot be read
- */
- public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
- throws IOException {
- String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
- try {
- zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
- if (zkAuthConf != null) {
- return ZKUtil.parseAuth(zkAuthConf);
- } else {
- return Collections.emptyList();
- }
- } catch (IOException | ZKUtil.BadAuthFormatException e) {
- LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 3ffdbbc..858b917 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -44,7 +42,7 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -173,7 +171,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker;
private JvmMetrics jvmMetrics;
private boolean curatorEnabled = false;
- private CuratorFramework curator;
+ private ZKCuratorManager zkManager;
private final String zkRootNodePassword =
Long.toString(new SecureRandom().nextLong());
private boolean recoveryEnabled;
@@ -316,7 +314,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
- this.curator = createAndStartCurator(conf);
+ this.zkManager = createAndStartZKManager(conf);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -324,50 +322,49 @@ public class ResourceManager extends CompositeService implements Recoverable {
return elector;
}
- public CuratorFramework createAndStartCurator(Configuration conf)
+ /**
+ * Create and ZooKeeper Curator manager.
+ * @param config Configuration for the ZooKeeper curator.
+ * @return New ZooKeeper Curator manager.
+ * @throws IOException If it cannot create the manager.
+ */
+ public ZKCuratorManager createAndStartZKManager(Configuration config)
throws IOException {
- String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
- if (zkHostPort == null) {
- throw new YarnRuntimeException(
- YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
- }
- int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
- YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
- int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
- int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
-
- // set up zk auths
- List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+ ZKCuratorManager manager = new ZKCuratorManager(config);
+
+ // Get authentication
List<AuthInfo> authInfos = new ArrayList<>();
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ if (HAUtil.isHAEnabled(config) && HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, config) == null) {
+ String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+ YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS, config);
+ String defaultFencingAuth =
+ zkRootNodeUsername + ":" + zkRootNodePassword;
+ byte[] defaultFencingAuthData =
+ defaultFencingAuth.getBytes(Charset.forName("UTF-8"));
+ String scheme = new DigestAuthenticationProvider().getScheme();
+ AuthInfo authInfo = new AuthInfo(scheme, defaultFencingAuthData);
+ authInfos.add(authInfo);
}
- if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
- YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
- String zkRootNodeUsername = HAUtil
- .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
- byte[] defaultFencingAuth =
- (zkRootNodeUsername + ":" + zkRootNodePassword)
- .getBytes(Charset.forName("UTF-8"));
- authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
- defaultFencingAuth));
- }
+ manager.start(authInfos);
+ return manager;
+ }
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(zkHostPort)
- .sessionTimeoutMs(zkSessionTimeout)
- .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
- .authorization(authInfos).build();
- client.start();
- return client;
+ /**
+ * Get the ZooKeeper Curator manager.
+ * @return ZooKeeper Curator manager.
+ */
+ public ZKCuratorManager getZKManager() {
+ return this.zkManager;
}
public CuratorFramework getCurator() {
- return this.curator;
+ if (this.zkManager == null) {
+ return null;
+ }
+ return this.zkManager.getCurator();
}
public String getZkRootNodePassword() {
@@ -1119,8 +1116,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
configurationProvider.close();
}
super.serviceStop();
- if (curator != null) {
- curator.close();
+ if (zkManager != null) {
+ zkManager.close();
}
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 31a5363..12fb3a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -201,8 +201,8 @@ public class ZKRMStateStore extends RMStateStore {
private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme();
- @VisibleForTesting
- protected CuratorFramework curatorFramework;
+ /** Manager for the ZooKeeper connection. */
+ private ZKCuratorManager zkManager;
/*
* Indicates different app attempt state store operations.
@@ -298,12 +298,11 @@ public class ZKRMStateStore extends RMStateStore {
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
}
- zkAcl = RMZKUtils.getZKAcls(conf);
+ zkAcl = ZKCuratorManager.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
-
if (zkRootNodeAclConf != null) {
zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
@@ -330,10 +329,9 @@ public class ZKRMStateStore extends RMStateStore {
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
- curatorFramework = resourceManager.getCurator();
-
- if (curatorFramework == null) {
- curatorFramework = resourceManager.createAndStartCurator(conf);
+ zkManager = resourceManager.getZKManager();
+ if (zkManager == null) {
+ zkManager = resourceManager.createAndStartZKManager(conf);
}
}
@@ -382,6 +380,7 @@ public class ZKRMStateStore extends RMStateStore {
logRootNodeAcls("Before setting ACLs'\n");
}
+ CuratorFramework curatorFramework = zkManager.getCurator();
if (HAUtil.isHAEnabled(getConfig())) {
curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
} else {
@@ -401,6 +400,7 @@ public class ZKRMStateStore extends RMStateStore {
}
if (!HAUtil.isHAEnabled(getConfig())) {
+ CuratorFramework curatorFramework = zkManager.getCurator();
IOUtils.closeStream(curatorFramework);
}
}
@@ -937,6 +937,7 @@ public class ZKRMStateStore extends RMStateStore {
}
safeDelete(appIdRemovePath);
} else {
+ CuratorFramework curatorFramework = zkManager.getCurator();
curatorFramework.delete().deletingChildrenIfNeeded().
forPath(appIdRemovePath);
}
@@ -1237,38 +1238,32 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
byte[] getData(final String path) throws Exception {
- return curatorFramework.getData().forPath(path);
+ return zkManager.getData(path);
}
@VisibleForTesting
List<ACL> getACL(final String path) throws Exception {
- return curatorFramework.getACL().forPath(path);
+ return zkManager.getACL(path);
}
@VisibleForTesting
List<String> getChildren(final String path) throws Exception {
- return curatorFramework.getChildren().forPath(path);
+ return zkManager.getChildren(path);
}
@VisibleForTesting
boolean exists(final String path) throws Exception {
- return curatorFramework.checkExists().forPath(path) != null;
+ return zkManager.exists(path);
}
@VisibleForTesting
void create(final String path) throws Exception {
- if (!exists(path)) {
- curatorFramework.create()
- .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
- .forPath(path, null);
- }
+ zkManager.create(path, zkAcl);
}
@VisibleForTesting
void delete(final String path) throws Exception {
- if (exists(path)) {
- curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
- }
+ zkManager.delete(path);
}
private void safeCreate(String path, byte[] data, List<ACL> acl,
@@ -1311,6 +1306,7 @@ public class ZKRMStateStore extends RMStateStore {
private CuratorTransactionFinal transactionFinal;
SafeTransaction() throws Exception {
+ CuratorFramework curatorFramework = zkManager.getCurator();
CuratorTransaction transaction = curatorFramework.inTransaction();
transactionFinal = transaction.create()
.withMode(CreateMode.PERSISTENT).withACL(zkAcl)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a70efb61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index c95bcdf..4d8b20d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -61,8 +62,8 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
configuration.set(YarnConfiguration.RM_STORE,
ZKRMStateStore.class.getName());
- configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
- configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
+ configuration.set(CommonConfigurationKeys.ZK_ADDRESS, hostPort);
+ configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
int base = 100;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org