You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/06/08 23:51:11 UTC
hadoop git commit: YARN-2716. Refactor ZKRMStateStore retry code with
Apache Curator. Contributed by Karthik Kambatla
Repository: hadoop
Updated Branches:
refs/heads/trunk 0e80d5198 -> 960b8f19c
YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. Contributed by Karthik Kambatla
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/960b8f19
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/960b8f19
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/960b8f19
Branch: refs/heads/trunk
Commit: 960b8f19ca98dbcfdd30f2f1f275b8718d2e872f
Parents: 0e80d51
Author: Jian He <ji...@apache.org>
Authored: Mon Jun 8 14:50:58 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Jun 8 14:50:58 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 2 +-
.../hadoop-yarn-server-resourcemanager/pom.xml | 8 +
.../recovery/ZKRMStateStore.java | 770 ++++++-------------
.../recovery/RMStateStoreTestBase.java | 3 +-
.../recovery/TestZKRMStateStore.java | 83 +-
.../recovery/TestZKRMStateStorePerf.java | 12 +-
.../TestZKRMStateStoreZKClientConnections.java | 181 +----
8 files changed, 336 insertions(+), 726 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f393cad..86494cc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -294,6 +294,9 @@ Release 2.8.0 - UNRELEASED
YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. (xgong)
+ YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator.
+ (Karthik Kambatla via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 72855cc..3ea1558 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -413,7 +413,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms";
- public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
+ public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 76d280a..4960f95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -175,6 +175,14 @@
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 15ac971..bca5348 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -22,22 +22,25 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.charset.Charset;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,14 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
@@ -80,7 +76,37 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import com.google.common.annotations.VisibleForTesting;
/**
- * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
+ * {@link RMStateStore} implementation backed by ZooKeeper.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- VERSION_INFO
+ * |--- EPOCH_NODE
+ * |--- RM_ZK_FENCING_LOCK
+ * |--- RM_APP_ROOT
+ * | |----- (#ApplicationId1)
+ * | | |----- (#ApplicationAttemptIds)
+ * | |
+ * | |----- (#ApplicationId2)
+ * | | |----- (#ApplicationAttemptIds)
+ * | ....
+ * |
+ * |--- RM_DT_SECRET_MANAGER_ROOT
+ * |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+ * |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+ * | |----- Token_1
+ * | |----- Token_2
+ * | ....
+ * |
+ * |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+ * | |----- Key_1
+ * | |----- Key_2
+ * ....
+ * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
+ * |----- currentMasterKey
+ * |----- nextMasterKey
+ *
+ * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved
* separately. The currentMasterkey and nextMasterkey have been stored.
* Also, AMRMToken has been removed from ApplicationAttemptState.
*/
@@ -100,46 +126,14 @@ public class ZKRMStateStore extends RMStateStore {
"RMDTSequentialNumber";
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot";
- private int numRetries;
private String zkHostPort = null;
+ private int numRetries;
private int zkSessionTimeout;
-
@VisibleForTesting
- long zkRetryInterval;
- private List<ACL> zkAcl;
- private List<ZKUtil.ZKAuthInfo> zkAuths;
+ int zkRetryInterval;
- /**
- *
- * ROOT_DIR_PATH
- * |--- VERSION_INFO
- * |--- EPOCH_NODE
- * |--- RM_ZK_FENCING_LOCK
- * |--- RM_APP_ROOT
- * | |----- (#ApplicationId1)
- * | | |----- (#ApplicationAttemptIds)
- * | |
- * | |----- (#ApplicationId2)
- * | | |----- (#ApplicationAttemptIds)
- * | ....
- * |
- * |--- RM_DT_SECRET_MANAGER_ROOT
- * |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
- * |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
- * | |----- Token_1
- * | |----- Token_2
- * | ....
- * |
- * |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
- * | |----- Key_1
- * | |----- Key_2
- * ....
- * |--- AMRMTOKEN_SECRET_MANAGER_ROOT
- * |----- currentMasterKey
- * |----- nextMasterKey
- *
- */
+ /** Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
private String rmDTSecretManagerRoot;
@@ -147,37 +141,29 @@ public class ZKRMStateStore extends RMStateStore {
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
private String amrmTokenSecretManagerRoot;
-
@VisibleForTesting
protected String znodeWorkingPath;
- @VisibleForTesting
- protected ZooKeeper zkClient;
-
- /* activeZkClient is not used to do actual operations,
- * it is only used to verify client session for watched events and
- * it gets activated into zkClient on connection event.
- */
- @VisibleForTesting
- ZooKeeper activeZkClient;
-
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
+ private boolean useDefaultFencingScheme = false;
private String fencingNodePath;
- private Op createFencingNodePathOp;
- private Op deleteFencingNodePathOp;
private Thread verifyActiveStatusThread;
- private String zkRootNodeUsername;
- private final String zkRootNodePassword = Long.toString(random.nextLong());
+ /** ACL and auth info */
+ private List<ACL> zkAcl;
+ private List<ZKUtil.ZKAuthInfo> zkAuths;
@VisibleForTesting
List<ACL> zkRootNodeAcl;
- private boolean useDefaultFencingScheme = false;
+ private String zkRootNodeUsername;
+ private final String zkRootNodePassword = Long.toString(random.nextLong());
public static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme();
+ @VisibleForTesting
+ protected CuratorFramework curatorFramework;
/**
* Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
@@ -192,7 +178,7 @@ public class ZKRMStateStore extends RMStateStore {
@Unstable
protected List<ACL> constructZkRootNodeACL(
Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
- List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
+ List<ACL> zkRootNodeAcl = new ArrayList<>();
for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL(
ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
@@ -231,7 +217,7 @@ public class ZKRMStateStore extends RMStateStore {
zkRetryInterval = zkSessionTimeout / numRetries;
} else {
zkRetryInterval =
- conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+ conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
}
@@ -243,9 +229,6 @@ public class ZKRMStateStore extends RMStateStore {
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
- createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
- CreateMode.PERSISTENT);
- deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
@@ -283,42 +266,23 @@ public class ZKRMStateStore extends RMStateStore {
// ensure root dirs exist
createRootDirRecursively(znodeWorkingPath);
- createRootDir(zkRootNodePath);
+ create(zkRootNodePath);
if (HAUtil.isHAEnabled(getConfig())){
fence();
verifyActiveStatusThread = new VerifyActiveStatusThread();
verifyActiveStatusThread.start();
}
- createRootDir(rmAppRoot);
- createRootDir(rmDTSecretManagerRoot);
- createRootDir(dtMasterKeysRootPath);
- createRootDir(delegationTokensRootPath);
- createRootDir(dtSequenceNumberPath);
- createRootDir(amrmTokenSecretManagerRoot);
- }
-
- protected void createRootDir(final String rootPath) throws Exception {
- // For root dirs, we shouldn't use the doMulti helper methods
- new ZKAction<String>() {
- @Override
- public String run() throws KeeperException, InterruptedException {
- try {
- return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.debug(rootPath + "znode already exists!");
- return null;
- } else {
- throw ke;
- }
- }
- }
- }.runWithRetries();
+ create(rmAppRoot);
+ create(rmDTSecretManagerRoot);
+ create(dtMasterKeysRootPath);
+ create(delegationTokensRootPath);
+ create(dtSequenceNumberPath);
+ create(amrmTokenSecretManagerRoot);
}
private void logRootNodeAcls(String prefix) throws Exception {
Stat getStat = new Stat();
- List<ACL> getAcls = getACLWithRetries(zkRootNodePath, getStat);
+ List<ACL> getAcls = getACL(zkRootNodePath);
StringBuilder builder = new StringBuilder();
builder.append(prefix);
@@ -334,51 +298,21 @@ public class ZKRMStateStore extends RMStateStore {
logRootNodeAcls("Before fencing\n");
}
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
- return null;
- }
- }.runWithRetries();
-
- // delete fencingnodepath
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- try {
- zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
- }
- return null;
- }
- }.runWithRetries();
+ curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
+ delete(fencingNodePath);
if (LOG.isTraceEnabled()) {
logRootNodeAcls("After fencing\n");
}
}
- private synchronized void closeZkClients() throws IOException {
- zkClient = null;
- if (activeZkClient != null) {
- try {
- activeZkClient.close();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while closing ZK", e);
- }
- activeZkClient = null;
- }
- }
-
@Override
protected synchronized void closeInternal() throws Exception {
if (verifyActiveStatusThread != null) {
verifyActiveStatusThread.interrupt();
verifyActiveStatusThread.join(1000);
}
- closeZkClients();
+ // curatorFramework is only assigned in createConnection(); skip closing it
+ // when the store is stopped before a connection was ever created (the
+ // removed closeZkClients() likewise tolerated a null client).
+ if (curatorFramework != null) {
+ curatorFramework.close();
+ }
}
@Override
@@ -391,10 +325,10 @@ public class ZKRMStateStore extends RMStateStore {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
byte[] data =
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
- if (existsWithRetries(versionNodePath, false) != null) {
- setDataWithRetries(versionNodePath, data, -1);
+ if (exists(versionNodePath)) {
+ safeSetData(versionNodePath, data, -1);
} else {
- createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
+ safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
}
}
@@ -402,11 +336,9 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized Version loadVersion() throws Exception {
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
- if (existsWithRetries(versionNodePath, false) != null) {
- byte[] data = getDataWithRetries(versionNodePath, false);
- Version version =
- new VersionPBImpl(VersionProto.parseFrom(data));
- return version;
+ if (exists(versionNodePath)) {
+ byte[] data = getData(versionNodePath);
+ return new VersionPBImpl(VersionProto.parseFrom(data));
}
return null;
}
@@ -415,20 +347,20 @@ public class ZKRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception {
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
long currentEpoch = 0;
- if (existsWithRetries(epochNodePath, false) != null) {
+ if (exists(epochNodePath)) {
// load current epoch
- byte[] data = getDataWithRetries(epochNodePath, false);
+ byte[] data = getData(epochNodePath);
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- setDataWithRetries(epochNodePath, storeData, -1);
+ safeSetData(epochNodePath, storeData, -1);
} else {
// initialize epoch node with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+ safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
}
return currentEpoch;
}
@@ -447,7 +379,7 @@ public class ZKRMStateStore extends RMStateStore {
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
- byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
+ byte[] data = getData(amrmTokenSecretManagerRoot);
if (data == null) {
LOG.warn("There is no data saved");
return;
@@ -458,7 +390,6 @@ public class ZKRMStateStore extends RMStateStore {
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
-
}
private synchronized void loadRMDTSecretManagerState(RMState rmState)
@@ -470,10 +401,10 @@ public class ZKRMStateStore extends RMStateStore {
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List<String> childNodes =
- getChildrenWithRetries(dtMasterKeysRootPath, false);
+ getChildren(dtMasterKeysRootPath);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -500,7 +431,7 @@ public class ZKRMStateStore extends RMStateStore {
}
private void loadRMSequentialNumberState(RMState rmState) throws Exception {
- byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+ byte[] seqData = getData(dtSequenceNumberPath);
if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
DataInputStream seqIn = new DataInputStream(seqIs);
@@ -515,11 +446,11 @@ public class ZKRMStateStore extends RMStateStore {
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes =
- getChildrenWithRetries(delegationTokensRootPath, false);
+ getChildren(delegationTokensRootPath);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
@@ -551,10 +482,10 @@ public class ZKRMStateStore extends RMStateStore {
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
+ List<String> childNodes = getChildren(rmAppRoot);
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, false);
+ byte[] childData = getData(childNodePath);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
if (LOG.isDebugEnabled()) {
@@ -581,11 +512,11 @@ public class ZKRMStateStore extends RMStateStore {
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
- List<String> attempts = getChildrenWithRetries(appPath, false);
+ List<String> attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
- byte[] attemptData = getDataWithRetries(attemptPath, false);
+ byte[] attemptData = getData(attemptPath);
ApplicationAttemptStateDataPBImpl attemptState =
new ApplicationAttemptStateDataPBImpl(
@@ -606,8 +537,8 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- createWithRetries(nodeCreatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeCreatePath, appStateData, zkAcl,
+ CreateMode.PERSISTENT);
}
@@ -622,11 +553,11 @@ public class ZKRMStateStore extends RMStateStore {
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- if (existsWithRetries(nodeUpdatePath, false) != null) {
- setDataWithRetries(nodeUpdatePath, appStateData, -1);
+ if (exists(nodeUpdatePath)) {
+ safeSetData(nodeUpdatePath, appStateData, -1);
} else {
- createWithRetries(nodeUpdatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeUpdatePath, appStateData, zkAcl,
+ CreateMode.PERSISTENT);
LOG.debug(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
@@ -646,8 +577,8 @@ public class ZKRMStateStore extends RMStateStore {
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeCreatePath, attemptStateData, zkAcl,
+ CreateMode.PERSISTENT);
}
@Override
@@ -665,11 +596,11 @@ public class ZKRMStateStore extends RMStateStore {
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- if (existsWithRetries(nodeUpdatePath, false) != null) {
- setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
+ if (exists(nodeUpdatePath)) {
+ safeSetData(nodeUpdatePath, attemptStateData, -1);
} else {
- createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
+ CreateMode.PERSISTENT);
LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
@@ -682,28 +613,26 @@ public class ZKRMStateStore extends RMStateStore {
String appId = appState.getApplicationSubmissionContext().getApplicationId()
.toString();
String appIdRemovePath = getNodePath(rmAppRoot, appId);
- ArrayList<Op> opList = new ArrayList<Op>();
-
- for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
- opList.add(Op.delete(attemptRemovePath, -1));
- }
- opList.add(Op.delete(appIdRemovePath, -1));
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
}
- doDeleteMultiWithRetries(opList);
+
+ for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+ String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
+ safeDelete(attemptRemovePath);
+ }
+ safeDelete(appIdRemovePath);
}
@Override
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList<Op> opList = new ArrayList<Op>();
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
- doStoreMultiWithRetries(opList);
+ SafeTransaction trx = new SafeTransaction();
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
+ trx.commit();
}
@Override
@@ -716,35 +645,29 @@ public class ZKRMStateStore extends RMStateStore {
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
- if (existsWithRetries(nodeRemovePath, false) != null) {
- ArrayList<Op> opList = new ArrayList<Op>();
- opList.add(Op.delete(nodeRemovePath, -1));
- doDeleteMultiWithRetries(opList);
- } else {
- LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
- }
+ safeDelete(nodeRemovePath);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- ArrayList<Op> opList = new ArrayList<Op>();
+ SafeTransaction trx = new SafeTransaction();
String nodeRemovePath =
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
- if (existsWithRetries(nodeRemovePath, false) == null) {
+ if (exists(nodeRemovePath)) {
+ // in case znode exists
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
+ } else {
// in case znode doesn't exist
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
+ addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
- } else {
- // in case znode exists
- addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
}
- doStoreMultiWithRetries(opList);
+ trx.commit();
}
- private void addStoreOrUpdateOps(ArrayList<Op> opList,
+ private void addStoreOrUpdateOps(SafeTransaction trx,
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
boolean isUpdate) throws Exception {
// store RM delegation token
@@ -762,18 +685,18 @@ public class ZKRMStateStore extends RMStateStore {
}
if (isUpdate) {
- opList.add(Op.setData(nodeCreatePath, identifierData.toByteArray(), -1));
+ trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
} else {
- opList.add(Op.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
- CreateMode.PERSISTENT));
+ trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+ CreateMode.PERSISTENT);
// Update Sequence number only while storing DT
seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug((isUpdate ? "Storing " : "Updating ") +
- dtSequenceNumberPath + ". SequenceNumber: "
- + rmDTIdentifier.getSequenceNumber());
+ dtSequenceNumberPath + ". SequenceNumber: "
+ + rmDTIdentifier.getSequenceNumber());
}
- opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+ trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
}
} finally {
seqOs.close();
@@ -793,7 +716,7 @@ public class ZKRMStateStore extends RMStateStore {
}
delegationKey.write(fsOut);
try {
- createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
+ safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
CreateMode.PERSISTENT);
} finally {
os.close();
@@ -809,243 +732,174 @@ public class ZKRMStateStore extends RMStateStore {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
- if (existsWithRetries(nodeRemovePath, false) != null) {
- doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
- } else {
- LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
- }
+ safeDelete(nodeRemovePath);
}
@Override
public synchronized void deleteStore() throws Exception {
- if (existsWithRetries(zkRootNodePath, false) != null) {
- deleteWithRetries(zkRootNodePath, false);
- }
+ delete(zkRootNodePath);
}
@Override
public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception {
String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
- if (existsWithRetries(appIdRemovePath, false) != null) {
- deleteWithRetries(appIdRemovePath, false);
- }
- }
-
- // ZK related code
- /**
- * Watcher implementation which forward events to the ZKRMStateStore This
- * hides the ZK methods of the store from its public interface
- */
- private final class ForwardingWatcher implements Watcher {
- private ZooKeeper watchedZkClient;
-
- public ForwardingWatcher(ZooKeeper client) {
- this.watchedZkClient = client;
- }
-
- @Override
- public void process(WatchedEvent event) {
- try {
- ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
+ delete(appIdRemovePath);
}
@VisibleForTesting
- @Private
- @Unstable
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
- // only process watcher event from current ZooKeeper Client session.
- if (zk != activeZkClient) {
- LOG.info("Ignore watcher event type: " + event.getType() +
- " with state:" + event.getState() + " for path:" +
- event.getPath() + " from old session");
- return;
- }
-
- Event.EventType eventType = event.getType();
- LOG.info("Watcher event type: " + eventType + " with state:"
- + event.getState() + " for path:" + event.getPath() + " for " + this);
-
- if (eventType == Event.EventType.None) {
-
- // the connection state has changed
- switch (event.getState()) {
- case SyncConnected:
- LOG.info("ZKRMStateStore Session connected");
- if (zkClient == null) {
- // the SyncConnected must be from the client that sent Disconnected
- zkClient = activeZkClient;
- ZKRMStateStore.this.notifyAll();
- LOG.info("ZKRMStateStore Session restored");
- }
- break;
- case Disconnected:
- LOG.info("ZKRMStateStore Session disconnected");
- zkClient = null;
- break;
- case Expired:
- // the connection got terminated because of session timeout
- // call listener to reconnect
- LOG.info("ZKRMStateStore Session expired");
- createConnection();
- break;
- default:
- LOG.error("Unexpected Zookeeper" +
- " watch event state: " + event.getState());
- break;
- }
- }
- }
-
- @VisibleForTesting
- @Private
- @Unstable
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
- /**
- * Helper method that creates fencing node, executes the passed operations,
- * and deletes the fencing node.
- */
- private synchronized void doStoreMultiWithRetries(
- final List<Op> opList) throws Exception {
- final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
- execOpList.add(createFencingNodePathOp);
- execOpList.addAll(opList);
- execOpList.add(deleteFencingNodePathOp);
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
+ @Override
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
+ throws Exception {
+ AMRMTokenSecretManagerState data =
+ AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
+ byte[] stateData = data.getProto().toByteArray();
+ safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
}
/**
- * Helper method that creates fencing node, executes the passed operation,
- * and deletes the fencing node.
+ * Utility function to ensure that the configured base znode exists.
+ * This recursively creates the znode as well as all of its parents.
*/
- private void doStoreMultiWithRetries(final Op op) throws Exception {
- doStoreMultiWithRetries(Collections.singletonList(op));
+ private void createRootDirRecursively(String path) throws Exception {
+ String pathParts[] = path.split("/");
+ Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
+ "Invalid path: %s", path);
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i < pathParts.length; i++) {
+ sb.append("/").append(pathParts[i]);
+ create(sb.toString());
+ }
}
- /**
- * Helper method that creates fencing node, executes the passed
- * delete related operations and deletes the fencing node.
+ /*
+ * ZK operations using curator
*/
- private synchronized void doDeleteMultiWithRetries(
- final List<Op> opList) throws Exception {
- final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
- execOpList.add(createFencingNodePathOp);
- execOpList.addAll(opList);
- execOpList.add(deleteFencingNodePathOp);
- new ZKAction<Void>() {
- @Override
- public Void run() throws KeeperException, InterruptedException {
- setHasDeleteNodeOp(true);
- zkClient.multi(execOpList);
- return null;
- }
- }.runWithRetries();
- }
+ /**
+ * Builds and starts the Curator client used by this store for all ZooKeeper
+ * operations, from zkHostPort, zkSessionTimeout, the retry settings
+ * (numRetries, zkRetryInterval) and the configured auth infos.
+ */
+ private void createConnection() throws Exception {
+ // Curator connection
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ builder = builder.connectString(zkHostPort)
+ // Apply the configured timeout to the ZK session itself, not only to
+ // establishing the connection; otherwise Curator's default session
+ // timeout governs session expiry instead of zkSessionTimeout.
+ .sessionTimeoutMs(zkSessionTimeout)
+ .connectionTimeoutMs(zkSessionTimeout)
+ .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval));
+
+ // Set up authorization based on fencing scheme
+ List<AuthInfo> authInfos = new ArrayList<>();
+ for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+ authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+ }
+ if (useDefaultFencingScheme) {
+ byte[] defaultFencingAuth =
+ (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(
+ Charset.forName("UTF-8"));
+ authInfos.add(new AuthInfo(zkRootNodeAuthScheme, defaultFencingAuth));
+ }
+ builder = builder.authorization(authInfos);
- private void doDeleteMultiWithRetries(final Op op) throws Exception {
- doDeleteMultiWithRetries(Collections.singletonList(op));
+ // Connect to ZK
+ curatorFramework = builder.build();
+ curatorFramework.start();
}
@VisibleForTesting
- @Private
- @Unstable
- public void createWithRetries(
- final String path, final byte[] data, final List<ACL> acl,
- final CreateMode mode) throws Exception {
- doStoreMultiWithRetries(Op.create(path, data, acl, mode));
+ // Read helpers: these go straight to curatorFramework and, unlike the
+ // safe* write helpers, do not involve the fencing node.
+ // Returns the data stored at the znode path.
+ byte[] getData(final String path) throws Exception {
+ return curatorFramework.getData().forPath(path);
}
- @VisibleForTesting
- @Private
- @Unstable
- public void setDataWithRetries(final String path, final byte[] data,
- final int version) throws Exception {
- doStoreMultiWithRetries(Op.setData(path, data, version));
+ // Returns the ACL currently set on the znode path.
+ private List<ACL> getACL(final String path) throws Exception {
+ return curatorFramework.getACL().forPath(path);
+ }
+
+ // Returns the names of the direct children of path.
+ private List<String> getChildren(final String path) throws Exception {
+ return curatorFramework.getChildren().forPath(path);
+ }
+
+ // True if the znode path exists; checkExists() yields a null Stat otherwise.
+ private boolean exists(final String path) throws Exception {
+ return curatorFramework.checkExists().forPath(path) != null;
}
@VisibleForTesting
- @Private
- @Unstable
- public byte[] getDataWithRetries(final String path, final boolean watch)
- throws Exception {
- return new ZKAction<byte[]>() {
- @Override
- public byte[] run() throws KeeperException, InterruptedException {
- return zkClient.getData(path, watch, null);
- }
- }.runWithRetries();
+ // Creates an empty PERSISTENT znode at path with zkAcl, unless one exists.
+ // NOTE(review): exists() and create() are separate calls, so a concurrent
+ // creator could still make this throw KeeperException.NodeExistsException;
+ // confirm that callers tolerate that.
+ void create(final String path) throws Exception {
+ if (!exists(path)) {
+ curatorFramework.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(path, null);
+ }
}
- private List<ACL> getACLWithRetries(
- final String path, final Stat stat) throws Exception {
- return new ZKAction<List<ACL>>() {
- @Override
- public List<ACL> run() throws KeeperException, InterruptedException {
- return zkClient.getACL(path, stat);
- }
- }.runWithRetries();
+ // Deletes path and, recursively, its children if path exists. Unlike
+ // safeDelete(), this does not run inside a fencing SafeTransaction.
+ @VisibleForTesting
+ void delete(final String path) throws Exception {
+ if (exists(path)) {
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
+ }
}
- private List<String> getChildrenWithRetries(
- final String path, final boolean watch) throws Exception {
- return new ZKAction<List<String>>() {
- @Override
- List<String> run() throws KeeperException, InterruptedException {
- return zkClient.getChildren(path, watch);
- }
- }.runWithRetries();
+ // Creates path (with the given data, ACL and mode) inside a fencing
+ // SafeTransaction, unless it already exists. The existence check itself
+ // runs outside the transaction.
+ private void safeCreate(String path, byte[] data, List<ACL> acl,
+ CreateMode mode) throws Exception {
+ if (!exists(path)) {
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.create(path, data, acl, mode);
+ transaction.commit();
+ }
}
- private Stat existsWithRetries(
- final String path, final boolean watch) throws Exception {
- return new ZKAction<Stat>() {
- @Override
- Stat run() throws KeeperException, InterruptedException {
- return zkClient.exists(path, watch);
- }
- }.runWithRetries();
+ // Deletes path inside a fencing SafeTransaction if it exists. Unlike
+ // delete(), only the node itself is removed (no deletingChildrenIfNeeded).
+ private void safeDelete(final String path) throws Exception {
+ if (exists(path)) {
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.delete(path);
+ transaction.commit();
+ }
}
- private void deleteWithRetries(
- final String path, final boolean watch) throws Exception {
- new ZKAction<Void>() {
- @Override
- Void run() throws KeeperException, InterruptedException {
- recursiveDeleteWithRetriesHelper(path, watch);
- return null;
- }
- }.runWithRetries();
+ // Sets the data of path inside a fencing SafeTransaction. version is the
+ // expected znode version (ZooKeeper treats -1 as "any version").
+ private void safeSetData(String path, byte[] data, int version)
+ throws Exception {
+ SafeTransaction transaction = new SafeTransaction();
+ transaction.setData(path, data, version);
+ transaction.commit();
}
/**
- * Helper method that deletes znodes recursively
+ * Use Curator transactions to ensure ZK operations are performed in an
+ * all-or-nothing fashion. This is equivalent to using ZooKeeper#multi.
+ *
+ * Every transaction begins by creating {@code fencingNodePath} (PERSISTENT,
+ * {@code zkAcl}) and {@link #commit()} appends its deletion, so the fencing
+ * node does not outlive a successful commit. Presumably it is this create
+ * that fails, aborting the whole transaction, once another RM has fenced
+ * this one out; confirm against how the root znode ACLs are set up.
+ *
+ * TODO (YARN-3774): Curator 3.0 introduces CuratorOp, similar to Op. We'll
+ * have to rewrite this inner class when we adopt that.
*/
- private void recursiveDeleteWithRetriesHelper(String path, boolean watch)
- throws KeeperException, InterruptedException {
- List<String> children = zkClient.getChildren(path, watch);
- for (String child : children) {
- recursiveDeleteWithRetriesHelper(path + "/" + child, false);
+ private class SafeTransaction {
+ // Operations queued so far; nothing is applied until commit().
+ private CuratorTransactionFinal transactionFinal;
+
+ // Starts the transaction with the create of the fencing node.
+ SafeTransaction() throws Exception {
+ CuratorTransaction transaction = curatorFramework.inTransaction();
+ transactionFinal =
+ transaction.create()
+ .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+ .forPath(fencingNodePath, new byte[0]).and();
}
- try {
- zkClient.delete(path, -1);
- } catch (KeeperException.NoNodeException nne) {
- LOG.info("Node " + path + " doesn't exist to delete");
+ // Appends the delete of the fencing node and commits all queued operations.
+ public void commit() throws Exception {
+ transactionFinal = transactionFinal.delete()
+ .forPath(fencingNodePath).and();
+ transactionFinal.commit();
+ }
+
+ // Queues a create of path with the given data, ACL and mode.
+ public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
+ throws Exception {
+ transactionFinal = transactionFinal.create()
+ .withMode(mode).withACL(acl).forPath(path, data).and();
+ }
+
+ // Queues a (non-recursive) delete of path.
+ public void delete(String path) throws Exception {
+ transactionFinal = transactionFinal.delete().forPath(path).and();
+ }
+
+ // Queues a setData of path, conditional on the given znode version.
+ public void setData(String path, byte[] data, int version)
+ throws Exception {
+ transactionFinal = transactionFinal.setData()
+ .withVersion(version).forPath(path, data).and();
}
}
@@ -1054,8 +908,6 @@ public class ZKRMStateStore extends RMStateStore {
* this RM continues to be the Active.
*/
private class VerifyActiveStatusThread extends Thread {
- private List<Op> emptyOpList = new ArrayList<Op>();
-
VerifyActiveStatusThread() {
super(VerifyActiveStatusThread.class.getName());
}
@@ -1063,10 +915,11 @@ public class ZKRMStateStore extends RMStateStore {
public void run() {
try {
while (true) {
- if(isFencedState()) {
+ if(isFencedState()) {
break;
}
- doStoreMultiWithRetries(emptyOpList);
+ // Heartbeat: commit an otherwise-empty SafeTransaction, which creates
+ // and then deletes the fencing node in one all-or-nothing operation.
+ // Repeated once per zkSessionTimeout until the store is fenced.
+ new SafeTransaction().commit();
Thread.sleep(zkSessionTimeout);
}
} catch (InterruptedException ie) {
@@ -1077,143 +930,4 @@ public class ZKRMStateStore extends RMStateStore {
}
}
}
-
- private abstract class ZKAction<T> {
- private boolean hasDeleteNodeOp = false;
- void setHasDeleteNodeOp(boolean hasDeleteOp) {
- this.hasDeleteNodeOp = hasDeleteOp;
- }
- // run() expects synchronization on ZKRMStateStore.this
- abstract T run() throws KeeperException, InterruptedException;
-
- T runWithCheck() throws Exception {
- long startTime = System.currentTimeMillis();
- synchronized (ZKRMStateStore.this) {
- while (zkClient == null) {
- ZKRMStateStore.this.wait(zkSessionTimeout);
- if (zkClient != null) {
- break;
- }
- if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
- throw new IOException("Wait for ZKClient creation timed out");
- }
- }
- return run();
- }
- }
-
- private boolean shouldRetry(Code code) {
- switch (code) {
- case CONNECTIONLOSS:
- case OPERATIONTIMEOUT:
- case SESSIONEXPIRED:
- case SESSIONMOVED:
- return true;
- default:
- break;
- }
- return false;
- }
-
- T runWithRetries() throws Exception {
- int retry = 0;
- while (true) {
- try {
- return runWithCheck();
- } catch (KeeperException.NoAuthException nae) {
- if (HAUtil.isHAEnabled(getConfig())) {
- // NoAuthException possibly means that this store is fenced due to
- // another RM becoming active. Even if not,
- // it is safer to assume we have been fenced
- throw new StoreFencedException();
- }
- } catch (KeeperException ke) {
- if (ke.code() == Code.NODEEXISTS) {
- LOG.info("znode already exists!");
- return null;
- }
- if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
- LOG.info("znode has already been deleted!");
- return null;
- }
-
- LOG.info("Exception while executing a ZK operation.", ke);
- if (shouldRetry(ke.code()) && ++retry < numRetries) {
- LOG.info("Retrying operation on ZK. Retry no. " + retry);
- Thread.sleep(zkRetryInterval);
- createConnection();
- continue;
- }
- LOG.info("Maxed out ZK retries. Giving up!");
- throw ke;
- }
- }
- }
- }
-
- private synchronized void createConnection()
- throws IOException, InterruptedException {
- closeZkClients();
- for (int retries = 0; retries < numRetries && zkClient == null;
- retries++) {
- try {
- activeZkClient = getNewZooKeeper();
- zkClient = activeZkClient;
- for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
- zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
- }
- if (useDefaultFencingScheme) {
- zkClient.addAuthInfo(zkRootNodeAuthScheme,
- (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes(Charset.forName("UTF-8")));
- }
- } catch (IOException ioe) {
- // Retry in case of network failures
- LOG.info("Failed to connect to the ZooKeeper on attempt - " +
- (retries + 1));
- ioe.printStackTrace();
- }
- }
- if (zkClient == null) {
- LOG.error("Unable to connect to Zookeeper");
- throw new YarnRuntimeException("Unable to connect to Zookeeper");
- }
- ZKRMStateStore.this.notifyAll();
- LOG.info("Created new ZK connection");
- }
-
- // protected to mock for testing
- @VisibleForTesting
- @Private
- @Unstable
- protected synchronized ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
- zk.register(new ForwardingWatcher(zk));
- return zk;
- }
-
- @Override
- public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
- AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
- throws Exception {
- AMRMTokenSecretManagerState data =
- AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
- byte[] stateData = data.getProto().toByteArray();
- setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
- }
-
- /**
- * Utility function to ensure that the configured base znode exists.
- * This recursively creates the znode as well as all of its parents.
- */
- private void createRootDirRecursively(String path) throws Exception {
- String pathParts[] = path.split("/");
- Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
- "Invalid path: %s", path);
- StringBuilder sb = new StringBuilder();
- for (int i = 1; i < pathParts.length; i++) {
- sb.append("/").append(pathParts[i]);
- createRootDir(sb.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 4d0e560..9e0d22b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -40,7 +40,6 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -76,7 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
-public class RMStateStoreTestBase extends ClientBaseWithFixes{
+public class RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 333455c..34a4492 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -18,18 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.crypto.SecretKey;
-
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -50,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -61,22 +54,49 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import javax.crypto.SecretKey;
+
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
private static final int ZK_TIMEOUT_MS = 1000;
+ private TestingServer curatorTestingServer;
+ private CuratorFramework curatorFramework;
+
+ /**
+ * Starts an in-process ZooKeeper {@link TestingServer} and a Curator client
+ * connected to it before each test.
+ */
+ @Before
+ public void setupCuratorServer() throws Exception {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ curatorFramework = CuratorFrameworkFactory.builder()
+ .connectString(curatorTestingServer.getConnectString())
+ .retryPolicy(new RetryNTimes(100, 100))
+ .build();
+ curatorFramework.start();
+ }
+
+ /**
+ * Closes the Curator client and the test server after each test. The
+ * server is closed even if closing the client fails.
+ */
+ @After
+ public void cleanupCuratorServer() throws IOException {
+ try {
+ curatorFramework.close();
+ } finally {
+ // close() rather than stop(): close() also deletes the server's
+ // temporary data directory, which stop() leaves behind.
+ curatorTestingServer.close();
+ }
+ }
class TestZKRMStateStoreTester implements RMStateStoreHelper {
- ZooKeeper client;
TestZKRMStateStoreInternal store;
String workingZnode;
+
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
@@ -86,11 +106,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
assertTrue(znodeWorkingPath.equals(workingZnode));
}
- @Override
- public ZooKeeper getNewZooKeeper() throws IOException {
- return client;
- }
-
public String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
@@ -109,7 +124,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
* @throws Exception
*/
public void testRetryingCreateRootDir() throws Exception {
- createRootDir(znodeWorkingPath);
+ create(znodeWorkingPath);
}
}
@@ -117,23 +132,24 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ // Point the store at the per-test Curator TestingServer.
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
- List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
- return nodes.size() == 1;
+ // Valid when the working znode has exactly one child.
+ return 1 ==
+ curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
}
@Override
public void writeVersion(Version version) throws Exception {
- client.setData(store.getVersionNode(), ((VersionPBImpl) version)
- .getProto().toByteArray(), -1);
+ // withVersion(-1) writes regardless of the znode's current version.
+ curatorFramework.setData().withVersion(-1)
+ .forPath(store.getVersionNode(),
+ ((VersionPBImpl) version).getProto().toByteArray());
}
@Override
@@ -142,10 +158,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
}
public boolean appExists(RMApp app) throws Exception {
- Stat node =
- client.exists(store.getAppNode(app.getApplicationId().toString()),
- false);
- return node !=null;
+ // checkExists() yields null when the app znode is absent.
+ return null != curatorFramework.checkExists()
+ .forPath(store.getAppNode(app.getApplicationId().toString()));
}
}
@@ -178,9 +192,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Version storedVersion = null;
@@ -217,7 +231,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ curatorTestingServer.getConnectString());
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
index 654b357..e270404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
+
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -73,10 +75,11 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
private ZKRMStateStore store;
private AMRMTokenSecretManager appTokenMgr;
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+ // In-process ZooKeeper; only assigned by setUpZKServer().
+ private TestingServer curatorTestingServer;
@Before
public void setUpZKServer() throws Exception {
- super.setUp();
+ // Local server used by initStore() when no external ZK host:port is given.
+ curatorTestingServer = new TestingServer();
}
@After
@@ -87,7 +90,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
if (appTokenMgr != null) {
appTokenMgr.stop();
}
- super.tearDown();
+ // Guarded like the other resources: curatorTestingServer is only assigned
+ // in setUpZKServer(), which run() calls only when launchLocalZK is set.
+ // close() (unlike stop()) also deletes the server's temp data directory.
+ if (curatorTestingServer != null) {
+ curatorTestingServer.close();
+ curatorTestingServer = null;
+ }
}
private void initStore(String hostPort) {
@@ -95,7 +98,8 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
RMContext rmContext = mock(RMContext.class);
conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+ // Only consult the local test server when no host:port was supplied:
+ // optHostPort.or(x) would evaluate x eagerly, and curatorTestingServer is
+ // null unless run() launched a local ZK via setUpZKServer().
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.isPresent()
+ ? optHostPort.get() : curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
@@ -140,7 +144,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
if (launchLocalZK) {
try {
- setUp();
+ setUpZKServer();
} catch (Exception e) {
System.err.println("failed to setup. : " + e.getMessage());
return -1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/960b8f19/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 62dc5ef..d188450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -20,39 +20,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TestZKRMStateStoreZKClientConnections extends
- ClientBaseWithFixes {
-
- private static final int ZK_OP_WAIT_TIME = 3000;
- private static final int ZK_TIMEOUT_MS = 1000;
+public class TestZKRMStateStoreZKClientConnections {
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
+ private static final int ZK_TIMEOUT_MS = 1000;
private static final String DIGEST_USER_PASS="test-user:test-password";
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
private static final String DIGEST_USER_HASH;
@@ -66,14 +59,22 @@ public class TestZKRMStateStoreZKClientConnections extends
}
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
+ private TestingServer testingServer;
+
+ /** Starts a fresh in-process ZooKeeper TestingServer before each test. */
+ @Before
+ public void setupZKServer() throws Exception {
+ testingServer = new TestingServer();
+ testingServer.start();
+ }
+
+ /**
+ * Shuts down the per-test ZooKeeper server. Uses close() rather than
+ * stop(): close() also deletes the server's temporary data directory,
+ * which stop() leaves behind.
+ */
+ @After
+ public void cleanupZKServer() throws Exception {
+ testingServer.close();
+ }
class TestZKClient {
ZKRMStateStore store;
- boolean forExpire = false;
- TestForwardingWatcher oldWatcher;
- TestForwardingWatcher watcher;
- CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
@@ -83,51 +84,12 @@ public class TestZKRMStateStoreZKClientConnections extends
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
-
- @Override
- public ZooKeeper getNewZooKeeper()
- throws IOException, InterruptedException {
- oldWatcher = watcher;
- watcher = new TestForwardingWatcher();
- return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
- }
-
- @Override
- public synchronized void processWatchEvent(ZooKeeper zk,
- WatchedEvent event) throws Exception {
-
- if (forExpire) {
- // a hack... couldn't find a way to trigger expired event.
- WatchedEvent expriredEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Expired, null);
- super.processWatchEvent(zk, expriredEvent);
- forExpire = false;
- syncBarrier.await();
- } else {
- super.processWatchEvent(zk, event);
- }
- }
- }
-
- private class TestForwardingWatcher extends
- ClientBaseWithFixes.CountdownWatcher {
- public void process(WatchedEvent event) {
- super.process(event);
- try {
- if (store != null) {
- store.processWatchEvent(client, event);
- }
- } catch (Throwable t) {
- LOG.error("Failed to process watcher event " + event + ": "
- + StringUtils.stringifyException(t));
- }
- }
}
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
- conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS,
+ testingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
@@ -147,12 +109,12 @@ public class TestZKRMStateStoreZKClientConnections extends
store.setRMDispatcher(dispatcher);
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
- stopServer();
+ testingServer.stop();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
- store.getDataWithRetries(path, true);
+ store.getData(path);
} catch (Exception e) {
e.printStackTrace();
assertionFailedInThread.set(true);
@@ -160,114 +122,19 @@ public class TestZKRMStateStoreZKClientConnections extends
}
};
+ // NOTE(review): clientThread is never start()ed, so join() below returns
+ // immediately and the retry path through getData() is never exercised.
+ // If it is started, `path` must exist once the server is back, otherwise
+ // getData() throws. Also confirm that TestingServer#start() really
+ // restarts a stopped server in the Curator version used (there is also
+ // TestingServer#restart()).
Thread.sleep(2000);
- startServer();
+ testingServer.start();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
}
@Test(timeout = 20000)
- public void testZKClientDisconnectAndReconnect()
- throws Exception {
-
- TestZKClient zkClientTester = new TestZKClient();
- String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "newBytes".getBytes(), 0);
-
- stopServer();
- zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
- try {
- store.getDataWithRetries(path, true);
- fail("Expected ZKClient time out exception");
- } catch (Exception e) {
- assertTrue(e.getMessage().contains(
- "Wait for ZKClient creation timed out"));
- }
-
- // ZKRMStateStore Session restored
- startServer();
- zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
- byte[] ret = null;
- try {
- ret = store.getDataWithRetries(path, true);
- } catch (Exception e) {
- String error = "ZKRMStateStore Session restore failed";
- LOG.error(error, e);
- fail(error);
- }
- assertEquals("newBytes", new String(ret));
- }
-
- @Test(timeout = 20000)
- public void testZKSessionTimeout() throws Exception {
-
- TestZKClient zkClientTester = new TestZKClient();
- String path = "/test";
- YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
- ZKRMStateStore store =
- (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
- TestDispatcher dispatcher = new TestDispatcher();
- store.setRMDispatcher(dispatcher);
-
- // a hack to trigger expired event
- zkClientTester.forExpire = true;
-
- // trigger watch
- store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- store.getDataWithRetries(path, true);
- store.setDataWithRetries(path, "bytes".getBytes(), 0);
-
- zkClientTester.syncBarrier.await();
- // after this point, expired event has already been processed.
-
- try {
- byte[] ret = store.getDataWithRetries(path, false);
- assertEquals("bytes", new String(ret));
- } catch (Exception e) {
- String error = "New session creation failed";
- LOG.error(error, e);
- fail(error);
- }
-
- // send Disconnected event from old client session to ZKRMStateStore
- // check the current client session is not affected.
- Assert.assertTrue(zkClientTester.oldWatcher != null);
- WatchedEvent disconnectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.Disconnected, null);
- zkClientTester.oldWatcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient != null);
-
- zkClientTester.watcher.process(disconnectedEvent);
- Assert.assertTrue(store.zkClient == null);
- WatchedEvent connectedEvent = new WatchedEvent(
- Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.SyncConnected, null);
- zkClientTester.watcher.process(connectedEvent);
- Assert.assertTrue(store.zkClient != null);
- Assert.assertTrue(store.zkClient == store.activeZkClient);
- }
-
- @Test(timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
try {
- zkClientTester.store.zkClient.delete(zkClientTester.store
- .znodeWorkingPath, -1);
+ zkClientTester.store.delete(zkClientTester.store
+ .znodeWorkingPath);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */
}