You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/01 23:27:05 UTC

hadoop git commit: YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 dbc5bab9f -> 5a6755cc0


YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

(cherry picked from commit 8d88691d162f87f95c9ed7e0a569ef08e8385d4f)
(cherry picked from commit 0d62e948877e5d50f1b6fbe735a94ac6da5ff472)
(cherry picked from commit 4a5b0e708d42fbff571229a43d1762d1767e2db5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a6755cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a6755cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a6755cc

Branch: refs/heads/branch-2.6.1
Commit: 5a6755cc0fccb96d6cded15dec4b426c7f047e54
Parents: dbc5bab
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Mar 4 19:47:02 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Tue Sep 1 14:06:34 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ha/ClientBaseWithFixes.java   | 11 +++-
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../recovery/ZKRMStateStore.java                | 53 ++++++++++++--------
 .../TestZKRMStateStoreZKClientConnections.java  | 33 +++++++++---
 4 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a6755cc/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
index 11d4657..f063863 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -90,6 +90,14 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
         // XXX this doesn't need to be volatile! (Should probably be final)
         volatile CountDownLatch clientConnected;
         volatile boolean connected;
+        protected ZooKeeper client;
+
+        public void initializeWatchedClient(ZooKeeper zk) {
+            if (client != null) {
+                throw new RuntimeException("Watched Client was already set");
+            }
+            client = zk;
+        }
 
         public CountdownWatcher() {
             reset();
@@ -191,8 +199,7 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
                 zk.close();
             }
         }
-
-
+        watcher.initializeWatchedClient(zk);
         return zk;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a6755cc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 334a672..1d0518e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -94,6 +94,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
     jobs. (Siqi Li via kasha)
 
+    YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving 
+    events for old client. (Zhihai Xu via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a6755cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1774b39..8abc64e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
 
   @VisibleForTesting
   protected ZooKeeper zkClient;
-  private ZooKeeper oldZkClient;
+
+  /* activeZkClient is not used to do actual operations,
+   * it is only used to verify client session for watched events and
+   * it gets activated into zkClient on connection event.
+   */
+  @VisibleForTesting
+  ZooKeeper activeZkClient;
 
   /** Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -355,21 +361,14 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private synchronized void closeZkClients() throws IOException {
-    if (zkClient != null) {
+    zkClient = null;
+    if (activeZkClient != null) {
       try {
-        zkClient.close();
+        activeZkClient.close();
       } catch (InterruptedException e) {
         throw new IOException("Interrupted while closing ZK", e);
       }
-      zkClient = null;
-    }
-    if (oldZkClient != null) {
-      try {
-        oldZkClient.close();
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing old ZK", e);
-      }
-      oldZkClient = null;
+      activeZkClient = null;
     }
   }
 
@@ -860,11 +859,16 @@ public class ZKRMStateStore extends RMStateStore {
    * hides the ZK methods of the store from its public interface
    */
   private final class ForwardingWatcher implements Watcher {
+    private ZooKeeper watchedZkClient;
+
+    public ForwardingWatcher(ZooKeeper client) {
+      this.watchedZkClient = client;
+    }
 
     @Override
     public void process(WatchedEvent event) {
       try {
-        ZKRMStateStore.this.processWatchEvent(event);
+        ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
       } catch (Throwable t) {
         LOG.error("Failed to process watcher event " + event + ": "
             + StringUtils.stringifyException(t));
@@ -875,8 +879,16 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   @Private
   @Unstable
-  public synchronized void processWatchEvent(WatchedEvent event)
-      throws Exception {
+  public synchronized void processWatchEvent(ZooKeeper zk,
+      WatchedEvent event) throws Exception {
+    // only process watcher event from current ZooKeeper Client session.
+    if (zk != activeZkClient) {
+      LOG.info("Ignore watcher event type: " + event.getType() +
+          " with state:" + event.getState() + " for path:" +
+          event.getPath() + " from old session");
+      return;
+    }
+
     Event.EventType eventType = event.getType();
     LOG.info("Watcher event type: " + eventType + " with state:"
         + event.getState() + " for path:" + event.getPath() + " for " + this);
@@ -887,17 +899,15 @@ public class ZKRMStateStore extends RMStateStore {
       switch (event.getState()) {
         case SyncConnected:
           LOG.info("ZKRMStateStore Session connected");
-          if (oldZkClient != null) {
+          if (zkClient == null) {
             // the SyncConnected must be from the client that sent Disconnected
-            zkClient = oldZkClient;
-            oldZkClient = null;
+            zkClient = activeZkClient;
             ZKRMStateStore.this.notifyAll();
             LOG.info("ZKRMStateStore Session restored");
           }
           break;
         case Disconnected:
           LOG.info("ZKRMStateStore Session disconnected");
-          oldZkClient = zkClient;
           zkClient = null;
           break;
         case Expired:
@@ -1127,7 +1137,8 @@ public class ZKRMStateStore extends RMStateStore {
     for (int retries = 0; retries < numRetries && zkClient == null;
         retries++) {
       try {
-        zkClient = getNewZooKeeper();
+        activeZkClient = getNewZooKeeper();
+        zkClient = activeZkClient;
         for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
           zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
         }
@@ -1157,7 +1168,7 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized ZooKeeper getNewZooKeeper()
       throws IOException, InterruptedException {
     ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
-    zk.register(new ForwardingWatcher());
+    zk.register(new ForwardingWatcher(zk));
     return zk;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a6755cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 8dc3628..62dc5ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -71,6 +71,7 @@ public class TestZKRMStateStoreZKClientConnections extends
 
     ZKRMStateStore store;
     boolean forExpire = false;
+    TestForwardingWatcher oldWatcher;
     TestForwardingWatcher watcher;
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
@@ -86,35 +87,36 @@ public class TestZKRMStateStoreZKClientConnections extends
       @Override
       public ZooKeeper getNewZooKeeper()
           throws IOException, InterruptedException {
+        oldWatcher = watcher;
+        watcher = new TestForwardingWatcher();
         return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
       }
 
       @Override
-      public synchronized void processWatchEvent(WatchedEvent event)
-          throws Exception {
+      public synchronized void processWatchEvent(ZooKeeper zk,
+          WatchedEvent event) throws Exception {
 
         if (forExpire) {
           // a hack... couldn't find a way to trigger expired event.
           WatchedEvent expriredEvent = new WatchedEvent(
               Watcher.Event.EventType.None,
               Watcher.Event.KeeperState.Expired, null);
-          super.processWatchEvent(expriredEvent);
+          super.processWatchEvent(zk, expriredEvent);
           forExpire = false;
           syncBarrier.await();
         } else {
-          super.processWatchEvent(event);
+          super.processWatchEvent(zk, event);
         }
       }
     }
 
     private class TestForwardingWatcher extends
         ClientBaseWithFixes.CountdownWatcher {
-
       public void process(WatchedEvent event) {
         super.process(event);
         try {
           if (store != null) {
-            store.processWatchEvent(event);
+            store.processWatchEvent(client, event);
           }
         } catch (Throwable t) {
           LOG.error("Failed to process watcher event " + event + ": "
@@ -127,7 +129,6 @@ public class TestZKRMStateStoreZKClientConnections extends
       String workingZnode = "/Test";
       conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-      watcher = new TestForwardingWatcher();
       this.store = new TestZKRMStateStore(conf, workingZnode);
       return this.store;
     }
@@ -239,6 +240,24 @@ public class TestZKRMStateStoreZKClientConnections extends
       LOG.error(error, e);
       fail(error);
     }
+
+    // send Disconnected event from old client session to ZKRMStateStore
+    // check the current client session is not affected.
+    Assert.assertTrue(zkClientTester.oldWatcher != null);
+    WatchedEvent disconnectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.Disconnected, null);
+    zkClientTester.oldWatcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+
+    zkClientTester.watcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient == null);
+    WatchedEvent connectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.SyncConnected, null);
+    zkClientTester.watcher.process(connectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+    Assert.assertTrue(store.zkClient == store.activeZkClient);
   }
 
   @Test(timeout = 20000)