You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:26 UTC

[19/27] hadoop git commit: YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d88691d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d88691d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d88691d

Branch: refs/heads/YARN-2928
Commit: 8d88691d162f87f95c9ed7e0a569ef08e8385d4f
Parents: ded0200
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 19:47:02 2015 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Mar 4 19:49:05 2015 -0800

----------------------------------------------------------------------
 .../apache/hadoop/ha/ClientBaseWithFixes.java   | 11 +++-
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../recovery/ZKRMStateStore.java                | 53 ++++++++++++--------
 .../TestZKRMStateStoreZKClientConnections.java  | 33 +++++++++---
 4 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 7d0727a..5f03133 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -90,6 +90,14 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
         // XXX this doesn't need to be volatile! (Should probably be final)
         volatile CountDownLatch clientConnected;
         volatile boolean connected;
+        protected ZooKeeper client;
+
+        public void initializeWatchedClient(ZooKeeper zk) {
+            if (client != null) {
+                throw new RuntimeException("Watched Client was already set");
+            }
+            client = zk;
+        }
 
         public CountdownWatcher() {
             reset();
@@ -191,8 +199,7 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
                 zk.close();
             }
         }
-
-
+        watcher.initializeWatchedClient(zk);
         return zk;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9a52325..4dd61eb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -701,6 +701,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
     jobs. (Siqi Li via kasha)
 
+    YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving 
+    events for old client. (Zhihai Xu via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 591a551..614ef15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
 
   @VisibleForTesting
   protected ZooKeeper zkClient;
-  private ZooKeeper oldZkClient;
+
+  /* activeZkClient is not used for actual operations. It identifies the
+   * current client session so that watcher events from older sessions can
+   * be ignored, and it is assigned to zkClient on a SyncConnected event.
+   */
+  @VisibleForTesting
+  ZooKeeper activeZkClient;
 
   /** Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -355,21 +361,14 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private synchronized void closeZkClients() throws IOException {
-    if (zkClient != null) {
+    zkClient = null;
+    if (activeZkClient != null) {
       try {
-        zkClient.close();
+        activeZkClient.close();
       } catch (InterruptedException e) {
         throw new IOException("Interrupted while closing ZK", e);
       }
-      zkClient = null;
-    }
-    if (oldZkClient != null) {
-      try {
-        oldZkClient.close();
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing old ZK", e);
-      }
-      oldZkClient = null;
+      activeZkClient = null;
     }
   }
 
@@ -830,11 +829,16 @@ public class ZKRMStateStore extends RMStateStore {
    * hides the ZK methods of the store from its public interface
    */
   private final class ForwardingWatcher implements Watcher {
+    private ZooKeeper watchedZkClient;
+
+    public ForwardingWatcher(ZooKeeper client) {
+      this.watchedZkClient = client;
+    }
 
     @Override
     public void process(WatchedEvent event) {
       try {
-        ZKRMStateStore.this.processWatchEvent(event);
+        ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
       } catch (Throwable t) {
         LOG.error("Failed to process watcher event " + event + ": "
             + StringUtils.stringifyException(t));
@@ -845,8 +849,16 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   @Private
   @Unstable
-  public synchronized void processWatchEvent(WatchedEvent event)
-      throws Exception {
+  public synchronized void processWatchEvent(ZooKeeper zk,
+      WatchedEvent event) throws Exception {
+    // Only process watcher events from the current ZooKeeper client session.
+    if (zk != activeZkClient) {
+      LOG.info("Ignore watcher event type: " + event.getType() +
+          " with state:" + event.getState() + " for path:" +
+          event.getPath() + " from old session");
+      return;
+    }
+
     Event.EventType eventType = event.getType();
     LOG.info("Watcher event type: " + eventType + " with state:"
         + event.getState() + " for path:" + event.getPath() + " for " + this);
@@ -857,17 +869,15 @@ public class ZKRMStateStore extends RMStateStore {
       switch (event.getState()) {
         case SyncConnected:
           LOG.info("ZKRMStateStore Session connected");
-          if (oldZkClient != null) {
+          if (zkClient == null) {
             // the SyncConnected must be from the client that sent Disconnected
-            zkClient = oldZkClient;
-            oldZkClient = null;
+            zkClient = activeZkClient;
             ZKRMStateStore.this.notifyAll();
             LOG.info("ZKRMStateStore Session restored");
           }
           break;
         case Disconnected:
           LOG.info("ZKRMStateStore Session disconnected");
-          oldZkClient = zkClient;
           zkClient = null;
           break;
         case Expired:
@@ -1100,7 +1110,8 @@ public class ZKRMStateStore extends RMStateStore {
     for (int retries = 0; retries < numRetries && zkClient == null;
         retries++) {
       try {
-        zkClient = getNewZooKeeper();
+        activeZkClient = getNewZooKeeper();
+        zkClient = activeZkClient;
         for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
           zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
         }
@@ -1130,7 +1141,7 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized ZooKeeper getNewZooKeeper()
       throws IOException, InterruptedException {
     ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
-    zk.register(new ForwardingWatcher());
+    zk.register(new ForwardingWatcher(zk));
     return zk;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d88691d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 8dc3628..62dc5ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -71,6 +71,7 @@ public class TestZKRMStateStoreZKClientConnections extends
 
     ZKRMStateStore store;
     boolean forExpire = false;
+    TestForwardingWatcher oldWatcher;
     TestForwardingWatcher watcher;
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
@@ -86,35 +87,36 @@ public class TestZKRMStateStoreZKClientConnections extends
       @Override
       public ZooKeeper getNewZooKeeper()
           throws IOException, InterruptedException {
+        oldWatcher = watcher;
+        watcher = new TestForwardingWatcher();
         return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
       }
 
       @Override
-      public synchronized void processWatchEvent(WatchedEvent event)
-          throws Exception {
+      public synchronized void processWatchEvent(ZooKeeper zk,
+          WatchedEvent event) throws Exception {
 
         if (forExpire) {
           // a hack... couldn't find a way to trigger expired event.
           WatchedEvent expriredEvent = new WatchedEvent(
               Watcher.Event.EventType.None,
               Watcher.Event.KeeperState.Expired, null);
-          super.processWatchEvent(expriredEvent);
+          super.processWatchEvent(zk, expriredEvent);
           forExpire = false;
           syncBarrier.await();
         } else {
-          super.processWatchEvent(event);
+          super.processWatchEvent(zk, event);
         }
       }
     }
 
     private class TestForwardingWatcher extends
         ClientBaseWithFixes.CountdownWatcher {
-
       public void process(WatchedEvent event) {
         super.process(event);
         try {
           if (store != null) {
-            store.processWatchEvent(event);
+            store.processWatchEvent(client, event);
           }
         } catch (Throwable t) {
           LOG.error("Failed to process watcher event " + event + ": "
@@ -127,7 +129,6 @@ public class TestZKRMStateStoreZKClientConnections extends
       String workingZnode = "/Test";
       conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-      watcher = new TestForwardingWatcher();
       this.store = new TestZKRMStateStore(conf, workingZnode);
       return this.store;
     }
@@ -239,6 +240,24 @@ public class TestZKRMStateStoreZKClientConnections extends
       LOG.error(error, e);
       fail(error);
     }
+
+    // Send a Disconnected event from the old client session to ZKRMStateStore
+    // and check that the current client session is not affected.
+    Assert.assertTrue(zkClientTester.oldWatcher != null);
+    WatchedEvent disconnectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.Disconnected, null);
+    zkClientTester.oldWatcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+
+    zkClientTester.watcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient == null);
+    WatchedEvent connectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.SyncConnected, null);
+    zkClientTester.watcher.process(connectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+    Assert.assertTrue(store.zkClient == store.activeZkClient);
   }
 
   @Test(timeout = 20000)