You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/07/28 08:44:29 UTC

svn commit: r1151751 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/util/ src/main/java/org/apa...

Author: stack
Date: Thu Jul 28 06:44:27 2011
New Revision: 1151751

URL: http://svn.apache.org/viewvc?rev=1151751&view=rev
Log:
HBASE-3065 Retry all 'retryable' zk operations; e.g. connection loss

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 28 06:44:27 2011
@@ -342,6 +342,7 @@ Release 0.91.0 - Unreleased
    HBASE-1938  Make in-memory table scanning faster (nkeywal)
    HBASE-4143  HTable.doPut(List) should check the writebuffer length every so often
                (Doug Meil via Ted Yu)
+   HBASE-3065  Retry all 'retryable' zk operations; e.g. connection loss (Liyin Tang)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Jul 28 06:44:27 2011
@@ -1706,8 +1706,8 @@ public class HConnectionManager {
       }
       this.servers.clear();
       if (this.zooKeeper != null) {
-        LOG.info("Closed zookeeper sessionid=0x"
-            + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+        LOG.info("Closed zookeeper sessionid=0x" +
+          Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
         this.zooKeeper.close();
         this.zooKeeper = null;
       }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Jul 28 06:44:27 2011
@@ -1145,7 +1145,7 @@ public class AssignmentManager extends Z
       LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
       // Async exists to set a watcher so we'll get triggered when
       // unassigned node changes.
-      this.zkw.getZooKeeper().exists(path, this.zkw,
+      this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
         new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
     }
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Jul 28 06:44:27 2011
@@ -370,7 +370,7 @@ implements HMasterInterface, HMasterRegi
 
     LOG.info("Server active/primary master; " + this.serverName +
         ", sessionid=0x" +
-        Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
+        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
         ", cluster-up flag was=" + wasUp);
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Thu Jul 28 06:44:27 2011
@@ -197,6 +197,7 @@ public class SplitLogManager extends Zoo
    * @throws IOException
    *             if there was an error while splitting any log file
    * @return cumulative size of the logfiles split
+   * @throws KeeperException 
    */
   public long splitLogDistributed(final Path logDir) throws IOException {
     List<Path> logDirs = new ArrayList<Path>();
@@ -370,7 +371,8 @@ public class SplitLogManager extends Zoo
 
 
   private void getDataSetWatch(String path, Long retry_count) {
-    this.watcher.getZooKeeper().getData(path, this.watcher,
+    this.watcher.getRecoverableZooKeeper().getZooKeeper().
+        getData(path, this.watcher,
         new GetDataAsyncCallback(), retry_count);
     tot_mgr_get_data_queued.incrementAndGet();
   }
@@ -524,7 +526,8 @@ public class SplitLogManager extends Zoo
 
   private void deleteNode(String path, Long retries) {
     tot_mgr_node_delete_queued.incrementAndGet();
-    this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(),
+    this.watcher.getRecoverableZooKeeper().getZooKeeper().
+      delete(path, -1, new DeleteAsyncCallback(),
         retries);
   }
 
@@ -551,9 +554,11 @@ public class SplitLogManager extends Zoo
   /**
    * signal the workers that a task was resubmitted by creating the
    * RESCAN node.
+   * @throws KeeperException 
    */
   private void createRescanNode(long retries) {
-    watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher),
+    this.watcher.getRecoverableZooKeeper().getZooKeeper().
+      create(ZKSplitLog.getRescanNode(watcher),
         TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT_SEQUENTIAL,
         new CreateRescanAsyncCallback(), new Long(retries));

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Jul 28 06:44:27 2011
@@ -879,7 +879,7 @@ public class HRegionServer implements HR
       LOG.info("Serving as " + this.serverNameFromMasterPOV +
         ", RPC listening on " + this.isa +
         ", sessionid=0x" +
-        Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
       isOnline = true;
     } catch (Throwable e) {
       this.isOnline = false;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Thu Jul 28 06:44:27 2011
@@ -329,7 +329,7 @@ public class SplitLogWorker extends ZooK
    */
   private boolean ownTask(boolean isFirstTime) {
     try {
-      Stat stat = this.watcher.getZooKeeper().setData(currentTask,
+      Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
           TaskState.TASK_OWNED.get(serverName), currentVersion);
       if (stat == null) {
         LOG.warn("zk.setData() returned null for path " + currentTask);
@@ -392,8 +392,9 @@ public class SplitLogWorker extends ZooK
   }
 
   void getDataSetWatchAsync() {
-    this.watcher.getZooKeeper().getData(currentTask, this.watcher,
-        new GetDataAsyncCallback(), null);
+    this.watcher.getRecoverableZooKeeper().getZooKeeper().
+      getData(currentTask, this.watcher,
+      new GetDataAsyncCallback(), null);
     tot_wkr_get_data_queued.incrementAndGet();
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Thu Jul 28 06:44:27 2011
@@ -310,7 +310,16 @@ public class Bytes {
   public static String toStringBinary(final byte [] b) {
     return toStringBinary(b, 0, b.length);
   }
-
+  
+  /**
+   * The same as {@link #toStringBinary(byte[])}, but returns a string "null"
+   * if given a null argument.
+   */
+  public static String toStringBinarySafe(final byte [] b) {
+    if (b == null)
+      return "null";
+    return toStringBinary(b, 0, b.length);
+  }
   /**
    * Write a printable representation of a byte array. Non-printable
    * characters are hex escaped in the format \\x%02X, eg:

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounter.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounter {
+  private final int maxRetries;
+  private int retriesRemaining;
+  private final int retryIntervalMillis;
+  private final TimeUnit timeUnit;
+
+  public RetryCounter(int maxRetries, 
+  int retryIntervalMillis, TimeUnit timeUnit) {
+    this.maxRetries = maxRetries;
+    this.retriesRemaining = maxRetries;
+    this.retryIntervalMillis = retryIntervalMillis;
+    this.timeUnit = timeUnit;
+  }
+
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  public void sleepUntilNextRetry() throws InterruptedException {
+    timeUnit.sleep(retryIntervalMillis);
+  }
+
+  public boolean shouldRetry() {
+    return retriesRemaining > 0;
+  }
+
+  public void useRetry() {
+    retriesRemaining--;
+  }
+  
+  public int getAttemptTimes() {
+    return maxRetries-retriesRemaining+1;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/RetryCounterFactory.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class RetryCounterFactory {
+  private final int maxRetries;
+  private final int retryIntervalMillis;
+
+  public RetryCounterFactory(int maxRetries, int retryIntervalMillis) {
+    this.maxRetries = maxRetries;
+    this.retryIntervalMillis = retryIntervalMillis;
+  }
+
+  public RetryCounter create() {
+    return new RetryCounter(
+      maxRetries, retryIntervalMillis, TimeUnit.MILLISECONDS
+    );
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1151751&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Thu Jul 28 06:44:27 2011
@@ -0,0 +1,661 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two 
+ * classes of requests: idempotent and non-idempotent requests. Read requests 
+ * and unconditional sets and deletes are examples of idempotent requests, they 
+ * can be reissued with the same results. 
+ * (Although, the delete may throw a NoNodeException on reissue its effect on 
+ * the ZooKeeper state is the same.) Non-idempotent requests need special 
+ * handling, application and library writers need to keep in mind that they may 
+ * need to encode information in the data or name of znodes to detect 
+ * retries. A simple example is a create that uses a sequence flag. 
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection 
+ * loss exception, that process will reissue another 
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a 
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be 
+ * that x-109 was the result of the previous create, so the process actually 
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-" 
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", 
+ * "x-352-109", "x-333-110". The process will know that the original create 
+ * succeeded and the znode it created is "x-352-109".
+ * @see <a href="http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling">ZooKeeper ErrorHandling</a>
+ */
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  private ZooKeeper zk;
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  private final byte[] id;
+  private int retryIntervalMillis;
+
+  private static final int ID_OFFSET =  Bytes.SIZEOF_INT;
+  // the magic number is to be backward compatible
+  private static final byte MAGIC =(byte) 0XFF;
+  private static final int MAGIC_OFFSET = Bytes.SIZEOF_BYTE;
+
+  public RecoverableZooKeeper(String quorumServers, int seesionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis) 
+  throws IOException {
+    this.zk = new ZooKeeper(quorumServers, seesionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+    this.retryIntervalMillis = retryIntervalMillis;
+
+    // the identifier = processID@hostName
+    this.identifier = ManagementFactory.getRuntimeMXBean().getName();
+    LOG.info("The identifier of this process is " + identifier);
+    this.id = Bytes.toBytes(identifier);
+  }
+
+  /**
+   * delete is an idempotent operation. Retry before throwing the exception.
+   * A NoNodeException is swallowed only on a retry; a first attempt rethrows it.
+   * @param path
+   * @param version
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void delete(String path, int version)
+  throws InterruptedException, KeeperException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NONODE:
+            if (isRetry) {
+              LOG.info("Node " + path + " already deleted. Assuming that a " +
+                  "previous attempt succeeded.");
+              return;
+            }
+            LOG.warn("Node " + path + " already deleted, and this is not a " +
+                     "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper delete failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watcher
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat exists(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper exists failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watch
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat exists(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper exists failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watcher
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+    throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watch
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getChildren failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watcher
+   * @param stat
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watcher, stat);       
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      } 
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throw out exception
+   * @param path
+   * @param watch
+   * @param stat
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watch, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper getData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation. A retry may cause a BadVersion
+   * exception, so an identifier field is added to the data to check whether
+   * the bad version was caused by a previous setData that actually succeeded.
+   * @param path
+   * @param data
+   * @param version
+   * @return 
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    byte[] newData = appendMetaData(data);
+    while (true) {
+      try {
+        return zk.setData(path, newData, version);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper setData failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+          case BADVERSION:
+            // try to verify whether the previous setData succeeded or not
+            try{
+              Stat stat = new Stat();
+              byte[] revData = zk.getData(path, false, stat);
+              int idLength = Bytes.toInt(revData, ID_OFFSET);
+              int dataLength = revData.length-ID_OFFSET-idLength;
+              int dataOffset = ID_OFFSET+idLength;
+              
+              if(Bytes.compareTo(revData, ID_OFFSET, id.length, 
+                  revData, dataOffset, dataLength) == 0) {
+                // the bad version is caused by previous successful setData
+                return stat;
+              }
+            } catch(KeeperException keeperException){
+              // the ZK is not reliable at this moment. just throw out exception
+              throw keeperException;
+            }            
+          
+          // throw out other exceptions and verified bad version exceptions
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Creates a znode, dispatching on the create mode to a retrying helper.
+   * <p>
+   * The payload is first wrapped by {@code appendMetaData}. EPHEMERAL and
+   * PERSISTENT creates are handled by {@code createNonSequential}; a
+   * NodeExists error is only absorbed there when a retry finds the node
+   * already holding exactly the data being written, otherwise it is thrown
+   * back to the caller.
+   * </p>
+   * <p>
+   * EPHEMERAL_SEQUENTIAL and PERSISTENT_SEQUENTIAL creates are not idempotent
+   * and are handled by {@code createSequential}, which appends this
+   * instance's identifier to the path so that a node created by an earlier
+   * attempt can be recognised on retry.
+   * </p>
+   *
+   * @param path znode path
+   * @param data payload; passed through appendMetaData before being written
+   * @param acl ACL list handed to the helper unchanged
+   * @param createMode one of the four modes named above
+   * @return the path returned by the selected helper
+   * @throws KeeperException propagated from the selected helper
+   * @throws InterruptedException propagated from the selected helper
+   * @throws IllegalArgumentException if createMode is none of the four modes
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    final byte[] payload = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, payload, acl, createMode);
+
+      case PERSISTENT:
+      case EPHEMERAL:
+        return createNonSequential(path, payload, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " + 
+            createMode);
+    }
+  }
+
+  /**
+   * Creates a non-sequential znode, retrying on CONNECTIONLOSS and
+   * OPERATIONTIMEOUT for as long as the retry counter allows.
+   * <p>
+   * NODEEXISTS on the first attempt is rethrown. On a retry the existing
+   * node is read back: if its bytes equal {@code data} the earlier attempt
+   * is taken to have succeeded and {@code path} is returned, otherwise the
+   * NODEEXISTS exception is rethrown.
+   * </p>
+   */
+  private String createNonSequential(String path, byte[] data, List<ACL> acl, 
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retries = retryCounterFactory.create();
+    boolean retrying = false; // becomes true once the first attempt has failed
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + ke);
+            if (!retries.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retries.getMaxRetries() + " retries");
+              throw ke;
+            }
+            break;
+
+          case NODEEXISTS: {
+            if (!retrying) {
+              LOG.error("Node " + path + " already exists and this is not a " +
+                  "retry");
+              throw ke;
+            }
+            // A previous attempt may have created the node before the
+            // connection dropped; compare what is stored with what we sent.
+            byte[] existing = zk.getData(path, false, null);
+            if (existing != null && Bytes.compareTo(existing, data) == 0) {
+              // Our own earlier create went through.
+              return path;
+            }
+            LOG.error("Node " + path + " already exists with " + 
+                Bytes.toStringBinarySafe(existing) + ", could not write " +
+                Bytes.toStringBinarySafe(data));
+            throw ke;
+          }
+
+          default:
+            throw ke;
+        }
+      }
+      LOG.info("The "+retries.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+      retrying = true;
+    }
+  }
+  
+  private String createSequential(String path, byte[] data, 
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case OPERATIONTIMEOUT:
+            LOG.warn("Possibly transient ZooKeeper exception: " + e);
+            if (!retryCounter.shouldRetry()) {
+              LOG.error("ZooKeeper create failed after "
+                + retryCounter.getMaxRetries() + " retries");
+              throw e;
+            }
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      LOG.info("The "+retryCounter.getAttemptTimes()+" times to retry " +
+          "ZooKeeper after sleeping "+retryIntervalMillis+" ms");
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  private String findPreviousSequentialNode(String path)
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Strips the header written by {@code appendMetaData} from znode data.
+   * <p>
+   * Input that is null, empty, or does not start with the MAGIC byte is
+   * returned as-is, so data written without the header still reads back
+   * unchanged.
+   * </p>
+   *
+   * @param data raw bytes read from a znode
+   * @return the bytes following the header, or {@code data} itself when no
+   *         header is present
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if (data == null || data.length == 0 || data[0] != MAGIC) {
+      return data;
+    }
+
+    // Header: magic byte, then the id length as an int, then the id itself.
+    final int idLength = Bytes.toInt(data, MAGIC_OFFSET);
+    final int headerLength = MAGIC_OFFSET+ID_OFFSET+idLength;
+
+    final byte[] payload = new byte[data.length-headerLength];
+    System.arraycopy(data, headerLength, payload, 0, payload.length);
+    return payload;
+  }
+  
+  private byte[] appendMetaData(byte[] data) {
+    if(data == null){
+      return null;
+    }
+    
+    byte[] newData = new byte[MAGIC_OFFSET+ID_OFFSET+id.length+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, id.length);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+
+    return newData;
+  }
+
+  /**
+   * @return the session id reported by the wrapped ZooKeeper client
+   */
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  /**
+   * Closes the wrapped ZooKeeper client.
+   *
+   * @throws InterruptedException propagated from the wrapped client's close
+   */
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  /**
+   * @return the state reported by the wrapped ZooKeeper client
+   */
+  public States getState() {
+    return zk.getState();
+  }
+
+  /**
+   * Exposes the wrapped ZooKeeper client. Calls made directly on the
+   * returned object do not go through this class's retry logic.
+   *
+   * @return the wrapped ZooKeeper client
+   */
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  /**
+   * @return the session password reported by the wrapped ZooKeeper client
+   */
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Issues an asynchronous sync for {@code path} on the wrapped ZooKeeper
+   * client. The call is forwarded once; no retry is applied here.
+   *
+   * @param path znode path to sync
+   * @param cb callback handed to the wrapped client; forwarded as-is, so
+   *           null is passed through unchanged
+   * @param ctx context object handed to the wrapped client alongside cb
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    // Forward the caller's callback and context. They were previously
+    // replaced with null, so a supplied callback was never invoked.
+    this.zk.sync(path, cb, ctx);
+  }
+
+  /**
+   * Selects the node names that begin with at least one of the supplied
+   * prefixes. Input order is preserved and each node appears at most once
+   * per occurrence in the input, however many prefixes it matches.
+   *
+   * @param nodes the node names to examine
+   * @param prefixes the accepted prefixes
+   * @return a new list holding every node that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes, 
+      String... prefixes) {
+    List<String> accepted = new ArrayList<String>();
+    for (String node : nodes) {
+      boolean hit = false;
+      // Stop scanning prefixes as soon as one matches.
+      for (int i = 0; i < prefixes.length && !hit; i++) {
+        hit = node.startsWith(prefixes[i]);
+      }
+      if (hit) {
+        accepted.add(node);
+      }
+    }
+    return accepted;
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Thu Jul 28 06:44:27 2011
@@ -70,20 +70,20 @@ public class ZKUtil {
    * @return connection to zookeeper
    * @throws IOException if unable to connect to zk or config problem
    */
-  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
   throws IOException {
     Properties properties = ZKConfig.makeZKProps(conf);
     String ensemble = ZKConfig.getZKQuorumServersString(properties);
     return connect(conf, ensemble, watcher);
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher)
   throws IOException {
     return connect(conf, ensemble, watcher, "");
   }
 
-  public static ZooKeeper connect(Configuration conf, String ensemble,
+  public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
       Watcher watcher, final String descriptor)
   throws IOException {
     if(ensemble == null) {
@@ -92,7 +92,11 @@ public class ZKUtil {
     int timeout = conf.getInt("zookeeper.session.timeout", 180 * 1000);
     LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
         ensemble + ")");
-    return new ZooKeeper(ensemble, timeout, watcher);
+    int retry = conf.getInt("zookeeper.recovery.retry", 3);
+    int retryIntervalMillis = 
+      conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+    return new RecoverableZooKeeper(ensemble, timeout, watcher, 
+        retry, retryIntervalMillis);
   }
 
   //
@@ -214,7 +218,7 @@ public class ZKUtil {
   public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      Stat s = zkw.getZooKeeper().exists(znode, zkw);
+      Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
       LOG.debug(zkw.prefix("Set watcher on existing znode " + znode));
       return s != null ? true : false;
     } catch (KeeperException e) {
@@ -242,7 +246,7 @@ public class ZKUtil {
   public static int checkExists(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      Stat s = zkw.getZooKeeper().exists(znode, null);
+      Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
       return s != null ? s.getVersion() : -1;
     } catch (KeeperException e) {
       LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
@@ -279,7 +283,7 @@ public class ZKUtil {
       ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
+      List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
       return children;
     } catch(KeeperException.NoNodeException ke) {
       LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
@@ -339,7 +343,7 @@ public class ZKUtil {
     List<String> children = null;
     try {
       // List the children without watching
-      children = zkw.getZooKeeper().getChildren(znode, null);
+      children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
     } catch(KeeperException.NoNodeException nne) {
       return null;
     } catch(InterruptedException ie) {
@@ -389,7 +393,7 @@ public class ZKUtil {
   public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
+      return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
     } catch(KeeperException.NoNodeException ke) {
       LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
       "because node does not exist (not an error)"));
@@ -421,7 +425,7 @@ public class ZKUtil {
   public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      Stat stat = zkw.getZooKeeper().exists(znode, null);
+      Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
       return stat == null ? 0 : stat.getNumChildren();
     } catch(KeeperException e) {
       LOG.warn(zkw.prefix("Unable to get children of node " + znode));
@@ -443,7 +447,7 @@ public class ZKUtil {
   public static byte [] getData(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      byte [] data = zkw.getZooKeeper().getData(znode, null, null);
+      byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
       logRetrievedMsg(zkw, znode, data, false);
       return data;
     } catch (KeeperException.NoNodeException e) {
@@ -475,7 +479,7 @@ public class ZKUtil {
   public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
   throws KeeperException {
     try {
-      byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
+      byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, null);
       logRetrievedMsg(zkw, znode, data, true);
       return data;
     } catch (KeeperException.NoNodeException e) {
@@ -512,7 +516,7 @@ public class ZKUtil {
       Stat stat)
   throws KeeperException {
     try {
-      byte [] data = zkw.getZooKeeper().getData(znode, null, stat);
+      byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
       logRetrievedMsg(zkw, znode, data, false);
       return data;
     } catch (KeeperException.NoNodeException e) {
@@ -549,7 +553,7 @@ public class ZKUtil {
       byte [] data, int expectedVersion)
   throws KeeperException {
     try {
-      zkw.getZooKeeper().setData(znode, data, expectedVersion);
+      zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
     } catch(InterruptedException ie) {
       zkw.interruptedException(ie);
     }
@@ -583,7 +587,7 @@ public class ZKUtil {
       byte [] data, int expectedVersion)
   throws KeeperException, KeeperException.NoNodeException {
     try {
-      return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null;
+      return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
     } catch (InterruptedException e) {
       zkw.interruptedException(e);
       return false;
@@ -654,7 +658,7 @@ public class ZKUtil {
       String znode, byte [] data)
   throws KeeperException {
     try {
-      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+      zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
           CreateMode.EPHEMERAL);
     } catch (KeeperException.NodeExistsException nee) {
       if(!watchAndCheckExists(zkw, znode)) {
@@ -693,11 +697,11 @@ public class ZKUtil {
       ZooKeeperWatcher zkw, String znode, byte [] data)
   throws KeeperException {
     try {
-      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+      zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT);
     } catch (KeeperException.NodeExistsException nee) {
       try {
-        zkw.getZooKeeper().exists(znode, zkw);
+        zkw.getRecoverableZooKeeper().exists(znode, zkw);
       } catch (InterruptedException e) {
         zkw.interruptedException(e);
         return false;
@@ -730,9 +734,9 @@ public class ZKUtil {
       String znode, byte [] data)
   throws KeeperException, KeeperException.NodeExistsException {
     try {
-      zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+      zkw.getRecoverableZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT);
-      return zkw.getZooKeeper().exists(znode, zkw).getVersion();
+      return zkw.getRecoverableZooKeeper().exists(znode, zkw).getVersion();
     } catch (InterruptedException e) {
       zkw.interruptedException(e);
       return -1;
@@ -757,7 +761,7 @@ public class ZKUtil {
   public static void asyncCreate(ZooKeeperWatcher zkw,
       String znode, byte [] data, final AsyncCallback.StringCallback cb,
       final Object ctx) {
-    zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+    zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT, cb, ctx);
   }
 
@@ -775,7 +779,7 @@ public class ZKUtil {
       String znode)
   throws KeeperException {
     try {
-      ZooKeeper zk = zkw.getZooKeeper();
+      RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
       if (zk.exists(znode, false) == null) {
         zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
@@ -783,7 +787,7 @@ public class ZKUtil {
     } catch(KeeperException.NodeExistsException nee) {
     } catch(KeeperException.NoAuthException nee){
       try {
-        if (null == zkw.getZooKeeper().exists(znode, false)) {
+        if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
           // If we failed to create the file and it does not already exist.
           throw(nee);
         }
@@ -813,7 +817,7 @@ public class ZKUtil {
       if(znode == null) {
         return;
       }
-      zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
+      zkw.getRecoverableZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
           CreateMode.PERSISTENT);
     } catch(KeeperException.NodeExistsException nee) {
       return;
@@ -845,7 +849,7 @@ public class ZKUtil {
       int version)
   throws KeeperException {
     try {
-      zkw.getZooKeeper().delete(node, version);
+      zkw.getRecoverableZooKeeper().delete(node, version);
       return true;
     } catch(KeeperException.BadVersionException bve) {
       return false;
@@ -864,7 +868,7 @@ public class ZKUtil {
   public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
   throws KeeperException {
     try {
-      zkw.getZooKeeper().delete(node, -1);
+      zkw.getRecoverableZooKeeper().delete(node, -1);
     } catch(KeeperException.NoNodeException nne) {
     } catch(InterruptedException ie) {
       zkw.interruptedException(ie);
@@ -886,7 +890,7 @@ public class ZKUtil {
           deleteNodeRecursively(zkw, joinZNode(node, child));
         }
       }
-      zkw.getZooKeeper().delete(node, -1);
+      zkw.getRecoverableZooKeeper().delete(node, -1);
     } catch(InterruptedException ie) {
       zkw.interruptedException(ie);
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Thu Jul 28 06:44:27 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.zookeepe
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.util.Thre
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
 /**
  * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
@@ -58,7 +56,7 @@ public class ZooKeeperWatcher implements
   private String quorum;
 
   // zookeeper connection
-  private ZooKeeper zooKeeper;
+  private RecoverableZooKeeper recoverableZooKeeper;
 
   // abortable in case of zk failure
   private Abortable abortable;
@@ -116,51 +114,11 @@ public class ZooKeeperWatcher implements
     this.identifier = descriptor;
     this.abortable = abortable;
     setNodeNames(conf);
-    this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
+    this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
     try {
       // Create all the necessary "directories" of znodes
       // TODO: Move this to an init method somewhere so not everyone calls it?
-
-      // The first call against zk can fail with connection loss.  Seems common.
-      // Apparently this is recoverable.  Retry a while.
-      // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
-      // TODO: Generalize out in ZKUtil.
-      long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
-          HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME);
-      long finished = System.currentTimeMillis() + wait;
-      KeeperException ke = null;
-      do {
-        try {
-          ZKUtil.createAndFailSilent(this, baseZNode);
-          ke = null;
-          break;
-        } catch (KeeperException.ConnectionLossException e) {
-          if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
-            LOG.debug("Retrying zk create for another " +
-              (finished - System.currentTimeMillis()) +
-              "ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
-              "wait time); " + e.getMessage());
-          }
-          ke = e;
-        }
-      } while (isFinishedRetryingRecoverable(finished));
-      // Convert connectionloss exception to ZKCE.
-      if (ke != null) {
-        try {
-          // If we don't close it, the zk connection managers won't be killed
-          this.zooKeeper.close();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          LOG.warn("Interrupted while closing", e);
-        }
-        throw new ZooKeeperConnectionException("HBase is able to connect to" +
-            " ZooKeeper but the connection closes immediately. This could be" +
-            " a sign that the server has too many connections (30 is the" +
-            " default). Consider inspecting your ZK server logs for that" +
-            " error and then make sure you are reusing HBaseConfiguration" +
-            " as often as you can. See HTable's javadoc for more information.",
-            ke);
-      }
+      ZKUtil.createAndFailSilent(this, baseZNode);
       ZKUtil.createAndFailSilent(this, assignmentZNode);
       ZKUtil.createAndFailSilent(this, rsZNode);
       ZKUtil.createAndFailSilent(this, tableZNode);
@@ -235,8 +193,8 @@ public class ZooKeeperWatcher implements
    * Get the connection to ZooKeeper.
    * @return connection reference to zookeeper
    */
-  public ZooKeeper getZooKeeper() {
-    return zooKeeper;
+  public RecoverableZooKeeper getRecoverableZooKeeper() {
+    return recoverableZooKeeper;
   }
 
   /**
@@ -321,16 +279,16 @@ public class ZooKeeperWatcher implements
           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
         while (System.currentTimeMillis() < finished) {
           Threads.sleep(1);
-          if (this.zooKeeper != null) break;
+          if (this.recoverableZooKeeper != null) break;
         }
-        if (this.zooKeeper == null) {
+        if (this.recoverableZooKeeper == null) {
           LOG.error("ZK is null on connection event -- see stack trace " +
             "for the stack trace when constructor was called on this zkw",
             this.constructorCaller);
           throw new NullPointerException("ZK is null");
         }
         this.identifier = this.identifier + "-0x" +
-          Long.toHexString(this.zooKeeper.getSessionId());
+          Long.toHexString(this.recoverableZooKeeper.getSessionId());
         // Update our identifier.  Otherwise ignore.
         LOG.debug(this.identifier + " connected");
         break;
@@ -365,7 +323,7 @@ public class ZooKeeperWatcher implements
    * is up-to-date from when we begin the operation.
    */
   public void sync(String path) {
-    this.zooKeeper.sync(path, null, null);
+    this.recoverableZooKeeper.sync(path, null, null);
   }
 
   /**
@@ -408,8 +366,8 @@ public class ZooKeeperWatcher implements
    */
   public void close() {
     try {
-      if (zooKeeper != null) {
-        zooKeeper.close();
+      if (recoverableZooKeeper != null) {
+        recoverableZooKeeper.close();
 //        super.close();
       }
     } catch (InterruptedException e) {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Jul 28 06:44:27 2011
@@ -1126,7 +1126,7 @@ public class HBaseTestingUtility {
     Configuration c = new Configuration(this.conf);
     String quorumServers = ZKConfig.getZKQuorumServersString(c);
     int sessionTimeout = 5 * 1000; // 5 seconds
-    ZooKeeper zk = nodeZK.getZooKeeper();
+    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
     byte[] password = zk.getSessionPasswd();
     long sessionID = zk.getSessionId();
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Thu Jul 28 06:44:27 2011
@@ -99,9 +99,8 @@ public class TestZooKeeper {
     int sessionTimeout = 5 * 1000; // 5 seconds
     HConnection connection = HConnectionManager.getConnection(c);
     ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
-    long sessionID = connectionZK.getZooKeeper().getSessionId();
-    byte [] password = connectionZK.getZooKeeper().getSessionPasswd();
-
+    long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
+    byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
     ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
         EmptyWatcher.instance, sessionID, password);
     LOG.info("Session timeout=" + zk.getSessionTimeout() +
@@ -116,15 +115,16 @@ public class TestZooKeeper {
 
     // Check that the old ZK connection is closed, means we did expire
     System.err.println("ZooKeeper should have timed out");
-    String state = connectionZK.getZooKeeper().getState().toString();
-    Assert.assertTrue("State=" + state,
-      connectionZK.getZooKeeper().getState().equals(States.CLOSED));
+    String state = connectionZK.getRecoverableZooKeeper().getState().toString();
+    LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
+    Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
+      equals(States.CLOSED));
 
     // Check that the client recovered
     ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
-    LOG.info("state=" + newConnectionZK.getZooKeeper().getState());
-    Assert.assertTrue(newConnectionZK.getZooKeeper().getState().equals(
-        States.CONNECTED));
+    LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
+    Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
+      States.CONNECTED));
   }
   
   @Test
@@ -272,4 +272,4 @@ public class TestZooKeeper {
 
     ZKUtil.createAndFailSilent(zk2, aclZnode);
  }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Thu Jul 28 06:44:27 2011
@@ -198,7 +198,7 @@ public class TestSplitLogManager {
     LOG.info("TestOrphanTaskAcquisition");
 
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
-    zkw.getZooKeeper().create(tasknode,
+    zkw.getRecoverableZooKeeper().create(tasknode,
         TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -231,7 +231,7 @@ public class TestSplitLogManager {
         " startup");
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
     //create an unassigned orphan task
-    zkw.getZooKeeper().create(tasknode,
+    zkw.getRecoverableZooKeeper().create(tasknode,
         TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -391,7 +391,7 @@ public class TestSplitLogManager {
 
     // create an orphan task in OWNED state
     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
-    zkw.getZooKeeper().create(tasknode1,
+    zkw.getRecoverableZooKeeper().create(tasknode1,
         TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Thu Jul 28 06:44:27 2011
@@ -129,7 +129,7 @@ public class TestSplitLogWorker {
     LOG.info("testAcquireTaskAtStartup");
     ZKSplitLog.Counters.resetCounters();
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
         TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -161,7 +161,7 @@ public class TestSplitLogWorker {
     LOG.info("testRaceForTask");
     ZKSplitLog.Counters.resetCounters();
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -200,7 +200,7 @@ public class TestSplitLogWorker {
       Thread.sleep(100);
 
       // this time create a task node after starting the splitLogWorker
-      zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -228,7 +228,7 @@ public class TestSplitLogWorker {
       Thread.yield(); // let the worker start
       Thread.sleep(100);
 
-      zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -236,7 +236,7 @@ public class TestSplitLogWorker {
       // now the worker is busy doing the above task
 
       // create another task
-      zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
+      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -264,7 +264,7 @@ public class TestSplitLogWorker {
     Thread.yield(); // let the worker start
     Thread.sleep(100);
 
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -277,7 +277,7 @@ public class TestSplitLogWorker {
     waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
 
     // create a RESCAN node
-    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
         TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT_SEQUENTIAL);
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java?rev=1151751&r1=1151750&r2=1151751&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java Thu Jul 28 06:44:27 2011
@@ -139,7 +139,7 @@ public class TestSplitTransactionOnClust
       String path = ZKAssign.getNodeName(t.getConnection().getZooKeeperWatcher(),
         hri.getEncodedName());
       Stat stats =
-        t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+        t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE BEFORE SERVER ABORT, path=" + path + ", stats=" + stats);
       RegionTransitionData rtd =
         ZKAssign.getData(t.getConnection().getZooKeeperWatcher(),
@@ -162,7 +162,7 @@ public class TestSplitTransactionOnClust
         assertTrue(daughters.contains(r));
       }
       // Finally assert that the ephemeral SPLIT znode was cleaned up.
-      stats = t.getConnection().getZooKeeperWatcher().getZooKeeper().exists(path, false);
+      stats = t.getConnection().getZooKeeperWatcher().getRecoverableZooKeeper().exists(path, false);
       LOG.info("EPHEMERAL NODE AFTER SERVER ABORT, path=" + path + ", stats=" + stats);
       assertTrue(stats == null);
     } finally {