You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/04/16 04:03:55 UTC

svn commit: r1587778 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: CHANGES.txt hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

Author: jianhe
Date: Wed Apr 16 02:03:54 2014
New Revision: 1587778

URL: http://svn.apache.org/r1587778
Log:
Merge r1587776 from trunk. YARN-1934. Fixed a potential NPE in ZKRMStateStore caused by handling Disconnected event from ZK. Contributed by Karthik Kambatla

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1587778&r1=1587777&r2=1587778&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Apr 16 02:03:54 2014
@@ -111,6 +111,9 @@ Release 2.4.1 - UNRELEASED
     YARN-1928. Fixed a race condition in TestAMRMRPCNodeUpdates which caused it
     to fail occasionally. (Zhijie Shen via vinodkv)
 
+    YARN-1934. Fixed a potential NPE in ZKRMStateStore caused by handling
+    Disconnected event from ZK. (Karthik Kambatla via jianhe)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1587778&r1=1587777&r2=1587778&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Apr 16 02:03:54 2014
@@ -280,10 +280,9 @@ public class ZKRMStateStore extends RMSt
     }
   }
 
-  private void logRootNodeAcls(String prefix) throws KeeperException,
-      InterruptedException {
+  private void logRootNodeAcls(String prefix) throws Exception {
     Stat getStat = new Stat();
-    List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat);
+    List<ACL> getAcls = getACLWithRetries(zkRootNodePath, getStat);
 
     StringBuilder builder = new StringBuilder();
     builder.append(prefix);
@@ -363,7 +362,7 @@ public class ZKRMStateStore extends RMSt
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
         ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
-    if (zkClient.exists(versionNodePath, true) != null) {
+    if (existsWithRetries(versionNodePath, true) != null) {
       setDataWithRetries(versionNodePath, data, -1);
     } else {
       createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
@@ -374,7 +373,7 @@ public class ZKRMStateStore extends RMSt
   protected synchronized RMStateVersion loadVersion() throws Exception {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
 
-    if (zkClient.exists(versionNodePath, true) != null) {
+    if (existsWithRetries(versionNodePath, true) != null) {
       byte[] data = getDataWithRetries(versionNodePath, true);
       RMStateVersion version =
           new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
@@ -442,7 +441,8 @@ public class ZKRMStateStore extends RMSt
   }
 
   private void loadRMDelegationTokenState(RMState rmState) throws Exception {
-    List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
+    List<String> childNodes =
+        getChildrenWithRetries(delegationTokensRootPath, true);
     for (String childNodeName : childNodes) {
       String childNodePath =
           getNodePath(delegationTokensRootPath, childNodeName);
@@ -567,7 +567,7 @@ public class ZKRMStateStore extends RMSt
     }
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
 
-    if (zkClient.exists(nodeUpdatePath, true) != null) {
+    if (existsWithRetries(nodeUpdatePath, true) != null) {
       setDataWithRetries(nodeUpdatePath, appStateData, -1);
     } else {
       createWithRetries(nodeUpdatePath, appStateData, zkAcl,
@@ -610,7 +610,7 @@ public class ZKRMStateStore extends RMSt
     }
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
 
-    if (zkClient.exists(nodeUpdatePath, true) != null) {
+    if (existsWithRetries(nodeUpdatePath, true) != null) {
       setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
     } else {
       createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
@@ -661,7 +661,7 @@ public class ZKRMStateStore extends RMSt
       LOG.debug("Removing RMDelegationToken_"
           + rmDTIdentifier.getSequenceNumber());
     }
-    if (zkClient.exists(nodeRemovePath, true) != null) {
+    if (existsWithRetries(nodeRemovePath, true) != null) {
       opList.add(Op.delete(nodeRemovePath, -1));
     } else {
       LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
@@ -677,7 +677,7 @@ public class ZKRMStateStore extends RMSt
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
-    if (zkClient.exists(nodeRemovePath, true) == null) {
+    if (existsWithRetries(nodeRemovePath, true) == null) {
       // in case znode doesn't exist
       addStoreOrUpdateOps(
           opList, rmDTIdentifier, renewDate, latestSequenceNumber, false);
@@ -760,7 +760,7 @@ public class ZKRMStateStore extends RMSt
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
-    if (zkClient.exists(nodeRemovePath, true) != null) {
+    if (existsWithRetries(nodeRemovePath, true) != null) {
       doMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
       LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath);
@@ -891,6 +891,16 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  private List<ACL> getACLWithRetries(
+      final String path, final Stat stat) throws Exception {
+    return new ZKAction<List<ACL>>() {
+      @Override
+      public List<ACL> run() throws KeeperException, InterruptedException {
+        return zkClient.getACL(path, stat);
+      }
+    }.runWithRetries();
+  }
+
   private List<String> getChildrenWithRetries(
       final String path, final boolean watch) throws Exception {
     return new ZKAction<List<String>>() {
@@ -901,6 +911,16 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  private Stat existsWithRetries(
+      final String path, final boolean watch) throws Exception {
+    return new ZKAction<Stat>() {
+      @Override
+      Stat run() throws KeeperException, InterruptedException {
+        return zkClient.exists(path, watch);
+      }
+    }.runWithRetries();
+  }
+
   /**
    * Helper class that periodically attempts creating a znode to ensure that
    * this RM continues to be the Active.