You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:26 UTC
[19/27] hadoop git commit: YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)
YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d88691d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d88691d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d88691d
Branch: refs/heads/YARN-2928
Commit: 8d88691d162f87f95c9ed7e0a569ef08e8385d4f
Parents: ded0200
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 19:47:02 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 19:49:05 2015 -0800
----------------------------------------------------------------------
.../apache/hadoop/ha/ClientBaseWithFixes.java | 11 +++-
hadoop-yarn-project/CHANGES.txt | 3 ++
.../recovery/ZKRMStateStore.java | 53 ++++++++++++--------
.../TestZKRMStateStoreZKClientConnections.java | 33 +++++++++---
4 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 7d0727a..5f03133 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -90,6 +90,14 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
// XXX this doesn't need to be volatile! (Should probably be final)
volatile CountDownLatch clientConnected;
volatile boolean connected;
+ protected ZooKeeper client;
+
+ public void initializeWatchedClient(ZooKeeper zk) {
+ if (client != null) {
+ throw new RuntimeException("Watched Client was already set");
+ }
+ client = zk;
+ }
public CountdownWatcher() {
reset();
@@ -191,8 +199,7 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
zk.close();
}
}
-
-
+ watcher.initializeWatchedClient(zk);
return zk;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9a52325..4dd61eb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -701,6 +701,9 @@ Release 2.7.0 - UNRELEASED
YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending
jobs. (Siqi Li via kasha)
+ YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving
+ events for old client. (Zhihai Xu via kasha)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 591a551..614ef15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected ZooKeeper zkClient;
- private ZooKeeper oldZkClient;
+
+ /* activeZkClient is not used to do actual operations,
+ * it is only used to verify client session for watched events and
+ * it gets activated into zkClient on connection event.
+ */
+ @VisibleForTesting
+ ZooKeeper activeZkClient;
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -355,21 +361,14 @@ public class ZKRMStateStore extends RMStateStore {
}
private synchronized void closeZkClients() throws IOException {
- if (zkClient != null) {
+ zkClient = null;
+ if (activeZkClient != null) {
try {
- zkClient.close();
+ activeZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing ZK", e);
}
- zkClient = null;
- }
- if (oldZkClient != null) {
- try {
- oldZkClient.close();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while closing old ZK", e);
- }
- oldZkClient = null;
+ activeZkClient = null;
}
}
@@ -830,11 +829,16 @@ public class ZKRMStateStore extends RMStateStore {
* hides the ZK methods of the store from its public interface
*/
private final class ForwardingWatcher implements Watcher {
+ private ZooKeeper watchedZkClient;
+
+ public ForwardingWatcher(ZooKeeper client) {
+ this.watchedZkClient = client;
+ }
@Override
public void process(WatchedEvent event) {
try {
- ZKRMStateStore.this.processWatchEvent(event);
+ ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
@@ -845,8 +849,16 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
@Private
@Unstable
- public synchronized void processWatchEvent(WatchedEvent event)
- throws Exception {
+ public synchronized void processWatchEvent(ZooKeeper zk,
+ WatchedEvent event) throws Exception {
+ // only process watcher event from current ZooKeeper Client session.
+ if (zk != activeZkClient) {
+ LOG.info("Ignore watcher event type: " + event.getType() +
+ " with state:" + event.getState() + " for path:" +
+ event.getPath() + " from old session");
+ return;
+ }
+
Event.EventType eventType = event.getType();
LOG.info("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + " for " + this);
@@ -857,17 +869,15 @@ public class ZKRMStateStore extends RMStateStore {
switch (event.getState()) {
case SyncConnected:
LOG.info("ZKRMStateStore Session connected");
- if (oldZkClient != null) {
+ if (zkClient == null) {
// the SyncConnected must be from the client that sent Disconnected
- zkClient = oldZkClient;
- oldZkClient = null;
+ zkClient = activeZkClient;
ZKRMStateStore.this.notifyAll();
LOG.info("ZKRMStateStore Session restored");
}
break;
case Disconnected:
LOG.info("ZKRMStateStore Session disconnected");
- oldZkClient = zkClient;
zkClient = null;
break;
case Expired:
@@ -1100,7 +1110,8 @@ public class ZKRMStateStore extends RMStateStore {
for (int retries = 0; retries < numRetries && zkClient == null;
retries++) {
try {
- zkClient = getNewZooKeeper();
+ activeZkClient = getNewZooKeeper();
+ zkClient = activeZkClient;
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
}
@@ -1130,7 +1141,7 @@ public class ZKRMStateStore extends RMStateStore {
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
- zk.register(new ForwardingWatcher());
+ zk.register(new ForwardingWatcher(zk));
return zk;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 8dc3628..62dc5ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -71,6 +71,7 @@ public class TestZKRMStateStoreZKClientConnections extends
ZKRMStateStore store;
boolean forExpire = false;
+ TestForwardingWatcher oldWatcher;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
@@ -86,35 +87,36 @@ public class TestZKRMStateStoreZKClientConnections extends
@Override
public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
+ oldWatcher = watcher;
+ watcher = new TestForwardingWatcher();
return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
}
@Override
- public synchronized void processWatchEvent(WatchedEvent event)
- throws Exception {
+ public synchronized void processWatchEvent(ZooKeeper zk,
+ WatchedEvent event) throws Exception {
if (forExpire) {
// a hack... couldn't find a way to trigger expired event.
WatchedEvent expriredEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null);
- super.processWatchEvent(expriredEvent);
+ super.processWatchEvent(zk, expriredEvent);
forExpire = false;
syncBarrier.await();
} else {
- super.processWatchEvent(event);
+ super.processWatchEvent(zk, event);
}
}
}
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
-
public void process(WatchedEvent event) {
super.process(event);
try {
if (store != null) {
- store.processWatchEvent(event);
+ store.processWatchEvent(client, event);
}
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
@@ -127,7 +129,6 @@ public class TestZKRMStateStoreZKClientConnections extends
String workingZnode = "/Test";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
- watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
@@ -239,6 +240,24 @@ public class TestZKRMStateStoreZKClientConnections extends
LOG.error(error, e);
fail(error);
}
+
+ // send Disconnected event from old client session to ZKRMStateStore
+ // check the current client session is not affected.
+ Assert.assertTrue(zkClientTester.oldWatcher != null);
+ WatchedEvent disconnectedEvent = new WatchedEvent(
+ Watcher.Event.EventType.None,
+ Watcher.Event.KeeperState.Disconnected, null);
+ zkClientTester.oldWatcher.process(disconnectedEvent);
+ Assert.assertTrue(store.zkClient != null);
+
+ zkClientTester.watcher.process(disconnectedEvent);
+ Assert.assertTrue(store.zkClient == null);
+ WatchedEvent connectedEvent = new WatchedEvent(
+ Watcher.Event.EventType.None,
+ Watcher.Event.KeeperState.SyncConnected, null);
+ zkClientTester.watcher.process(connectedEvent);
+ Assert.assertTrue(store.zkClient != null);
+ Assert.assertTrue(store.zkClient == store.activeZkClient);
}
@Test(timeout = 20000)