You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:15:47 UTC

svn commit: r1181515 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: master/HMaster.java util/RetryCounter.java util/RetryCounterFactory.java zookeeper/RecoverableZooKeeper.java zookeeper/ZooKeeperWrapper.java

Author: nspiegelberg
Date: Tue Oct 11 02:15:46 2011
New Revision: 1181515

URL: http://svn.apache.org/viewvc?rev=1181515&view=rev
Log:
HBASE-3065: make the ZooKeeper recoverable from ConnectionLoss Exception

Summary:
Any ZooKeeper client call can fail with a ConnectionLoss exception, and we
need a clean way to retry and recover from the connection loss.

Two operations are tricky, setData and create,
because they are not idempotent.

We cannot simply retry them; we need to verify whether the previous
operation succeeded.

So this patch adds the JVM name as metadata (an identifier) to verify whether
the previous operation succeeded.

Test Plan:
Still needs more testing.

Reviewed By: kranganathan
Reviewers: jgray, nspiegelberg, kannan, kranganathan
Commenters: nspiegelberg, kannan, jgray
CC: jgray, nspiegelberg, liyintang, kannan, kranganathan, hbase@lists,

Tasks:
#523287: some hbase zk issues on app server
#512653: Unavailable First ZK Quorum Node Causes RS Cyclical Restart (DL#2)

Revert Plan:
OK

Differential Revision: 231368

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181515&r1=1181514&r2=1181515&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:15:46 2011
@@ -515,10 +515,15 @@ public class HMaster extends Thread impl
   /** Main processing loop */
   @Override
   public void run() {
-    joinCluster();
-    startServiceThreads();
-    /* Main processing loop */
+   try {
+     joinCluster();
+     startServiceThreads();
+   }catch (IOException e) {
+     LOG.fatal("Unhandled exception. Master quits.", e);
+     return;
+   }
     try {
+      /* Main processing loop */
       FINISHED: while (!this.closed.get()) {
         // check if we should be shutting down
         if (this.shutdownRequested.get()) {
@@ -582,56 +587,58 @@ public class HMaster extends Thread impl
    * master was started following a failover. In the second case, it inspects
    * the region server directory and gets their regions assignment.
    */
-  private void joinCluster()  {
-      LOG.debug("Checking cluster state...");
-      HServerAddress rootLocation =
-        this.zooKeeperWrapper.readRootRegionLocation();
-      List<HServerAddress> addresses = this.zooKeeperWrapper.scanRSDirectory();
-      // Check if this is a fresh start of the cluster
-      if (addresses.isEmpty()) {
-        LOG.debug("Master fresh start, proceeding with normal startup");
-        splitLogAfterStartup();
-        return;
-      }
-      // Failover case.
-      LOG.info("Master failover, ZK inspection begins...");
-      boolean isRootRegionAssigned = false;
-      Map <byte[], HRegionInfo> assignedRegions =
-        new HashMap<byte[], HRegionInfo>();
-      // We must:
-      // - contact every region server to add them to the regionservers list
-      // - get their current regions assignment
-      // TODO: Run in parallel?
-      for (HServerAddress address : addresses) {
-        HRegionInfo[] regions = null;
-        try {
-          HRegionInterface hri =
-            this.connection.getHRegionConnection(address, false);
-          HServerInfo info = hri.getHServerInfo();
-          LOG.debug("Inspection found server " + info.getServerName());
-          this.serverManager.recordNewServer(info, true);
-          regions = hri.getRegionsAssignment();
-        } catch (IOException e) {
-          LOG.error("Failed contacting " + address.toString(), e);
-          continue;
-        }
-        for (HRegionInfo r: regions) {
-          if (r.isRootRegion()) {
-            this.connection.setRootRegionLocation(new HRegionLocation(r, rootLocation));
-            this.regionManager.setRootRegionLocation(rootLocation);
-            // Undo the unassign work in the RegionManager constructor
-            this.regionManager.removeRegion(r);
-            isRootRegionAssigned = true;
-          } else if (r.isMetaRegion()) {
-            MetaRegion m = new MetaRegion(new HServerAddress(address), r);
-            this.regionManager.addMetaRegionToScan(m);
-          }
-          assignedRegions.put(r.getRegionName(), r);
-        }
-      }
-      LOG.info("Inspection found " + assignedRegions.size() + " regions, " +
-        (isRootRegionAssigned ? "with -ROOT-" : "but -ROOT- was MIA"));
+  private void joinCluster() throws IOException  {
+    LOG.debug("Checking cluster state...");
+    // Region server addresses found by scanning the RS directory in ZooKeeper.
+    List<HServerAddress> addresses = this.zooKeeperWrapper.scanRSDirectory();
+    // No registered region servers: this is a fresh start of the cluster.
+    if (addresses.isEmpty()) {
+      LOG.debug("Master fresh start, proceeding with normal startup");
       splitLogAfterStartup();
+      return;
+    }
+    // Failover case: region servers already exist, rebuild state from them.
+    LOG.info("Master failover, ZK inspection begins...");
+    // Only read the -ROOT- location from ZK in the failover case.
+    HServerAddress rootLocation =
+      this.zooKeeperWrapper.readRootRegionLocation();
+    boolean isRootRegionAssigned = false;
+    Map <byte[], HRegionInfo> assignedRegions =
+      new HashMap<byte[], HRegionInfo>();
+    // For each server (unreachable ones are logged and skipped) we must:
+    // - contact every region server to add them to the regionservers list
+    // - get their current regions assignment
+    // TODO: Run in parallel?
+    for (HServerAddress address : addresses) {
+      HRegionInfo[] regions = null;
+      try {
+        HRegionInterface hri =
+          this.connection.getHRegionConnection(address, false);
+        HServerInfo info = hri.getHServerInfo();
+        LOG.debug("Inspection found server " + info.getServerName());
+        this.serverManager.recordNewServer(info, true);
+        regions = hri.getRegionsAssignment();
+      } catch (IOException e) {
+        LOG.error("Failed contacting " + address.toString(), e);
+        continue;
+      }
+      for (HRegionInfo r: regions) {
+        if (r.isRootRegion()) {
+          this.connection.setRootRegionLocation(new HRegionLocation(r, rootLocation));
+          this.regionManager.setRootRegionLocation(rootLocation);
+          // Undo the unassign work in the RegionManager constructor
+          this.regionManager.removeRegion(r);
+          isRootRegionAssigned = true;
+        } else if (r.isMetaRegion()) {
+          MetaRegion m = new MetaRegion(new HServerAddress(address), r);
+          this.regionManager.addMetaRegionToScan(m);
+        }
+        assignedRegions.put(r.getRegionName(), r);
+      }
+    }
+    LOG.info("Inspection found " + assignedRegions.size() + " regions, " +
+      (isRootRegionAssigned ? "with -ROOT-" : "but -ROOT- was MIA"));
+    splitLogAfterStartup();
   }
 
   /*

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java?rev=1181515&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java Tue Oct 11 02:15:46 2011
@@ -0,0 +1,70 @@
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tracks the retry budget of a single operation and pauses for a fixed
+ * interval between attempts.
+ *
+ * <p>Typical use after a failed attempt: if {@link #shouldRetry()} is true,
+ * call {@link #sleepUntilNextRetry()} and {@link #useRetry()}, then try
+ * again. The counter is not synchronized; use one instance per operation.
+ */
+public class RetryCounter {
+  private final int maxRetries;
+  private int retriesRemaining;
+  // Pause between attempts, expressed in timeUnit (not necessarily millis).
+  private final int retryInterval;
+  private final TimeUnit timeUnit;
+
+  /**
+   * @param maxRetries retries allowed after the first attempt; zero or a
+   *          negative value means never retry
+   * @param retryInterval pause between attempts, expressed in
+   *          {@code timeUnit}
+   * @param timeUnit unit of {@code retryInterval}; must not be null
+   * @throws NullPointerException if {@code timeUnit} is null
+   */
+  public RetryCounter(int maxRetries, int retryInterval, TimeUnit timeUnit) {
+    if (timeUnit == null) {
+      // fail here rather than on the first sleepUntilNextRetry() call
+      throw new NullPointerException("timeUnit must not be null");
+    }
+    this.maxRetries = maxRetries;
+    this.retriesRemaining = maxRetries;
+    this.retryInterval = retryInterval;
+    this.timeUnit = timeUnit;
+  }
+
+  /** @return the retry budget this counter was created with */
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  /**
+   * Blocks for the configured retry interval.
+   *
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  public void sleepUntilNextRetry() throws InterruptedException {
+    timeUnit.sleep(retryInterval);
+  }
+
+  /** @return true while at least one retry remains in the budget */
+  public boolean shouldRetry() {
+    return retriesRemaining > 0;
+  }
+
+  /** Consumes one retry from the budget. */
+  public void useRetry() {
+    retriesRemaining--;
+  }
+
+  /**
+   * @return {@code maxRetries - retriesRemaining + 1}: 1 before any retry
+   *         has been used, growing by one with each {@link #useRetry()}
+   */
+  public int getAttemptTimes() {
+    return maxRetries - retriesRemaining + 1;
+  }
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java?rev=1181515&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java Tue Oct 11 02:15:46 2011
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hands out fresh {@link RetryCounter}s that all follow the same policy:
+ * a fixed number of retries separated by a fixed pause in milliseconds.
+ */
+public class RetryCounterFactory {
+  private final int maxRetries;
+  private final int retryIntervalMillis;
+
+  /**
+   * @param maxRetries retries each created counter allows
+   * @param retryIntervalMillis pause between attempts, in milliseconds
+   */
+  public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
+    this.retryIntervalMillis = retryIntervalMillis;
+    this.maxRetries = maxRetries;
+  }
+
+  /** @return a new counter with its full retry budget available */
+  public RetryCounter create() {
+    final TimeUnit unit = TimeUnit.MILLISECONDS;
+    return new RetryCounter(maxRetries, retryIntervalMillis, unit);
+  }
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1181515&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Tue Oct 11 02:15:46 2011
@@ -0,0 +1,572 @@
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooKeeper.States;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A ZooKeeper client wrapper that retries recoverable errors
+ * (CONNECTIONLOSS and OPERATIONTIMEOUT) a bounded number of times.
+ *
+ * http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ *
+ * <p>Every payload written through this class is wrapped as
+ * {@code [MAGIC (1 byte)][identifier length (4 bytes)][identifier][data]},
+ * where the identifier is this JVM's runtime name, and the wrapper is
+ * stripped again on read. Payloads that do not start with {@code MAGIC} are
+ * returned unchanged, so data written without the wrapper stays readable.
+ *
+ * <p>Reuses part of the code from the calligraphus project (datafreeway team).
+ */
+public class RecoverableZooKeeper {
+  private static final Logger LOG = Logger.getLogger(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  private ZooKeeper zk;
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  // identifier encoded with Bytes.toBytes; embedded in every wrapped payload
+  private final byte[] id;
+  // only used in log messages; the sleep itself is done by RetryCounter
+  private final int retryIntervalMillis;
+
+  // size of the identifier-length field that follows the magic byte
+  private static final int ID_OFFSET = Bytes.SIZEOF_INT;
+  // the magic number is to be backward compatible: payloads that do not
+  // start with it are passed through untouched
+  private static final byte MAGIC = (byte) 0XFF;
+  // size of the magic byte that starts every wrapped payload
+  private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
+
+  /**
+   * Creates the underlying ZooKeeper client and the retry policy.
+   *
+   * @param quorumServers connect string handed to the ZooKeeper client
+   * @param sessionTimeout session timeout handed to the ZooKeeper client
+   * @param watcher default watcher handed to the ZooKeeper client
+   * @param maxRetries how many times a recoverable failure is retried
+   * @param retryIntervalMillis pause between retries, in milliseconds
+   * @throws IOException if the ZooKeeper client cannot be created
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+      throws IOException {
+    this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+    this.retryIntervalMillis = retryIntervalMillis;
+
+    // the identifier is the JVM runtime name, typically processID@hostName
+    this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+    LOG.info("The identifier of this process is "+identifier);
+    this.id = Bytes.toBytes(identifier);
+  }
+
+  /** A single ZooKeeper call that {@link #retryOperation} may re-issue. */
+  private interface ZkOperation<T> {
+    T run() throws KeeperException, InterruptedException;
+  }
+
+  /**
+   * Runs {@code op}, re-running it after a pause whenever it fails with
+   * CONNECTIONLOSS or OPERATIONTIMEOUT, until the retry budget is used up.
+   * Any other KeeperException is rethrown immediately.
+   *
+   * @param opName operation name used in the "failed after" log message
+   * @param op the call to run; must be safe to run more than once
+   * @return the value returned by the first successful run of {@code op}
+   * @throws KeeperException the last recoverable error once retries are
+   *           exhausted, or the first non-recoverable error
+   * @throws InterruptedException if interrupted while running or sleeping
+   */
+  private <T> T retryOperation(String opName, ZkOperation<T> op)
+    throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return op.run();
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper " + opName + " failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Deletes a node, retrying recoverable errors. delete is idempotent, so
+   * NONODE is treated as success and never thrown; this also covers an
+   * earlier attempt that succeeded but whose reply was lost.
+   *
+   * @param path znode path
+   * @param version expected version, passed to ZooKeeper unchanged
+   * @throws InterruptedException if interrupted while waiting
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   */
+  public void delete(final String path, final int version)
+    throws InterruptedException, KeeperException {
+    retryOperation("delete", new ZkOperation<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        try {
+          zk.delete(path, version);
+        } catch (KeeperException e) {
+          if (e.code() != KeeperException.Code.NONODE) {
+            throw e;
+          }
+          // NONODE: the node is already gone, which is what we wanted
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
+   * exists is idempotent, so recoverable errors are simply retried.
+   *
+   * @param path znode path
+   * @param watcher watcher passed to ZooKeeper unchanged
+   * @return the Stat returned by ZooKeeper (null when the node is absent)
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public Stat exists(final String path, final Watcher watcher)
+    throws KeeperException, InterruptedException {
+    return retryOperation("exists", new ZkOperation<Stat>() {
+      @Override
+      public Stat run() throws KeeperException, InterruptedException {
+        return zk.exists(path, watcher);
+      }
+    });
+  }
+
+  /**
+   * exists is idempotent, so recoverable errors are simply retried.
+   *
+   * @param path znode path
+   * @param watch whether to leave a watch, passed to ZooKeeper unchanged
+   * @return the Stat returned by ZooKeeper (null when the node is absent)
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public Stat exists(final String path, final boolean watch)
+    throws KeeperException, InterruptedException {
+    return retryOperation("exists", new ZkOperation<Stat>() {
+      @Override
+      public Stat run() throws KeeperException, InterruptedException {
+        return zk.exists(path, watch);
+      }
+    });
+  }
+
+  /**
+   * getChildren is idempotent, so recoverable errors are simply retried.
+   *
+   * @param path znode path
+   * @param watcher watcher passed to ZooKeeper unchanged
+   * @return the child names returned by ZooKeeper
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public List<String> getChildren(final String path, final Watcher watcher)
+    throws KeeperException, InterruptedException {
+    return retryOperation("getChildren", new ZkOperation<List<String>>() {
+      @Override
+      public List<String> run() throws KeeperException, InterruptedException {
+        return zk.getChildren(path, watcher);
+      }
+    });
+  }
+
+  /**
+   * getChildren is idempotent, so recoverable errors are simply retried.
+   *
+   * @param path znode path
+   * @param watch whether to leave a watch, passed to ZooKeeper unchanged
+   * @return the child names returned by ZooKeeper
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public List<String> getChildren(final String path, final boolean watch)
+    throws KeeperException, InterruptedException {
+    return retryOperation("getChildren", new ZkOperation<List<String>>() {
+      @Override
+      public List<String> run() throws KeeperException, InterruptedException {
+        return zk.getChildren(path, watch);
+      }
+    });
+  }
+
+  /**
+   * getData is idempotent, so recoverable errors are simply retried. The
+   * identifier wrapper added by this class is stripped from the result.
+   *
+   * @param path znode path
+   * @param watcher watcher passed to ZooKeeper unchanged
+   * @param stat passed to ZooKeeper, which fills in the node's Stat
+   * @return the node's data without the identifier wrapper
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public byte[] getData(final String path, final Watcher watcher,
+      final Stat stat) throws KeeperException, InterruptedException {
+    return retryOperation("getData", new ZkOperation<byte[]>() {
+      @Override
+      public byte[] run() throws KeeperException, InterruptedException {
+        return removeMetaData(zk.getData(path, watcher, stat));
+      }
+    });
+  }
+
+  /**
+   * getData is idempotent, so recoverable errors are simply retried. The
+   * identifier wrapper added by this class is stripped from the result.
+   *
+   * @param path znode path
+   * @param watch whether to leave a watch, passed to ZooKeeper unchanged
+   * @param stat passed to ZooKeeper, which fills in the node's Stat
+   * @return the node's data without the identifier wrapper
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public byte[] getData(final String path, final boolean watch,
+      final Stat stat) throws KeeperException, InterruptedException {
+    return retryOperation("getData", new ZkOperation<byte[]>() {
+      @Override
+      public byte[] run() throws KeeperException, InterruptedException {
+        return removeMetaData(zk.getData(path, watch, stat));
+      }
+    });
+  }
+
+  /**
+   * setData is NOT idempotent: if a reply is lost after the write landed,
+   * the retry fails with BADVERSION. To tell that case apart from a real
+   * conflict, the data is written with this process's identifier, and on a
+   * BADVERSION during a retry the node is re-read; if it holds exactly the
+   * bytes this call wrote, the earlier attempt is taken as successful.
+   *
+   * @param path znode path
+   * @param data payload to store (wrapped with the identifier)
+   * @param version expected version, passed to ZooKeeper unchanged
+   * @return the Stat of the node after the write
+   * @throws KeeperException on a non-recoverable error (including a
+   *           BADVERSION that is not explained by an earlier attempt), or
+   *           once retries are exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public Stat setData(final String path, byte[] data, final int version)
+    throws KeeperException, InterruptedException {
+    final byte[] newData = appendMetaData(data);
+    return retryOperation("setData", new ZkOperation<Stat>() {
+      // becomes true once the first attempt has been issued
+      private boolean attempted = false;
+
+      @Override
+      public Stat run() throws KeeperException, InterruptedException {
+        boolean isRetry = attempted;
+        attempted = true;
+        try {
+          return zk.setData(path, newData, version);
+        } catch (KeeperException e) {
+          if (isRetry && e.code() == KeeperException.Code.BADVERSION) {
+            // Did a previous attempt (whose reply was lost) write our data?
+            Stat stat = new Stat();
+            byte[] revData = zk.getData(path, false, stat);
+            if (Arrays.equals(revData, newData)) {
+              return stat;
+            }
+          }
+          throw e;
+        }
+      }
+    });
+  }
+
+  /**
+   * Creates a node, retrying recoverable errors. The data is wrapped with
+   * this process's identifier before it is written.
+   *
+   * <p>A non-sequential create is retried as-is, and NODEEXISTS is reported
+   * as success rather than thrown back to the application.
+   *
+   * <p>A sequential create is NOT idempotent, so the identifier is appended
+   * to the path; before re-issuing, the parent is scanned for a child that
+   * already carries that prefix, and that child is returned instead.
+   *
+   * @param path znode path (for sequential modes, the name prefix)
+   * @param data payload to store
+   * @param acl ACL passed to ZooKeeper unchanged
+   * @param createMode create mode; unrecognized modes are rejected
+   * @return the path of the created (or previously created) node
+   * @throws IllegalArgumentException for an unrecognized create mode
+   * @throws KeeperException on a non-recoverable error, or once retries are
+   *           exhausted
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    byte[] newData = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, newData, acl, createMode);
+
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, newData, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " +
+            createMode);
+    }
+  }
+
+  /**
+   * Non-sequential create with retries; NODEEXISTS is returned as success.
+   * NOTE(review): NODEEXISTS is also swallowed on the first attempt, i.e.
+   * when another client created the node. Callers that rely on
+   * NodeExistsException (for example to detect a competing ephemeral node)
+   * never see it -- confirm this is intended.
+   */
+  private String createNonSequential(final String path, final byte[] data,
+      final List<ACL> acl, final CreateMode createMode)
+    throws KeeperException, InterruptedException {
+    return retryOperation("create", new ZkOperation<String>() {
+      @Override
+      public String run() throws KeeperException, InterruptedException {
+        try {
+          return zk.create(path, data, acl, createMode);
+        } catch (KeeperException e) {
+          if (e.code() != KeeperException.Code.NODEEXISTS) {
+            throw e;
+          }
+          // the node exists (possibly from a lost earlier attempt): success
+          return path;
+        }
+      }
+    });
+  }
+
+  /**
+   * Sequential create with retries. The identifier is appended to the path
+   * so that, on a retry, a node created by an earlier attempt whose reply
+   * was lost can be found and returned instead of creating a second one.
+   */
+  private String createSequential(String path, final byte[] data,
+      final List<ACL> acl, final CreateMode createMode)
+    throws KeeperException, InterruptedException {
+    final String newPath = path + this.identifier;
+    return retryOperation("create", new ZkOperation<String>() {
+      // true until the first create has been issued
+      private boolean first = true;
+
+      @Override
+      public String run() throws KeeperException, InterruptedException {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      }
+    });
+  }
+
+  /**
+   * Returns the full path of an existing child of {@code path}'s parent whose
+   * name starts with {@code path}'s last component, or null if there is none.
+   * NOTE(review): this also matches nodes left by earlier creates of the same
+   * path by this process -- confirm callers can tolerate that.
+   */
+  private String findPreviousSequentialNode(String path)
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Strips the identifier wrapper written by this class.
+   *
+   * @param data raw znode payload; may be null
+   * @return the payload with the wrapper removed, or {@code data} unchanged
+   *         when it is null, empty, does not start with MAGIC, or is too
+   *         short to hold the wrapper its header describes
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if(data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if(magic != MAGIC) {
+      return data;
+    }
+    int headerLength = MAGIC_OFFSET + ID_OFFSET;
+    if (data.length < headerLength) {
+      // too short to hold the identifier length: not a wrapped payload
+      return data;
+    }
+
+    int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+    // compared this way round to avoid int overflow on a corrupt idLength
+    if (idLength < 0 || idLength > data.length - headerLength) {
+      return data;
+    }
+    int dataOffset = headerLength + idLength;
+    int dataLength = data.length - dataOffset;
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+    return newData;
+  }
+
+  /**
+   * Prefixes {@code data} with MAGIC, the identifier length and the
+   * identifier; see the class comment for the layout. Returns null for null.
+   */
+  private byte[] appendMetaData(byte[] data) {
+    if(data == null){
+      return null;
+    }
+
+    byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, id.length);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+    return newData;
+  }
+
+  /** @return the session id of the underlying ZooKeeper client */
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  /** Closes the underlying ZooKeeper client. */
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  /** @return the state of the underlying ZooKeeper client */
+  public States getState() {
+    return zk.getState();
+  }
+
+  /** @return the underlying ZooKeeper client (calls on it are not retried) */
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  /** @return the session password of the underlying ZooKeeper client */
+  public byte[] getSessionPassword() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Asynchronous sync, forwarded to the ZooKeeper client with {@code cb}
+   * and {@code ctx} passed through unchanged.
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    this.zk.sync(path, cb, ctx);
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<String>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1181515&r1=1181514&r2=1181515&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Tue Oct 11 02:15:46 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.zookeepe
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
 import java.io.PrintWriter;
 import java.io.InputStream;
 import java.io.ByteArrayInputStream;
@@ -111,7 +112,7 @@ public class ZooKeeperWrapper implements
 
   private String quorumServers = null;
   private final int sessionTimeout;
-  private ZooKeeper zooKeeper;
+  private RecoverableZooKeeper recoverableZK;
 
   /*
    * All the HBase directories are hosted under this parent
@@ -151,21 +152,17 @@ public class ZooKeeperWrapper implements
     return INSTANCES.get(name);
   }
   // creates only one instance
-  public static ZooKeeperWrapper createInstance(Configuration conf, String name) {
+  public static ZooKeeperWrapper createInstance(Configuration conf, String name)
+  throws IOException {
     if (getInstance(conf, name) != null) {
       return getInstance(conf, name);
     }
     ZooKeeperWrapper.createLock.lock();
     try {
       if (getInstance(conf, name) == null) {
-        try {
-          String fullname = getZookeeperClusterKey(conf, name);
-          ZooKeeperWrapper instance = new ZooKeeperWrapper(conf, fullname);
-          INSTANCES.put(fullname, instance);
-        }
-        catch (Exception e) {
-          LOG.error("<" + name + ">" + "Error creating a ZooKeeperWrapper " + e);
-        }
+        String fullname = getZookeeperClusterKey(conf, name);
+        ZooKeeperWrapper instance = new ZooKeeperWrapper(conf, fullname);
+        INSTANCES.put(fullname, instance);
       }
     }
     finally {
@@ -207,44 +204,40 @@ public class ZooKeeperWrapper implements
     rgnsInTransitZNode  = getZNode(parentZNode, regionsInTransitZNodeName);
     masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
     clusterStateZNode   = getZNode(parentZNode, stateZNodeName);
-
-    connectToZk();
+    int retryNum = conf.getInt("zookeeper.connection.retry.num",3);
+    int retryFreq = conf.getInt("zookeeper.connection.retry.freq",1000);
+    connectToZk(retryNum,retryFreq);
   }
 
-  public void connectToZk() throws IOException {
+  public void connectToZk(int retryNum, int retryFreq)
+  throws IOException {
     try {
       LOG.info("Connecting to zookeeper");
-      if(zooKeeper != null) {
-        zooKeeper.close();
+      if(recoverableZK != null) {
+        recoverableZK.close();
         LOG.error("<" + instanceName + ">" + " Closed existing zookeeper client");
       }
-      zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, this);
+      recoverableZK = new RecoverableZooKeeper(quorumServers, sessionTimeout, this,
+          retryNum, retryFreq);
       LOG.debug("<" + instanceName + ">" + " Connected to zookeeper");
       // Ensure we are actually connected
-      ensureZkAvailable(10);
+      ensureZkAvailable();
     } catch (IOException e) {
       LOG.error("<" + instanceName + "> " + "Failed to create ZooKeeper object: " + e);
-      throw new IOException(e);
+      throw e;
     } catch (InterruptedException e) {
       LOG.error("<" + instanceName + " >" + "Error closing ZK connection: " + e);
-      throw new IOException(e);
+      throw new InterruptedIOException();
     }
   }
 
-  private void ensureZkAvailable(int maxRetries)
-  throws IOException, InterruptedException {
-    while (maxRetries-- > 0) {
-      try {
-        zooKeeper.exists(parentZNode, false);
-        return;
-      } catch(KeeperException.ConnectionLossException cle) {
-        LOG.info("Received ZK ConnectionLossException, ZK not done initializing"
-            + ", retrying connect to ZK after 1 second sleep");
-        Thread.sleep(1000);
-      } catch(KeeperException ke) {
-        LOG.error("Received abnormal ZK exception, aborting", ke);
-        throw new IOException(ke);
-      }
+  private void ensureZkAvailable() throws IOException, InterruptedException {
+    try {
+      recoverableZK.exists(parentZNode, false);
+      return;
+    } catch(KeeperException ke) {
+      LOG.error("Received ZK exception. ZK is not available", ke);
+      throw new IOException(ke);
     }
   }
 
@@ -437,7 +430,7 @@ public class ZooKeeperWrapper implements
    * is up-to-date from when we begin the operation.
    */
   public void sync(String path) {
-    this.zooKeeper.sync(path, null, null);
+    this.recoverableZK.sync(path, null, null);
   }
 
   /**
@@ -449,7 +442,7 @@ public class ZooKeeperWrapper implements
    */
   public boolean exists(String znode, boolean watch) {
     try {
-      return zooKeeper.exists(getZNode(parentZNode, znode), watch ? this : null)
+      return recoverableZK.exists(getZNode(parentZNode, znode), watch ? this : null)
              != null;
     } catch (KeeperException e) {
       LOG.error("Received KeeperException on exists() call, aborting", e);
@@ -462,7 +455,7 @@ public class ZooKeeperWrapper implements
 
   /** @return ZooKeeper used by this wrapper. */
   public ZooKeeper getZooKeeper() {
-    return zooKeeper;
+    return recoverableZK.getZooKeeper();
   }
 
   /**
@@ -471,7 +464,7 @@ public class ZooKeeperWrapper implements
    * @return long session ID of this ZooKeeper session.
    */
   public long getSessionID() {
-    return zooKeeper.getSessionId();
+    return recoverableZK.getSessionId();
   }
 
   /**
@@ -480,7 +473,7 @@ public class ZooKeeperWrapper implements
    * @return byte[] password of this ZooKeeper session.
    */
   public byte[] getSessionPassword() {
-    return zooKeeper.getSessionPasswd();
+    return recoverableZK.getSessionPassword();
   }
 
   /** @return host:port list of quorum servers. */
@@ -490,7 +483,7 @@ public class ZooKeeperWrapper implements
 
   /** @return true if currently connected to ZooKeeper, false otherwise. */
   public boolean isConnected() {
-    return zooKeeper.getState() == States.CONNECTED;
+    return recoverableZK.getState() == States.CONNECTED;
   }
 
   /**
@@ -518,7 +511,7 @@ public class ZooKeeperWrapper implements
    */
   public void setClusterStateWatch() {
     try {
-      zooKeeper.exists(clusterStateZNode, this);
+      recoverableZK.exists(clusterStateZNode, this);
     } catch (InterruptedException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to check on ZNode " + clusterStateZNode, e);
     } catch (KeeperException e) {
@@ -538,11 +531,11 @@ public class ZooKeeperWrapper implements
     try {
       if(up) {
         byte[] data = Bytes.toBytes("up");
-        zooKeeper.create(clusterStateZNode, data,
+        recoverableZK.create(clusterStateZNode, data,
             Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         LOG.debug("<" + instanceName + ">" + "State node wrote in ZooKeeper");
       } else {
-        zooKeeper.delete(clusterStateZNode, -1);
+        recoverableZK.delete(clusterStateZNode, -1);
         LOG.debug("<" + instanceName + ">" + "State node deleted in ZooKeeper");
       }
       return true;
@@ -570,7 +563,7 @@ public class ZooKeeperWrapper implements
   public boolean watchMasterAddress(Watcher watcher)
   throws KeeperException {
     try {
-      Stat s = zooKeeper.exists(masterElectionZNode, watcher);
+      Stat s = recoverableZK.exists(masterElectionZNode, watcher);
       LOG.debug("<" + instanceName + ">" + " Set watcher on master address ZNode " + masterElectionZNode);
       return s != null;
     } catch (KeeperException e) {
@@ -611,7 +604,7 @@ public class ZooKeeperWrapper implements
   throws KeeperException {
     byte[] data;
     try {
-      data = zooKeeper.getData(znode, watcher, null);
+      data = recoverableZK.getData(znode, watcher, null);
     } catch (InterruptedException e) {
       // This should not happen
       return null;
@@ -631,11 +624,11 @@ public class ZooKeeperWrapper implements
    */
   public boolean ensureExists(final String znode) {
     try {
-      Stat stat = zooKeeper.exists(znode, false);
+      Stat stat = recoverableZK.exists(znode, false);
       if (stat != null) {
         return true;
       }
-      zooKeeper.create(znode, new byte[0],
+      recoverableZK.create(znode, new byte[0],
                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode);
       return true;
@@ -706,7 +699,7 @@ public class ZooKeeperWrapper implements
     throws KeeperException, InterruptedException {
     if (recursive) {
       LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
-      List<String> znodes = this.zooKeeper.getChildren(znode, false);
+      List<String> znodes = this.recoverableZK.getChildren(znode, false);
       if (znodes != null && znodes.size() > 0) {
         for (String child : znodes) {
           String childFullPath = getZNode(znode, child);
@@ -715,14 +708,14 @@ public class ZooKeeperWrapper implements
         }
       }
     }
-    this.zooKeeper.delete(znode, -1);
+    this.recoverableZK.delete(znode, -1);
     LOG.debug("<" + instanceName + ">" + "Deleted ZNode " + znode);
   }
 
   private boolean createRootRegionLocation(String address) {
     byte[] data = Bytes.toBytes(address);
     try {
-      zooKeeper.create(rootRegionZNode, data, Ids.OPEN_ACL_UNSAFE,
+      recoverableZK.create(rootRegionZNode, data, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
       LOG.debug("<" + instanceName + ">" + "Created ZNode " + rootRegionZNode + " with data " + address);
       return true;
@@ -738,7 +731,7 @@ public class ZooKeeperWrapper implements
   private boolean updateRootRegionLocation(String address) {
     byte[] data = Bytes.toBytes(address);
     try {
-      zooKeeper.setData(rootRegionZNode, data, -1);
+      recoverableZK.setData(rootRegionZNode, data, -1);
       LOG.debug("<" + instanceName + ">" + "SetData of ZNode " + rootRegionZNode + " with " + address);
       return true;
     } catch (KeeperException e) {
@@ -789,7 +782,7 @@ public class ZooKeeperWrapper implements
     String addressStr = address.toString();
     byte[] data = Bytes.toBytes(addressStr);
     try {
-      zooKeeper.create(masterElectionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      recoverableZK.create(masterElectionZNode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
       LOG.debug("<" + instanceName + ">" + "Wrote master address " + address + " to ZooKeeper");
       return true;
     } catch (InterruptedException e) {
@@ -812,7 +805,7 @@ public class ZooKeeperWrapper implements
     byte[] data = Bytes.toBytes(info.getServerAddress().toString());
     String znode = joinPath(rsZNode, info.getServerName());
     try {
-      zooKeeper.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+      recoverableZK.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
       LOG.debug("<" + instanceName + ">" + "Created ZNode " + znode
           + " with data " + info.getServerAddress().toString());
       return true;
@@ -834,10 +827,10 @@ public class ZooKeeperWrapper implements
     byte[] data = Bytes.toBytes(info.getServerAddress().toString());
     String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
     try {
-      zooKeeper.setData(znode, data, -1);
+      recoverableZK.setData(znode, data, -1);
       LOG.debug("<" + instanceName + ">" + "Updated ZNode " + znode
           + " with data " + info.getServerAddress().toString());
-      zooKeeper.getData(znode, watcher, null);
+      recoverableZK.getData(znode, watcher, null);
       return true;
     } catch (KeeperException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
@@ -871,10 +864,10 @@ public class ZooKeeperWrapper implements
    */
   public void clearRSDirectory() {
     try {
-      List<String> nodes = zooKeeper.getChildren(rsZNode, false);
+      List<String> nodes = recoverableZK.getChildren(rsZNode, false);
       for (String node : nodes) {
         LOG.debug("<" + instanceName + ">" + "Deleting node: " + node);
-        zooKeeper.delete(joinPath(this.rsZNode, node), -1);
+        recoverableZK.delete(joinPath(this.rsZNode, node), -1);
       }
     } catch (KeeperException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to delete " + rsZNode + " znodes in ZooKeeper: " + e);
@@ -889,7 +882,7 @@ public class ZooKeeperWrapper implements
   public int getRSDirectoryCount() {
     Stat stat = null;
     try {
-      stat = zooKeeper.exists(rsZNode, false);
+      stat = recoverableZK.exists(rsZNode, false);
     } catch (KeeperException e) {
       LOG.warn("Problem getting stats for " + rsZNode, e);
     } catch (InterruptedException e) {
@@ -901,7 +894,7 @@ public class ZooKeeperWrapper implements
   private boolean checkExistenceOf(String path) {
     Stat stat = null;
     try {
-      stat = zooKeeper.exists(path, false);
+      stat = recoverableZK.exists(path, false);
     } catch (KeeperException e) {
       LOG.warn("<" + instanceName + ">" + "checking existence of " + path, e);
     } catch (InterruptedException e) {
@@ -916,7 +909,7 @@ public class ZooKeeperWrapper implements
    */
   public void close() {
     try {
-      zooKeeper.close();
+      recoverableZK.close();
       INSTANCES.remove(instanceName);
       LOG.debug("<" + instanceName + ">" + "Closed connection with ZooKeeper; " + this.rootRegionZNode);
     } catch (InterruptedException e) {
@@ -995,7 +988,7 @@ public class ZooKeeperWrapper implements
     }
     try {
       if (checkExistenceOf(znode)) {
-        nodes = zooKeeper.getChildren(znode, this);
+        nodes = recoverableZK.getChildren(znode, this);
         for (String node : nodes) {
           getDataAndWatch(znode, node, this);
         }
@@ -1019,7 +1012,7 @@ public class ZooKeeperWrapper implements
       String path = joinPath(parentZNode, znode);
       // TODO: ZK-REFACTOR: remove existance check?
       if (checkExistenceOf(path)) {
-        data = zooKeeper.getData(path, watcher, null);
+        data = recoverableZK.getData(path, watcher, null);
       }
     } catch (KeeperException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to read " + znode + " znode in ZooKeeper: " + e);
@@ -1060,13 +1053,13 @@ public class ZooKeeperWrapper implements
       LOG.error("<" + instanceName + ">" + "unable to ensure parent exists: " + parentPath);
     }
     byte[] data = Bytes.toBytes(strData);
-    Stat stat = this.zooKeeper.exists(path, false);
+    Stat stat = this.recoverableZK.exists(path, false);
     if (failOnWrite || stat == null) {
-      this.zooKeeper.create(path, data,
+      this.recoverableZK.create(path, data,
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       LOG.debug("<" + instanceName + ">" + "Created " + path + " with data " + strData);
     } else {
-      this.zooKeeper.setData(path, data, -1);
+      this.recoverableZK.setData(path, data, -1);
       LOG.debug("<" + instanceName + ">" + "Updated " + path + " with data " + strData);
     }
   }
@@ -1121,7 +1114,7 @@ public class ZooKeeperWrapper implements
     String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
     try
     {
-      zooKeeper.delete(fullyQualifiedZNodeName, version);
+      recoverableZK.delete(fullyQualifiedZNodeName, version);
     }
     catch (InterruptedException e)
     {
@@ -1141,9 +1134,9 @@ public class ZooKeeperWrapper implements
     String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
 
     try {
-      zooKeeper.exists(fullyQualifiedZNodeName, this);
-      zooKeeper.getData(fullyQualifiedZNodeName, this, null);
-      zooKeeper.getChildren(fullyQualifiedZNodeName, this);
+      recoverableZK.exists(fullyQualifiedZNodeName, this);
+      recoverableZK.getData(fullyQualifiedZNodeName, this, null);
+      recoverableZK.getChildren(fullyQualifiedZNodeName, this);
     } catch (InterruptedException e) {
       LOG.warn("<" + instanceName + ">" + "Failed to create ZNode " + fullyQualifiedZNodeName + " in ZooKeeper", e);
     } catch (KeeperException e) {
@@ -1160,7 +1153,7 @@ public class ZooKeeperWrapper implements
 
     try {
       // create the znode
-      zooKeeper.create(fullyQualifiedZNodeName, data, Ids.OPEN_ACL_UNSAFE, createMode);
+      recoverableZK.create(fullyQualifiedZNodeName, data, Ids.OPEN_ACL_UNSAFE, createMode);
       LOG.debug("<" + instanceName + ">" + "Created ZNode " + fullyQualifiedZNodeName + " in ZooKeeper");
     } catch (KeeperException.NodeExistsException nee) {
       LOG.debug("<" + instanceName + "> " + "ZNode " + fullyQualifiedZNodeName + " already exists, still setting watch");
@@ -1183,7 +1176,7 @@ public class ZooKeeperWrapper implements
     byte[] data;
     try {
       String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
-      data = zooKeeper.getData(fullyQualifiedZNodeName, this, stat);
+      data = recoverableZK.getData(fullyQualifiedZNodeName, this, stat);
     } catch (InterruptedException e) {
       throw new IOException(e);
     } catch (KeeperException e) {
@@ -1196,9 +1189,9 @@ public class ZooKeeperWrapper implements
   public boolean writeZNode(String znodeName, byte[] data, int version, boolean watch) throws IOException {
       try {
         String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
-        zooKeeper.setData(fullyQualifiedZNodeName, data, version);
+        recoverableZK.setData(fullyQualifiedZNodeName, data, version);
         if(watch) {
-          zooKeeper.getData(fullyQualifiedZNodeName, this, null);
+          recoverableZK.getData(fullyQualifiedZNodeName, this, null);
         }
         return true;
       } catch (InterruptedException e) {
@@ -1359,7 +1352,7 @@ public class ZooKeeperWrapper implements
     try {
       if (checkExistenceOf(znode)) {
         synchronized(unassignedZNodesWatched) {
-          nodes = zooKeeper.getChildren(znode, this);
+          nodes = recoverableZK.getChildren(znode, this);
           for (String node : nodes) {
             String znodePath = joinPath(znode, node);
             if(!unassignedZNodesWatched.contains(znodePath)) {