You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/05/07 02:51:54 UTC

hadoop git commit: YARN-3385. Fixed a race condition in the ResourceManager's ZooKeeper-based state store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 31b627b2a -> 4c7b9b6ab


YARN-3385. Fixed a race condition in the ResourceManager's ZooKeeper-based state store to avoid crashing on duplicate deletes. Contributed by Zhihai Xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c7b9b6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c7b9b6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c7b9b6a

Branch: refs/heads/trunk
Commit: 4c7b9b6abe2452c9752a11214762be2e7665fb32
Parents: 31b627b
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed May 6 17:51:17 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed May 6 17:51:17 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../recovery/ZKRMStateStore.java                | 64 ++++++++++++++++----
 .../recovery/TestZKRMStateStore.java            | 35 +++++++++++
 3 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c7b9b6a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 81180b5..7a4b65a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -373,6 +373,9 @@ Release 2.7.1 - UNRELEASED
     YARN-3301. Fixed the format issue of the new RM attempt web page.
     (Xuan Gong via jianhe)
 
+    YARN-3385. Fixed a race-condition in ResourceManager's ZooKeeper based
+    state-store to avoid crashing on duplicate deletes. (Zhihai Xu via vinodkv)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c7b9b6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 97dd029..364c970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -694,7 +694,7 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
           + " and its attempts.");
     }
-    doMultiWithRetries(opList);
+    doDeleteMultiWithRetries(opList);
   }
 
   @Override
@@ -703,13 +703,12 @@ public class ZKRMStateStore extends RMStateStore {
       throws Exception {
     ArrayList<Op> opList = new ArrayList<Op>();
     addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
-    doMultiWithRetries(opList);
+    doStoreMultiWithRetries(opList);
   }
 
   @Override
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
-    ArrayList<Op> opList = new ArrayList<Op>();
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
@@ -718,11 +717,12 @@ public class ZKRMStateStore extends RMStateStore {
           + rmDTIdentifier.getSequenceNumber());
     }
     if (existsWithRetries(nodeRemovePath, false) != null) {
+      ArrayList<Op> opList = new ArrayList<Op>();
       opList.add(Op.delete(nodeRemovePath, -1));
+      doDeleteMultiWithRetries(opList);
     } else {
       LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
-    doMultiWithRetries(opList);
   }
 
   @Override
@@ -741,7 +741,7 @@ public class ZKRMStateStore extends RMStateStore {
       // in case znode exists
       addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
     }
-    doMultiWithRetries(opList);
+    doStoreMultiWithRetries(opList);
   }
 
   private void addStoreOrUpdateOps(ArrayList<Op> opList,
@@ -810,7 +810,7 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
     if (existsWithRetries(nodeRemovePath, false) != null) {
-      doMultiWithRetries(Op.delete(nodeRemovePath, -1));
+      doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
     } else {
       LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
     }
@@ -914,7 +914,7 @@ public class ZKRMStateStore extends RMStateStore {
    * Helper method that creates fencing node, executes the passed operations,
    * and deletes the fencing node.
    */
-  private synchronized void doMultiWithRetries(
+  private synchronized void doStoreMultiWithRetries(
       final List<Op> opList) throws Exception {
     final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
     execOpList.add(createFencingNodePathOp);
@@ -933,8 +933,32 @@ public class ZKRMStateStore extends RMStateStore {
    * Helper method that creates fencing node, executes the passed operation,
    * and deletes the fencing node.
    */
-  private void doMultiWithRetries(final Op op) throws Exception {
-    doMultiWithRetries(Collections.singletonList(op));
+  private void doStoreMultiWithRetries(final Op op) throws Exception {
+    doStoreMultiWithRetries(Collections.singletonList(op));
+  }
+
+  /**
+   * Helper method that creates fencing node, executes the passed
+   * delete related operations and deletes the fencing node.
+   */
+  private synchronized void doDeleteMultiWithRetries(
+      final List<Op> opList) throws Exception {
+    final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
+    execOpList.add(createFencingNodePathOp);
+    execOpList.addAll(opList);
+    execOpList.add(deleteFencingNodePathOp);
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        setHasDeleteNodeOp(true);
+        zkClient.multi(execOpList);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void doDeleteMultiWithRetries(final Op op) throws Exception {
+    doDeleteMultiWithRetries(Collections.singletonList(op));
   }
 
   @VisibleForTesting
@@ -943,7 +967,7 @@ public class ZKRMStateStore extends RMStateStore {
   public void createWithRetries(
       final String path, final byte[] data, final List<ACL> acl,
       final CreateMode mode) throws Exception {
-    doMultiWithRetries(Op.create(path, data, acl, mode));
+    doStoreMultiWithRetries(Op.create(path, data, acl, mode));
   }
 
   @VisibleForTesting
@@ -951,7 +975,7 @@ public class ZKRMStateStore extends RMStateStore {
   @Unstable
   public void setDataWithRetries(final String path, final byte[] data,
                                  final int version) throws Exception {
-    doMultiWithRetries(Op.setData(path, data, version));
+    doStoreMultiWithRetries(Op.setData(path, data, version));
   }
 
   @VisibleForTesting
@@ -1017,7 +1041,12 @@ public class ZKRMStateStore extends RMStateStore {
     for (String child : children) {
       recursiveDeleteWithRetriesHelper(path + "/" + child, false);
     }
-    zkClient.delete(path, -1);
+
+    try {
+      zkClient.delete(path, -1);
+    } catch (KeeperException.NoNodeException nne) {
+      LOG.info("Node " + path + " doesn't exist to delete");
+    }
   }
 
   /**
@@ -1037,7 +1066,7 @@ public class ZKRMStateStore extends RMStateStore {
           if(isFencedState()) { 
             break;
           }
-          doMultiWithRetries(emptyOpList);
+          doStoreMultiWithRetries(emptyOpList);
           Thread.sleep(zkSessionTimeout);
         }
       } catch (InterruptedException ie) {
@@ -1050,6 +1079,10 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private abstract class ZKAction<T> {
+    private boolean hasDeleteNodeOp = false;
+    void setHasDeleteNodeOp(boolean hasDeleteOp) {
+      this.hasDeleteNodeOp = hasDeleteOp;
+    }
     // run() expects synchronization on ZKRMStateStore.this
     abstract T run() throws KeeperException, InterruptedException;
 
@@ -1099,6 +1132,11 @@ public class ZKRMStateStore extends RMStateStore {
             LOG.info("znode already exists!");
             return null;
           }
+          if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
+            LOG.info("znode has already been deleted!");
+            return null;
+          }
+
           LOG.info("Exception while executing a ZK operation.", ke);
           if (shouldRetry(ke.code()) && ++retry < numRetries) {
             LOG.info("Retrying operation on ZK. Retry no. " + retry);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c7b9b6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index c632a06..333455c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 
 import javax.crypto.SecretKey;
@@ -38,6 +39,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -58,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
@@ -381,4 +384,36 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
 
     store.close();
   }
+
+  @Test
+  public void testDuplicateRMAppDeletion() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
+    RMStateStore store = zkTester.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime, startTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appIdRemoved);
+    ApplicationStateData appStateRemoved =
+        ApplicationStateData.newInstance(
+            submitTime, startTime, context, "user1");
+    appStateRemoved.attempts.put(attemptIdRemoved, null);
+    store.removeApplicationStateInternal(appStateRemoved);
+    try {
+      store.removeApplicationStateInternal(appStateRemoved);
+    } catch (KeeperException.NoNodeException nne) {
+      Assert.fail("NoNodeException should not happen.");
+    }
+    store.close();
+  }
 }