You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/08/23 02:57:37 UTC

[26/36] hadoop git commit: YARN-7053. Move curator transaction support to ZKCuratorManager. (Jonathan Hung via Subru).

YARN-7053. Move curator transaction support to ZKCuratorManager. (Jonathan Hung via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4249172e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4249172e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4249172e

Branch: refs/heads/YARN-5734
Commit: 4249172e1419acdb2b69ae3db43dc59da2aa2e03
Parents: c379310
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Aug 22 19:20:57 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Tue Aug 22 19:20:57 2017 -0700

----------------------------------------------------------------------
 .../hadoop/util/curator/ZKCuratorManager.java   |  88 +++++++++++-
 .../util/curator/TestZKCuratorManager.java      |  39 ++++++
 .../recovery/ZKRMStateStore.java                | 139 ++++++-------------
 3 files changed, 164 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
index 9a031af..e1efcb5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
@@ -26,6 +26,8 @@ import java.util.List;
 import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -54,7 +56,6 @@ public final class ZKCuratorManager {
   /** Curator for ZooKeeper. */
   private CuratorFramework curator;
 
-
   public ZKCuratorManager(Configuration config) throws IOException {
     this.conf = config;
   }
@@ -119,7 +120,6 @@ public final class ZKCuratorManager {
 
   /**
    * Start the connection to the ZooKeeper ensemble.
-   * @param conf Configuration for the connection.
    * @throws IOException If the connection cannot be started.
    */
   public void start() throws IOException {
@@ -128,7 +128,6 @@ public final class ZKCuratorManager {
 
   /**
    * Start the connection to the ZooKeeper ensemble.
-   * @param conf Configuration for the connection.
    * @param authInfos List of authentication keys.
    * @throws IOException If the connection cannot be started.
    */
@@ -337,4 +336,87 @@ public final class ZKCuratorManager {
   public static String getNodePath(String root, String nodeName) {
     return root + "/" + nodeName;
   }
+
+  public void safeCreate(String path, byte[] data, List<ACL> acl,
+      CreateMode mode, List<ACL> fencingACL, String fencingNodePath)
+      throws Exception {
+    if (!exists(path)) {
+      SafeTransaction transaction = createTransaction(fencingACL,
+          fencingNodePath);
+      transaction.create(path, data, acl, mode);
+      transaction.commit();
+    }
+  }
+
+  /**
+   * Deletes the path. Checks for existence of path as well.
+   * @param path Path to be deleted.
+   * @throws Exception if any problem occurs while performing deletion.
+   */
+  public void safeDelete(final String path, List<ACL> fencingACL,
+      String fencingNodePath) throws Exception {
+    if (exists(path)) {
+      SafeTransaction transaction = createTransaction(fencingACL,
+          fencingNodePath);
+      transaction.delete(path);
+      transaction.commit();
+    }
+  }
+
+  public void safeSetData(String path, byte[] data, int version,
+      List<ACL> fencingACL, String fencingNodePath)
+      throws Exception {
+    SafeTransaction transaction = createTransaction(fencingACL,
+        fencingNodePath);
+    transaction.setData(path, data, version);
+    transaction.commit();
+  }
+
+  public SafeTransaction createTransaction(List<ACL> fencingACL,
+      String fencingNodePath) throws Exception {
+    return new SafeTransaction(fencingACL, fencingNodePath);
+  }
+
+  /**
+   * Use curator transactions to ensure zk-operations are performed in an all
+   * or nothing fashion. This is equivalent to using ZooKeeper#multi.
+   *
+   * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
+   * have to rewrite this inner class when we adopt that.
+   */
+  public class SafeTransaction {
+    private CuratorTransactionFinal transactionFinal;
+    private String fencingNodePath;
+
+    SafeTransaction(List<ACL> fencingACL, String fencingNodePath)
+        throws Exception {
+      this.fencingNodePath = fencingNodePath;
+      CuratorTransaction transaction = curator.inTransaction();
+      transactionFinal = transaction.create()
+          .withMode(CreateMode.PERSISTENT).withACL(fencingACL)
+          .forPath(fencingNodePath, new byte[0]).and();
+    }
+
+    public void commit() throws Exception {
+      transactionFinal = transactionFinal.delete()
+          .forPath(fencingNodePath).and();
+      transactionFinal.commit();
+    }
+
+    public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
+        throws Exception {
+      transactionFinal = transactionFinal.create()
+          .withMode(mode).withACL(acl).forPath(path, data).and();
+    }
+
+    public void delete(String path) throws Exception {
+      transactionFinal = transactionFinal.delete().forPath(path).and();
+    }
+
+    public void setData(String path, byte[] data, int version)
+        throws Exception {
+      transactionFinal = transactionFinal.setData()
+          .withVersion(version).forPath(path, data).and();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
index 3e78a44..486e89a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
@@ -21,11 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -92,4 +96,39 @@ public class TestZKCuratorManager {
     children = curator.getChildren("/");
     assertEquals(2, children.size());
   }
+
+  @Test
+  public void testTransaction() throws Exception {
+    List<ACL> zkAcl = ZKUtil.parseACLs(CommonConfigurationKeys.ZK_ACL_DEFAULT);
+    String fencingNodePath = "/fencing";
+    String node1 = "/node1";
+    String node2 = "/node2";
+    byte[] testData = "testData".getBytes("UTF-8");
+    assertFalse(curator.exists(fencingNodePath));
+    assertFalse(curator.exists(node1));
+    assertFalse(curator.exists(node2));
+    ZKCuratorManager.SafeTransaction txn = curator.createTransaction(
+        zkAcl, fencingNodePath);
+    txn.create(node1, testData, zkAcl, CreateMode.PERSISTENT);
+    txn.create(node2, testData, zkAcl, CreateMode.PERSISTENT);
+    assertFalse(curator.exists(fencingNodePath));
+    assertFalse(curator.exists(node1));
+    assertFalse(curator.exists(node2));
+    txn.commit();
+    assertFalse(curator.exists(fencingNodePath));
+    assertTrue(curator.exists(node1));
+    assertTrue(curator.exists(node2));
+    assertTrue(Arrays.equals(testData, curator.getData(node1)));
+    assertTrue(Arrays.equals(testData, curator.getData(node2)));
+
+    byte[] setData = "setData".getBytes("UTF-8");
+    txn = curator.createTransaction(zkAcl, fencingNodePath);
+    txn.setData(node1, setData, -1);
+    txn.delete(node2);
+    assertTrue(curator.exists(node2));
+    assertTrue(Arrays.equals(testData, curator.getData(node1)));
+    txn.commit();
+    assertFalse(curator.exists(node2));
+    assertTrue(Arrays.equals(setData, curator.getData(node1)));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4249172e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index a445e75..ac67dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -31,6 +29,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -416,9 +415,10 @@ public class ZKRMStateStore extends RMStateStore {
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
 
     if (exists(versionNodePath)) {
-      safeSetData(versionNodePath, data, -1);
+      zkManager.safeSetData(versionNodePath, data, -1, zkAcl, fencingNodePath);
     } else {
-      safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
+      zkManager.safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT,
+          zkAcl, fencingNodePath);
     }
   }
 
@@ -447,12 +447,14 @@ public class ZKRMStateStore extends RMStateStore {
       // increment epoch and store it
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      safeSetData(epochNodePath, storeData, -1);
+      zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
+          fencingNodePath);
     } else {
       // initialize epoch node with 1 for the next time.
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
+      zkManager.safeCreate(epochNodePath, storeData, zkAcl,
+          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
     }
 
     return currentEpoch;
@@ -721,7 +723,7 @@ public class ZKRMStateStore extends RMStateStore {
       // No apps stored under parent path.
       if (children != null && children.isEmpty()) {
         try {
-          safeDelete(parentAppNode);
+          zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
           if (LOG.isDebugEnabled()) {
             LOG.debug("No leaf app node exists. Removing parent node " +
                 parentAppNode);
@@ -749,7 +751,8 @@ public class ZKRMStateStore extends RMStateStore {
 
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
     if (appStateData.length <= zknodeLimit) {
-      safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+      zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl,
+          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Application state data size for " + appId + " is "
@@ -780,7 +783,8 @@ public class ZKRMStateStore extends RMStateStore {
           String rootNode =
               getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
           if (!exists(rootNode)) {
-            safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
+            zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
+                zkAcl, fencingNodePath);
           }
         }
       }
@@ -794,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore {
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
 
     if (pathExists) {
-      safeSetData(nodeUpdatePath, appStateData, -1);
+      zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl,
+          fencingNodePath);
     } else {
-      safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+      zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl,
+          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
             "exist. Creating a new znode to update the application state.");
@@ -839,9 +845,11 @@ public class ZKRMStateStore extends RMStateStore {
     switch (operation) {
     case UPDATE:
       if (exists(path)) {
-        safeSetData(path, attemptStateData, -1);
+        zkManager.safeSetData(path, attemptStateData, -1, zkAcl,
+            fencingNodePath);
       } else {
-        safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+        zkManager.safeCreate(path, attemptStateData, zkAcl,
+            CreateMode.PERSISTENT, zkAcl, fencingNodePath);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
               " Created a new znode to update the application attempt state.");
@@ -849,10 +857,11 @@ public class ZKRMStateStore extends RMStateStore {
       }
       break;
     case STORE:
-      safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+      zkManager.safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT,
+          zkAcl, fencingNodePath);
       break;
     case REMOVE:
-      safeDelete(path);
+      zkManager.safeDelete(path, zkAcl, fencingNodePath);
       break;
     default:
       break;
@@ -930,10 +939,10 @@ public class ZKRMStateStore extends RMStateStore {
         for (ApplicationAttemptId attemptId : attempts) {
           String attemptRemovePath =
               getNodePath(appIdRemovePath, attemptId.toString());
-          safeDelete(attemptRemovePath);
+          zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath);
         }
       }
-      safeDelete(appIdRemovePath);
+      zkManager.safeDelete(appIdRemovePath, zkAcl, fencingNodePath);
     } else {
       CuratorFramework curatorFramework = zkManager.getCurator();
       curatorFramework.delete().deletingChildrenIfNeeded().
@@ -947,7 +956,7 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized void storeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    SafeTransaction trx = new SafeTransaction();
+    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
     addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
     trx.commit();
   }
@@ -964,14 +973,14 @@ public class ZKRMStateStore extends RMStateStore {
           + rmDTIdentifier.getSequenceNumber());
     }
 
-    safeDelete(nodeRemovePath);
+    zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
   }
 
   @Override
   protected synchronized void updateRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    SafeTransaction trx = new SafeTransaction();
+    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
@@ -1035,8 +1044,8 @@ public class ZKRMStateStore extends RMStateStore {
     ByteArrayOutputStream os = new ByteArrayOutputStream();
     try(DataOutputStream fsOut = new DataOutputStream(os)) {
       delegationKey.write(fsOut);
-      safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
-          CreateMode.PERSISTENT);
+      zkManager.safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
+          CreateMode.PERSISTENT, zkAcl, fencingNodePath);
     }
   }
 
@@ -1051,7 +1060,7 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
 
-    safeDelete(nodeRemovePath);
+    zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
   }
 
   @Override
@@ -1078,7 +1087,8 @@ public class ZKRMStateStore extends RMStateStore {
         AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
     byte[] stateData = data.getProto().toByteArray();
 
-    safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
+    zkManager.safeSetData(amrmTokenSecretManagerRoot, stateData, -1, zkAcl,
+        fencingNodePath);
   }
 
   @Override
@@ -1092,12 +1102,12 @@ public class ZKRMStateStore extends RMStateStore {
           + " for" + " plan " + planName);
     }
 
-    safeDelete(reservationPath);
+    zkManager.safeDelete(reservationPath, zkAcl, fencingNodePath);
 
     List<String> reservationNodes = getChildren(planNodePath);
 
     if (reservationNodes.isEmpty()) {
-      safeDelete(planNodePath);
+      zkManager.safeDelete(planNodePath, zkAcl, fencingNodePath);
     }
   }
 
@@ -1105,7 +1115,7 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized void storeReservationState(
       ReservationAllocationStateProto reservationAllocation, String planName,
       String reservationIdName) throws Exception {
-    SafeTransaction trx = new SafeTransaction();
+    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
     addOrUpdateReservationState(reservationAllocation, planName,
         reservationIdName, trx, false);
     trx.commit();
@@ -1191,7 +1201,8 @@ public class ZKRMStateStore extends RMStateStore {
         getNodePath(rootNode, nodeName.substring(0, splitIdx));
     if (createParentIfNotExists && !exists(rootNodePath)) {
       try {
-        safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
+        zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
+            zkAcl, fencingNodePath);
       } catch (KeeperException.NodeExistsException e) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Unable to create app parent node " + rootNodePath +
@@ -1248,76 +1259,6 @@ public class ZKRMStateStore extends RMStateStore {
     zkManager.delete(path);
   }
 
-  private void safeCreate(String path, byte[] data, List<ACL> acl,
-      CreateMode mode) throws Exception {
-    if (!exists(path)) {
-      SafeTransaction transaction = new SafeTransaction();
-      transaction.create(path, data, acl, mode);
-      transaction.commit();
-    }
-  }
-
-  /**
-   * Deletes the path. Checks for existence of path as well.
-   * @param path Path to be deleted.
-   * @throws Exception if any problem occurs while performing deletion.
-   */
-  private void safeDelete(final String path) throws Exception {
-    if (exists(path)) {
-      SafeTransaction transaction = new SafeTransaction();
-      transaction.delete(path);
-      transaction.commit();
-    }
-  }
-
-  private void safeSetData(String path, byte[] data, int version)
-      throws Exception {
-    SafeTransaction transaction = new SafeTransaction();
-    transaction.setData(path, data, version);
-    transaction.commit();
-  }
-
-  /**
-   * Use curator transactions to ensure zk-operations are performed in an all
-   * or nothing fashion. This is equivalent to using ZooKeeper#multi.
-   *
-   * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
-   * have to rewrite this inner class when we adopt that.
-   */
-  private class SafeTransaction {
-    private CuratorTransactionFinal transactionFinal;
-
-    SafeTransaction() throws Exception {
-      CuratorFramework curatorFramework = zkManager.getCurator();
-      CuratorTransaction transaction = curatorFramework.inTransaction();
-      transactionFinal = transaction.create()
-          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
-          .forPath(fencingNodePath, new byte[0]).and();
-    }
-
-    public void commit() throws Exception {
-      transactionFinal = transactionFinal.delete()
-          .forPath(fencingNodePath).and();
-      transactionFinal.commit();
-    }
-
-    public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
-        throws Exception {
-      transactionFinal = transactionFinal.create()
-          .withMode(mode).withACL(acl).forPath(path, data).and();
-    }
-
-    public void delete(String path) throws Exception {
-      transactionFinal = transactionFinal.delete().forPath(path).and();
-    }
-
-    public void setData(String path, byte[] data, int version)
-        throws Exception {
-      transactionFinal = transactionFinal.setData()
-          .withVersion(version).forPath(path, data).and();
-    }
-  }
-
   /**
    * Helper class that periodically attempts creating a znode to ensure that
    * this RM continues to be the Active.
@@ -1332,7 +1273,7 @@ public class ZKRMStateStore extends RMStateStore {
       try {
         while (!isFencedState()) {
           // Create and delete fencing node
-          new SafeTransaction().commit();
+          zkManager.createTransaction(zkAcl, fencingNodePath).commit();
           Thread.sleep(zkSessionTimeout);
         }
       } catch (InterruptedException ie) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org