You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/05/07 02:59:06 UTC

[3/3] hadoop git commit: YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based state-store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu.

YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based state-store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu.

(cherry picked from commit 4c7b9b6abe2452c9752a11214762be2e7665fb32)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3d1d551c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3d1d551c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3d1d551c

Branch: refs/heads/branch-2.7
Commit: 3d1d551c33ffd84cf6b9acfccce47daa1271b644
Parents: 419b8f6
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed May 6 17:51:17 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed May 6 17:57:49 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../recovery/ZKRMStateStore.java                | 64 ++++++++++++++++----
 .../recovery/TestZKRMStateStore.java            | 35 +++++++++++
 3 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d1d551c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c2430c2..0229c29 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -62,6 +62,9 @@ Release 2.7.1 - UNRELEASED
     YARN-3301. Fixed the format issue of the new RM attempt web page.
     (Xuan Gong via jianhe)
 
+    YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based
+    state-store to avoid crashing on duplicate deletes. (Zhihai Xu via vinodkv)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d1d551c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index e8891a2..a116e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -694,7 +694,7 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
-    doMultiWithRetries(opList);
+    doDeleteMultiWithRetries(opList);
   }
 
   @Override
@@ -703,13 +703,12 @@ public class ZKRMStateStore extends RMStateStore {
       throws Exception {
     ArrayList<Op> opList = new ArrayList<Op>();
     addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
-    doMultiWithRetries(opList);
+    doStoreMultiWithRetries(opList);
   }
 
   @Override
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
-    ArrayList<Op> opList = new ArrayList<Op>();
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
@@ -718,11 +717,12 @@ public class ZKRMStateStore extends RMStateStore {
           + rmDTIdentifier.getSequenceNumber());
     }
     if (existsWithRetries(nodeRemovePath, false) != null) {
+      ArrayList<Op> opList = new ArrayList<Op>();
       opList.add(Op.delete(nodeRemovePath, -1));
+      doDeleteMultiWithRetries(opList);
     } else {
       LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
-    doMultiWithRetries(opList);
   }
 
   @Override
@@ -741,7 +741,7 @@ public class ZKRMStateStore extends RMStateStore {
       // in case znode exists
       addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
     }
-    doMultiWithRetries(opList);
+    doStoreMultiWithRetries(opList);
   }
 
   private void addStoreOrUpdateOps(ArrayList<Op> opList,
@@ -810,7 +810,7 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
     if (existsWithRetries(nodeRemovePath, false) != null) {
-      doMultiWithRetries(Op.delete(nodeRemovePath, -1));
+      doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
       LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
@@ -905,7 +905,7 @@ public class ZKRMStateStore extends RMStateStore {
    * Helper method that creates fencing node, executes the passed operations,
    * and deletes the fencing node.
    */
-  private synchronized void doMultiWithRetries(
+  private synchronized void doStoreMultiWithRetries(
       final List<Op> opList) throws Exception {
     final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
     execOpList.add(createFencingNodePathOp);
@@ -924,8 +924,32 @@ public class ZKRMStateStore extends RMStateStore {
    * Helper method that creates fencing node, executes the passed operation,
    * and deletes the fencing node.
    */
-  private void doMultiWithRetries(final Op op) throws Exception {
-    doMultiWithRetries(Collections.singletonList(op));
+  private void doStoreMultiWithRetries(final Op op) throws Exception {
+    doStoreMultiWithRetries(Collections.singletonList(op));
+  }
+
+  /**
+   * Helper method that creates fencing node, executes the passed
+   * delete related operations and deletes the fencing node.
+   */
+  private synchronized void doDeleteMultiWithRetries(
+      final List<Op> opList) throws Exception {
+    final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
+    execOpList.add(createFencingNodePathOp);
+    execOpList.addAll(opList);
+    execOpList.add(deleteFencingNodePathOp);
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        setHasDeleteNodeOp(true);
+        zkClient.multi(execOpList);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void doDeleteMultiWithRetries(final Op op) throws Exception {
+    doDeleteMultiWithRetries(Collections.singletonList(op));
   }
 
   @VisibleForTesting
@@ -934,7 +958,7 @@ public class ZKRMStateStore extends RMStateStore {
   public void createWithRetries(
       final String path, final byte[] data, final List<ACL> acl,
       final CreateMode mode) throws Exception {
-    doMultiWithRetries(Op.create(path, data, acl, mode));
+    doStoreMultiWithRetries(Op.create(path, data, acl, mode));
   }
 
   @VisibleForTesting
@@ -942,7 +966,7 @@ public class ZKRMStateStore extends RMStateStore {
   @Unstable
   public void setDataWithRetries(final String path, final byte[] data,
                                  final int version) throws Exception {
-    doMultiWithRetries(Op.setData(path, data, version));
+    doStoreMultiWithRetries(Op.setData(path, data, version));
   }
 
   @VisibleForTesting
@@ -1008,7 +1032,12 @@ public class ZKRMStateStore extends RMStateStore {
     for (String child : children) {
       recursiveDeleteWithRetriesHelper(path + "/" + child, false);
     }
-    zkClient.delete(path, -1);
+
+    try {
+      zkClient.delete(path, -1);
+    } catch (KeeperException.NoNodeException nne) {
+      LOG.info("Node " + path + " doesn't exist to delete");
+    }
   }
 
   /**
@@ -1028,7 +1057,7 @@ public class ZKRMStateStore extends RMStateStore {
           if(isFencedState()) { 
             break;
           }
-          doMultiWithRetries(emptyOpList);
+          doStoreMultiWithRetries(emptyOpList);
           Thread.sleep(zkSessionTimeout);
         }
       } catch (InterruptedException ie) {
@@ -1041,6 +1070,10 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private abstract class ZKAction<T> {
+    private boolean hasDeleteNodeOp = false;
+    void setHasDeleteNodeOp(boolean hasDeleteOp) {
+      this.hasDeleteNodeOp = hasDeleteOp;
+    }
     // run() expects synchronization on ZKRMStateStore.this
     abstract T run() throws KeeperException, InterruptedException;
 
@@ -1090,6 +1123,11 @@ public class ZKRMStateStore extends RMStateStore {
             LOG.info("znode already exists!");
             return null;
           }
+          if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
+            LOG.info("znode has already been deleted!");
+            return null;
+          }
+
           LOG.info("Exception while executing a ZK operation.", ke);
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
             LOG.info("Retrying operation on ZK. Retry no. " + retry);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3d1d551c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index bbbf5a1..991c35f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 
 import javax.crypto.SecretKey;
@@ -37,6 +38,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -57,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
@@ -370,4 +373,36 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
 
     store.close();
   }
+
+  @Test
+  public void testDuplicateRMAppDeletion() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
+    RMStateStore store = zkTester.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appIdRemoved);
+    ApplicationStateData appStateRemoved =
+        ApplicationStateData.newInstance(
+            submitTime, startTime, context, "user1");
+    appStateRemoved.attempts.put(attemptIdRemoved, null);
+    store.removeApplicationStateInternal(appStateRemoved);
+    try {
+      store.removeApplicationStateInternal(appStateRemoved);
+    } catch (KeeperException.NoNodeException nne) {
+      Assert.fail("NoNodeException should not happen.");
+    }
+    store.close();
+  }
 }