You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/17 21:20:36 UTC

[04/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper from hbase-server and hbase-client

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
new file mode 100644
index 0000000..f6c7a2d
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
+
+/**
+ * Utility class to perform operations (get/wait for/verify/set/delete) on the znode in ZooKeeper
+ * which keeps hbase:meta region server location.
+ *
+ * Stateless class with a bunch of static methods. Doesn't manage resources passed in
+ * (e.g. Connection, ZKWatcher etc).
+ *
+ * Meta region location is set by <code>RegionServerServices</code>.
+ * This class doesn't use ZK watchers, rather accesses ZK directly.
+ *
+ * This class is stateless. The only reason it's not made a non-instantiable util class
+ * with a collection of static methods is that it'd be rather hard to mock properly in tests.
+ *
+ * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ */
+@InterfaceAudience.Private
+public class MetaTableLocator {
+  private static final Log LOG = LogFactory.getLog(MetaTableLocator.class);
+
+  // only needed to allow non-timeout infinite waits to stop when cluster shuts down
+  private volatile boolean stopped = false;
+
+  /**
+   * Checks if the meta region location is available.
+   * @return true if {@link #getMetaRegionLocation(ZKWatcher)} yields a server for {@code zkw}
+   */
+  public boolean isLocationAvailable(ZKWatcher zkw) {
+    return getMetaRegionLocation(zkw) != null;
+  }
+
+  /**
+   * @param zkw ZooKeeper watcher to be used
+   * @return one-element list pairing the default meta replica with its location (may be null)
+   */
+  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
+    return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Pairs the meta region info of the given replica with the location read from ZooKeeper.
+   * @param zkw ZooKeeper watcher to be used
+   * @param replicaId id of the meta replica to look up
+   * @return one-element list; the server in the pair is null when no opened location was read
+   */
+  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
+      int replicaId) {
+    ServerName serverName = getMetaRegionLocation(zkw, replicaId);
+    List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
+    list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
+        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
+    return list;
+  }
+
+  /**
+   * @param zkw ZooKeeper watcher to be used
+   * @return List holding the region info of the default meta replica
+   */
+  public List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
+    return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Returns the region infos from {@link #getMetaRegionsAndLocations(ZKWatcher, int)}.
+   * @param zkw ZooKeeper watcher to be used
+   * @param replicaId id of the meta replica to look up
+   * @return List of meta regions
+   */
+  public List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
+    List<Pair<RegionInfo, ServerName>> result;
+    result = getMetaRegionsAndLocations(zkw, replicaId);
+    return getListOfRegionInfos(result);
+  }
+
+  // Extracts the RegionInfo of every pair; a null or empty input yields a typed empty list.
+  private List<RegionInfo> getListOfRegionInfos(final List<Pair<RegionInfo, ServerName>> pairs) {
+    if (pairs == null || pairs.isEmpty()) return Collections.emptyList();
+    List<RegionInfo> result = new ArrayList<>(pairs.size());
+    for (Pair<RegionInfo, ServerName> pair: pairs) {
+      result.add(pair.getFirst());
+    }
+    return result;
+  }
+
+  /**
+   * Gets the meta region location, if available.  Does not block.
+   * @param zkw zookeeper connection to use
+   * @return server name, or null if meta is not opened or the ZooKeeper read failed
+   */
+  public ServerName getMetaRegionLocation(final ZKWatcher zkw) {
+    try {
+      RegionState state = getMetaRegionState(zkw);
+      return state.isOpened() ? state.getServerName() : null;
+    } catch (KeeperException ke) {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the meta region location, if available.  Does not block.
+   * @param zkw zookeeper connection to use
+   * @param replicaId id of the meta replica to look up
+   * @return server name, or null if the replica is not opened or the ZooKeeper read failed
+   */
+  public ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
+    try {
+      RegionState state = getMetaRegionState(zkw, replicaId);
+      return state.isOpened() ? state.getServerName() : null;
+    } catch (KeeperException ke) {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the meta region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting meta region formatted as per
+   * {@link ServerName}; never null
+   * @throws InterruptedException if interrupted while waiting
+   * @throws NotAllMetaRegionsOnlineException if no location was found within the timeout
+   */
+  public ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
+  throws InterruptedException, NotAllMetaRegionsOnlineException {
+    return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
+  }
+
+  /**
+   * Gets the meta region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param zkw zookeeper connection to use
+   * @param replicaId id of the meta replica to wait for
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting meta region formatted as per
+   * {@link ServerName}; never null
+   * @throws InterruptedException if interrupted while waiting
+   * @throws NotAllMetaRegionsOnlineException if no location was found within the timeout
+   */
+  public ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
+  throws InterruptedException, NotAllMetaRegionsOnlineException {
+    try {
+      if (ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) == -1) {
+        String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
+            + "There could be a mismatch with the one configured in the master.";
+        LOG.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg);
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
+    }
+    ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
+
+    if (sn == null) {
+      throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
+    }
+
+    return sn;
+  }
+
+  /**
+   * Waits indefinitely for availability of <code>hbase:meta</code>.  Used during cluster
+   * startup.  Does not verify meta, just that something has been set up in zk.
+   * @see #waitMetaRegionLocation(ZKWatcher, long)
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void waitMetaRegionLocation(ZKWatcher zkw) throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long lastWarnTime = startTime;
+    while (!stopped) {
+      try {
+        if (waitMetaRegionLocation(zkw, 100) != null) break;
+      } catch (NotAllMetaRegionsOnlineException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("hbase:meta still not available, sleeping and retrying." +
+            " Reason: " + e.getMessage());
+        }
+        long now = System.currentTimeMillis(); // timed-out waits land here, not after the try
+        if (now - lastWarnTime >= 10000) {
+          LOG.warn("Have been waiting for meta to be assigned for " + (now - startTime) + "ms");
+          lastWarnTime = now;
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify <code>hbase:meta</code> is deployed and accessible.
+   * @param hConnection cluster connection used to reach the hosting server
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on zk for meta address (passed through to
+   * the internal call to {@link #getMetaServerConnection}).
+   * @return True if the <code>hbase:meta</code> location is healthy.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  public boolean verifyMetaRegionLocation(ClusterConnection hConnection,
+                                          ZKWatcher zkw, final long timeout)
+  throws InterruptedException, IOException {
+    return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Verify <code>hbase:meta</code> is deployed and accessible.
+   * @param connection cluster connection used to reach the hosting server
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on zk for meta address, in millis
+   * @param replicaId id of the meta replica to verify
+   * @return True if the <code>hbase:meta</code> location is healthy.
+   * @throws InterruptedException if interrupted while waiting for the location
+   * @throws IOException if connecting fails with an error not handled as "not hosted"
+   */
+  public boolean verifyMetaRegionLocation(ClusterConnection connection,
+                                          ZKWatcher zkw, final long timeout, int replicaId)
+  throws InterruptedException, IOException {
+    AdminProtos.AdminService.BlockingInterface service = null;
+    try {
+      service = getMetaServerConnection(connection, zkw, timeout, replicaId);
+    } catch (NotAllMetaRegionsOnlineException e) {
+      // Pass
+    } catch (ServerNotRunningYetException e) {
+      // Pass -- remote server is not up so can't be carrying root
+    } catch (UnknownHostException e) {
+      // Pass -- server name doesn't resolve so it can't be assigned anything.
+    } catch (RegionServerStoppedException e) {
+      // Pass -- server name sends us to a server that is dying or already dead.
+    }
+    return (service != null) && verifyRegionLocation(connection, service,
+            getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica(
+                RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId).getRegionName());
+  }
+
+  /**
+   * Verify we can connect to <code>hostingServer</code> and that it is carrying
+   * <code>regionName</code>.
+   * @param connection Supplies the RPC controller used for the region info request.
+   * @param hostingServer Interface to the server hosting <code>regionName</code>
+   * @param address The servername that goes with the <code>hostingServer</code>
+   * Interface.  Used for logging.
+   * @param regionName The regionname we are interested in.
+   * @return True if we were able to verify the region located at other side of the Interface.
+   * @throws IOException
+   */
+  // TODO: We should be able to get the ServerName from the AdminProtocol
+  // rather than have to pass it in.  Its made awkward by the fact that the
+  // HRI is likely a proxy against remote server so the getServerName needs
+  // to be fixed to go to a local method or to a cache before we can do this.
+  private boolean verifyRegionLocation(final ClusterConnection connection,
+      AdminService.BlockingInterface hostingServer, final ServerName address,
+      final byte [] regionName)
+  throws IOException {
+    if (hostingServer == null) {
+      LOG.info("Passed hostingServer is null");
+      return false;
+    }
+    Throwable t;
+    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
+    try {
+      // Try and get regioninfo from the hosting server.
+      return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
+    } catch (ConnectException e) {
+      t = e;
+    } catch (RetriesExhaustedException e) {
+      t = e;
+    } catch (RemoteException e) {
+      IOException ioe = e.unwrapRemoteException();
+      t = ioe;
+    } catch (IOException e) {
+      Throwable cause = e.getCause();
+      if (cause != null && cause instanceof EOFException) {
+        t = cause;
+      } else if (cause != null && cause.getMessage() != null
+          && cause.getMessage().contains("Connection reset")) {
+        t = cause;
+      } else {
+        t = e;
+      }
+    }
+    LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) +
+      " at address=" + address + ", exception=" + t.getMessage());
+    return false;
+  }
+
+  /**
+   * Gets a connection to the server hosting meta, as reported by ZooKeeper,
+   * waiting up to the specified timeout for availability.
+   * <p>WARNING: Does not retry.  Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
+   * @param connection cluster connection used to obtain the admin stub
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on meta location
+   * @param replicaId id of the meta replica to connect to
+   * @return connection to server hosting meta
+   * @throws InterruptedException if interrupted while waiting for the location
+   * @throws NotAllMetaRegionsOnlineException if timed out waiting
+   * @throws IOException
+   */
+  private AdminService.BlockingInterface getMetaServerConnection(ClusterConnection connection,
+                                                                 ZKWatcher zkw, long timeout, int replicaId)
+  throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
+    return getCachedConnection(connection, waitMetaRegionLocation(zkw, replicaId, timeout));
+  }
+
+  /**
+   * @param connection cluster connection used to obtain the admin stub
+   * @param sn ServerName to get a connection against; null yields a null result
+   * @return The AdminProtocol we got when we connected to <code>sn</code>; may have come from
+   * cache, may not be good, or may be null when <code>sn</code> is null or connecting failed.
+   * @throws IOException if connecting fails for a reason not handled as a connection problem
+   */
+  private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
+    ServerName sn)
+  throws IOException {
+    if (sn == null) {
+      return null;
+    }
+    AdminService.BlockingInterface service = null;
+    try {
+      service = connection.getAdmin(sn);
+    } catch (RetriesExhaustedException e) {
+      if (e.getCause() != null && e.getCause() instanceof ConnectException) {
+        // Catch this; presume it means the cached connection has gone bad.
+      } else {
+        throw e;
+      }
+    } catch (SocketTimeoutException e) {
+      LOG.debug("Timed out connecting to " + sn);
+    } catch (NoRouteToHostException e) {
+      LOG.debug("Connecting to " + sn, e);
+    } catch (SocketException e) {
+      LOG.debug("Exception connecting to " + sn);
+    } catch (UnknownHostException e) {
+      LOG.debug("Unknown host exception connecting to  " + sn);
+    } catch (FailedServerException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Server " + sn + " is in failed server list.");
+      }
+    } catch (IOException ioe) {
+      Throwable cause = ioe.getCause();
+      if (ioe instanceof ConnectException) {
+        // Catch. Connect refused. NOTE(review): ConnectException is a SocketException, caught above.
+      } else if (cause != null && cause instanceof EOFException) {
+        // Catch. Other end disconnected us.
+      } else if (cause != null && cause.getMessage() != null &&
+        cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
+        // Catch. Connection reset.
+      } else {
+        throw ioe;
+      }
+
+    }
+    return service;
+  }
+
+  /**
+   * Sets the location of the default <code>hbase:meta</code> replica in ZooKeeper to the
+   * specified server address.
+   * @param zookeeper zookeeper reference
+   * @param serverName The server hosting <code>hbase:meta</code>
+   * @param state The region transition state
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setMetaLocation(ZKWatcher zookeeper,
+      ServerName serverName, RegionState.State state) throws KeeperException {
+    setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
+  }
+
+  /**
+   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
+   * specified server address.
+   * @param zookeeper zookeeper reference
+   * @param serverName The server hosting <code>hbase:meta</code>; null is logged and skipped
+   * @param replicaId id of the meta replica whose ZNode is written
+   * @param state The region transition state
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setMetaLocation(ZKWatcher zookeeper,
+      ServerName serverName, int replicaId, RegionState.State state) throws KeeperException {
+    if (serverName == null) {
+      LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
+      return;
+    }
+    LOG.info("Setting hbase:meta (replicaId=" + replicaId + ") location in ZooKeeper as " +
+      serverName);
+    // Make the MetaRegionServer pb and then get its bytes and save this as
+    // the znode content.
+    MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
+      .setServer(ProtobufUtil.toServerName(serverName))
+      .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
+      .setState(state.convert()).build();
+    byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
+    try {
+      ZKUtil.setData(zookeeper,
+          zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
+    } catch(KeeperException.NoNodeException nne) {
+      if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
+        LOG.debug("META region location doesn't exist, create it");
+      } else {
+        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
+            ", create it");
+      }
+      ZKUtil.createAndWatch(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
+    }
+  }
+
+  /**
+   * Load the state of the default meta replica from its meta server ZNode.
+   */
+  public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
+    return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Load the meta region state from the meta server ZNode.
+   * @param zkw zookeeper connection to use
+   * @param replicaId id of the meta replica whose ZNode is read
+   * @return regionstate; OFFLINE with a null server when no server name could be read
+   * @throws KeeperException on ZooKeeper errors or when the ZNode content cannot be deserialized
+   */
+  public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+      throws KeeperException {
+    RegionState.State state = RegionState.State.OPEN;
+    ServerName serverName = null;
+    try {
+      byte[] data = ZKUtil.getData(zkw, zkw.znodePaths.getZNodeForReplica(replicaId));
+      if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
+        try {
+          int prefixLen = ProtobufUtil.lengthOfPBMagic();
+          ZooKeeperProtos.MetaRegionServer rl =
+            ZooKeeperProtos.MetaRegionServer.PARSER.parseFrom
+              (data, prefixLen, data.length - prefixLen);
+          if (rl.hasState()) {
+            state = RegionState.State.convert(rl.getState());
+          }
+          HBaseProtos.ServerName sn = rl.getServer();
+          serverName = ServerName.valueOf(
+            sn.getHostName(), sn.getPort(), sn.getStartCode());
+        } catch (InvalidProtocolBufferException e) {
+          throw new DeserializationException("Unable to parse meta region location", e);
+        }
+      } else {
+        // old style of meta region location?
+        serverName = ProtobufUtil.parseServerNameFrom(data);
+      }
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag; serverName stays null below
+    }
+    if (serverName == null) {
+      state = RegionState.State.OFFLINE;
+    }
+    return new RegionState(
+        RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId),
+      state, serverName);
+  }
+
+  /**
+   * Deletes the location of the default <code>hbase:meta</code> replica in ZooKeeper.
+   * @param zookeeper zookeeper reference
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public void deleteMetaLocation(ZKWatcher zookeeper)
+  throws KeeperException {
+    deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  public void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
+  throws KeeperException {
+    if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
+      LOG.info("Deleting hbase:meta region location in ZooKeeper");
+    } else {
+      LOG.info("Deleting hbase:meta for " + replicaId + " region location in ZooKeeper");
+    }
+    try {
+      // Delete the location ZNode of the given replica.  Don't need any watches.
+      ZKUtil.deleteNode(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId));
+    } catch(KeeperException.NoNodeException nne) {
+      // Has already been deleted, so there is nothing left to do.
+    }
+  }
+  /**
+   * Wait until the primary meta region is available; secondary locations are not waited for.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait for the primary location, in millis
+   * @param conf not read by this method
+   * @return primary location first, then one entry (possibly null) per secondary replica; null
+   *         if we timed out waiting for the primary.
+   * @throws InterruptedException if interrupted while waiting for the primary
+   */
+  public List<ServerName> blockUntilAvailable(final ZKWatcher zkw,
+      final long timeout, Configuration conf)
+          throws InterruptedException {
+    int numReplicasConfigured = 1;
+
+    List<ServerName> servers = new ArrayList<>();
+    // Make the blocking call first so that we do the wait to know
+    // the znodes are all in place or timeout.
+    ServerName server = blockUntilAvailable(zkw, timeout);
+    if (server == null) return null;
+    servers.add(server);
+
+    try {
+      List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
+      numReplicasConfigured = metaReplicaNodes.size();
+    } catch (KeeperException e) {
+      LOG.warn("Got ZK exception " + e);
+    }
+    for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
+      // return all replica locations for the meta
+      servers.add(getMetaRegionLocation(zkw, replicaId));
+    }
+    return servers;
+  }
+
+  /**
+   * Wait until the default meta replica is available and is not in transition.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait, in millis
+   * @return ServerName or null if we timed out.
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public ServerName blockUntilAvailable(final ZKWatcher zkw,
+      final long timeout)
+  throws InterruptedException {
+    return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
+  }
+
+  /**
+   * Wait until the meta region is available and is not in transition.
+   * @param zkw zookeeper connection to use; must not be null
+   * @param replicaId id of the meta replica to wait for
+   * @param timeout maximum time to wait, in millis; must not be negative
+   * @return ServerName or null if we timed out.
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
+                                        final long timeout)
+  throws InterruptedException {
+    if (timeout < 0) throw new IllegalArgumentException("timeout must be >= 0: " + timeout);
+    if (zkw == null) throw new IllegalArgumentException("zkw must not be null");
+    long startTime = System.currentTimeMillis();
+    ServerName sn = null;
+    while (true) {
+      sn = getMetaRegionLocation(zkw, replicaId);
+      if (sn != null || (System.currentTimeMillis() - startTime)
+          > timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
+        break; // found, or one more sleep would overrun the timeout
+      }
+      Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+    }
+    return sn;
+  }
+
+  /**
+   * Stop working.
+   * Sets the flag polled by {@link #waitMetaRegionLocation(ZKWatcher)} so its loop exits.
+   */
+  public void stop() {
+    if (!stopped) {
+      LOG.debug("Stopping MetaTableLocator");
+      stopped = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..ef643bf
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
+ * of redoing it, we should contribute updates to their code which let us more
+ * easily access testing helper objects.
+ */
+@InterfaceAudience.Public
+public class MiniZooKeeperCluster {
+  private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
+  private int connectionTimeout;
+
+  private boolean started;
+
+  /** The default port. If zero, we use a random port. */
+  private int defaultClientPort = 0;
+
+  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private List<ZooKeeperServer> zooKeeperServers;
+  private List<Integer> clientPortList;
+
+  private int activeZKServerIndex;
+  private int tickTime = 0;
+
+  private Configuration configuration;
+
+  public MiniZooKeeperCluster() {
+    this(new Configuration()); // delegate with a freshly constructed Configuration
+  }
+
+  public MiniZooKeeperCluster(Configuration configuration) {
+    this.started = false;
+    this.configuration = configuration;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<>();
+    clientPortList = new ArrayList<>();
+    standaloneServerFactoryList = new ArrayList<>();
+    connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
+      DEFAULT_CONNECTION_TIMEOUT); // DEFAULT_CONNECTION_TIMEOUT is the fallback passed to getInt
+  }
+
+  /**
+   * Appends a client port to the list; the value is not validated here.
+   *
+   * @param clientPort the specified port
+   */
+  public void addClientPort(int clientPort) {
+    clientPortList.add(clientPort);
+  }
+
+  /**
+   * Get the list of client ports.
+   * @return the client port list itself (the internal list, not a copy)
+   */
+  @VisibleForTesting
+  public List<Integer> getClientPortList() {
+    return clientPortList;
+  }
+
+  /**
+   * Check whether the client port in a specific position of the client port list is valid.
+   * @param index the specified position
+   * @return true if the list has an entry at <code>index</code> and that entry is positive
+   */
+  private boolean hasValidClientPortInList(int index) {
+    return (clientPortList.size() > index && clientPortList.get(index) > 0);
+  }
+
+  public void setDefaultClientPort(int clientPort) {
+    if (clientPort <= 0) { // a default of zero means "use a random port", so reject it here
+      throw new IllegalArgumentException("Invalid default ZK client port: "
+          + clientPort);
+    }
+    this.defaultClientPort = clientPort;
+  }
+
+  /**
+   * Selects a ZK client port.
+   *
+   * @param seedPort the seed port to start with; -1 means first time.
+   * @return a client port that is not yet present in the client port list
+   */
+  private int selectClientPort(int seedPort) {
+    int i;
+    int returnClientPort = seedPort + 1;
+    if (returnClientPort == 0) {
+      // If the new port is invalid, find one - starting with the default client port.
+      // If the default client port is not specified, starting with a random port.
+      // The random port is drawn from 49152 to 65279, within the range 49152-65535 that cannot be
+      // registered with IANA and is intended for dynamic allocation (see http://bit.ly/dynports).
+      if (defaultClientPort > 0) {
+        returnClientPort = defaultClientPort;
+      } else {
+        returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
+      }
+    }
+    // Make sure that the port is unused.
+    while (true) {
+      for (i = 0; i < clientPortList.size(); i++) {
+        if (returnClientPort == clientPortList.get(i)) {
+          // Already used. Update the port and retry.
+          returnClientPort++;
+          break;
+        }
+      }
+      if (i == clientPortList.size()) {
+        break; // found a unused port, exit
+      }
+    }
+    return returnClientPort;
+  }
+
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime; // startup falls back to TICK_TIME unless this is positive
+  }
+
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size()-1; // yields -1 while the server list is empty
+  }
+
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size(); // number of entries in the server list
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  /**
+   * Shrinks the ZooKeeper transaction log preallocation size before the servers are started.
+   */
+  private static void setupTestEnv() {
+    // during the tests we run with 100K prealloc in the logs.
+    // on windows systems prealloc of 64M was seen to take ~15seconds
+    // resulting in test failure (client timeout on first session).
+    // set env and directly in order to handle static init/gc issues
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
+
+  /**
+   * Starts the cluster with one server per entry in the client port list, or with a single
+   * server when the list is empty.
+   *
+   * @param baseDir directory under which each server gets its own data directory
+   * @return the value returned by {@link #startup(File, int)}
+   * @throws IOException propagated from {@link #startup(File, int)}
+   * @throws InterruptedException propagated from {@link #startup(File, int)}
+   */
+  public int startup(File baseDir) throws IOException, InterruptedException {
+    int configuredPorts = clientPortList.size();
+    // need at least 1 ZK server for testing
+    int serverCount = (configuredPorts == 0) ? 1 : configuredPorts;
+    return startup(baseDir, serverCount);
+  }
+
+  /**
+   * Shuts down any servers that are currently running and then starts
+   * {@code numZooKeeperServers} standalone ZooKeeper servers, each with its own
+   * {@code zookeeper_<i>} directory under {@code baseDir}.
+   * <p>
+   * A server whose position in the client port list holds a valid (positive) port must bind
+   * to exactly that port. Every other server gets a port from {@code selectClientPort}, and
+   * selection is retried with a new port whenever binding fails.
+   *
+   * @param baseDir directory under which the per-server data directories are created
+   * @param numZooKeeperServers number of servers to start
+   * @return the client port of the first server, which becomes the active one; -1 if
+   *         {@code numZooKeeperServers} is not positive or if a port that was preconfigured
+   *         in the client port list could not be bound
+   * @throws IOException if a started server does not answer the 'stat' probe within
+   *         {@code connectionTimeout}, or if shutting down the previous servers fails
+   * @throws InterruptedException declared on the signature; presumably raised by the
+   *         ZooKeeper server factory startup - confirm against the ZooKeeper API
+   */
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+      InterruptedException {
+    if (numZooKeeperServers <= 0)
+      return -1;
+
+    setupTestEnv();
+    // Start from a clean slate: stop whatever was started by an earlier call.
+    shutdown();
+
+    int tentativePort = -1; // the seed port
+    int currentClientPort;
+
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
+      createDir(dir);
+      // Use the explicitly configured tick time when it is positive, else the class default.
+      int tickTimeToUse;
+      if (this.tickTime > 0) {
+        tickTimeToUse = this.tickTime;
+      } else {
+        tickTimeToUse = TICK_TIME;
+      }
+
+      // Set up client port - if we have already had a list of valid ports, use it.
+      if (hasValidClientPortInList(i)) {
+        currentClientPort = clientPortList.get(i);
+      } else {
+        tentativePort = selectClientPort(tentativePort); // update the seed
+        currentClientPort = tentativePort;
+      }
+
+      // The same directory is passed for both directory arguments of the server.
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
+      server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
+      server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
+      NIOServerCnxnFactory standaloneServerFactory;
+      // Keep trying until a factory has been bound to a port, or binding a fixed port failed.
+      while (true) {
+        try {
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(
+            new InetSocketAddress(currentClientPort),
+            configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+        } catch (BindException e) {
+          LOG.debug("Failed binding ZK Server to client port: " +
+              currentClientPort, e);
+          // We're told to use some port but it's occupied, fail
+          if (hasValidClientPortInList(i)) {
+            return -1;
+          }
+          // This port is already in use, try to use another.
+          tentativePort = selectClientPort(tentativePort);
+          currentClientPort = tentativePort;
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+      // Runs a 'stat' against the servers.
+      if (!waitForServerUp(currentClientPort, connectionTimeout)) {
+        throw new IOException("Waiting for startup of standalone server");
+      }
+
+      // We have selected a port as a client port.  Update clientPortList if necessary.
+      if (clientPortList.size() <= i) { // it is not in the list, add the port
+        clientPortList.add(currentClientPort);
+      }
+      else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
+        clientPortList.remove(i);
+        clientPortList.add(i, currentClientPort);
+      }
+
+      // Keep the factory and server lists index-aligned with clientPortList.
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+    started = true;
+    int clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
+        "on client port=" + clientPort);
+    return clientPort;
+  }
+
+  /**
+   * Creates {@code dir}, including missing parent directories, when it does not exist yet.
+   *
+   * @param dir the directory to create
+   * @throws IOException if the directory could not be created, or if a security manager
+   *         denied the access
+   */
+  private void createDir(File dir) throws IOException {
+    try {
+      // mkdirs() also returns false when someone else created the directory in the meantime,
+      // so only report a failure when the directory is still absent afterwards.
+      if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory()) {
+        throw new IOException("creating dir: " + dir);
+      }
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+  /**
+   * Shuts down every server connection factory, waits for each port to stop answering,
+   * closes the ZooKeeper databases and, if the cluster had been started, resets the
+   * cluster state including the client port list.
+   *
+   * @throws IOException if a server is still reachable on its client port after
+   *         {@code connectionTimeout}, or if closing a database fails
+   */
+  public void shutdown() throws IOException {
+    // shut down all the zk servers
+    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+      NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(i);
+      // The factory list and the port list are index-aligned.
+      int clientPort = clientPortList.get(i);
+
+      standaloneServerFactory.shutdown();
+      if (!waitForServerDown(clientPort, connectionTimeout)) {
+        throw new IOException("Waiting for shutdown of standalone server");
+      }
+    }
+    standaloneServerFactoryList.clear();
+
+    for (ZooKeeperServer zkServer: zooKeeperServers) {
+      //explicitly close ZKDatabase since ZookeeperServer does not close them
+      zkServer.getZKDatabase().close();
+    }
+    zooKeeperServers.clear();
+
+    // clear everything
+    // (only after a successful startup; ports configured before the first startup are kept)
+    if (started) {
+      started = false;
+      activeZKServerIndex = 0;
+      clientPortList.clear();
+      LOG.info("Shutdown MiniZK cluster with all ZK servers");
+    }
+  }
+
+  /**
+   * Shuts down the currently active ZooKeeper server and removes it from the cluster, so
+   * that the next server in the list (if any) takes over the active position.
+   *
+   * @return the client port of the backup server that is now active; -1 if the cluster is
+   *         not started or if no backup server is left
+   * @throws IOException if the killed server is still reachable on its client port after
+   *         {@code connectionTimeout}
+   * @throws InterruptedException declared on the signature; presumably raised by the
+   *         ZooKeeper server factory shutdown - confirm against the ZooKeeper API
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+                                        InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxnFactory standaloneServerFactory =
+      standaloneServerFactoryList.get(activeZKServerIndex);
+    int clientPort = clientPortList.get(activeZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, connectionTimeout)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+    // remove the current active zk server
+    // (the three lists are index-aligned, so the same index is removed from each)
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+        "on client port: " + clientPort);
+
+    if (standaloneServerFactoryList.isEmpty()) {
+      // there is no backup servers;
+      return -1;
+    }
+    // After the removal the former first backup sits at the active index.
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+        "on client port: " + clientPort);
+    // return the next back zk server's port
+    return clientPort;
+  }
+
+  /**
+   * Shuts down the backup server that directly follows the active one and removes it from
+   * the cluster. Does nothing if the cluster is not started or has no backup server.
+   *
+   * @throws IOException if the killed server is still reachable on its client port after
+   *         {@code connectionTimeout}
+   * @throws InterruptedException declared on the signature; presumably raised by the
+   *         ZooKeeper server factory shutdown - confirm against the ZooKeeper API
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+                                        InterruptedException {
+    if (!started || activeZKServerIndex < 0 ||
+        standaloneServerFactoryList.size() <= 1) {
+      return ;
+    }
+
+    int backupZKServerIndex = activeZKServerIndex+1;
+    // Shutdown the backup server that follows the active one
+    NIOServerCnxnFactory standaloneServerFactory =
+      standaloneServerFactoryList.get(backupZKServerIndex);
+    int clientPort = clientPortList.get(backupZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, connectionTimeout)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
+
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+        "on client port: " + clientPort);
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) throws IOException {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+    }
+    return false;
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerUp(int port, long timeout) throws IOException {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        BufferedReader reader = null;
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          sock.close();
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+    }
+    return false;
+  }
+
+  public int getClientPort() {
+    return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
+        : clientPortList.get(activeZKServerIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
new file mode 100644
index 0000000..da7d176
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Placeholder for a watcher which might be triggered before the real watcher instance has
+ * been created.
+ * <p>
+ * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern),
+ * and the watcher passed to the constructor might be called back by the event thread
+ * before you get the instance of {@code ZooKeeper} from the constructor.
+ * If your watcher calls methods of {@code ZooKeeper},
+ * pass this placeholder to the constructor of the {@code ZooKeeper},
+ * create your watcher using the instance of {@code ZooKeeper},
+ * and then call the method {@code PendingWatcher.prepare}.
+ */
+class PendingWatcher implements Watcher {
+  // Holds the real watcher once prepare() has supplied it.
+  private final InstancePending<Watcher> pending = new InstancePending<>();
+
+  /**
+   * Forwards the event to the watcher obtained from the pending holder.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    pending.get().process(event);
+  }
+
+  /**
+   * Associates the actual watcher that will process events.
+   * This method should be called once, and {@code watcher} should be non-null.
+   * This method is expected to be called as soon as possible
+   * because the event processing, being invoked by the ZooKeeper event thread,
+   * is uninterruptibly blocked until this method is called.
+   */
+  void prepare(Watcher watcher) {
+    pending.prepare(watcher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
new file mode 100644
index 0000000..d6c11af
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -0,0 +1,810 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.htrace.core.TraceScope;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
+ */
+@InterfaceAudience.Private
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  private ZooKeeper zk;
+  // Creates a fresh retry counter for every operation
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  // The identifier as bytes
+  private final byte[] id;
+  // Watcher, session timeout and quorum string used whenever the ZooKeeper client is created
+  private Watcher watcher;
+  private int sessionTimeout;
+  private String quorumServers;
+  // Receives latency and failure notifications for the ZooKeeper calls made by this class
+  private final ZKMetricsListener metrics;
+
+  /**
+   * Creates an instance without an explicit identifier; passes {@code null} as the
+   * identifier to the seven-argument constructor.
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
+  throws IOException {
+    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
+        null);
+  }
+
+  /**
+   * Creates an instance and makes a first, best-effort attempt to connect to ZooKeeper.
+   *
+   * @param quorumServers the ZooKeeper connect string
+   * @param sessionTimeout the session timeout passed to the ZooKeeper client
+   * @param watcher the watcher passed to the ZooKeeper client
+   * @param maxRetries number of retries; the retry counter is created with one more
+   *        attempt than this value
+   * @param retryIntervalMillis passed to the retry counter factory
+   * @param maxSleepTime passed to the retry counter factory
+   * @param identifier identifier of this process; when null or empty the name of the
+   *        runtime MX bean is used instead
+   * @throws IOException declared on the signature; a failure of the initial connection
+   *         attempt is ignored here, not thrown
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
+      justification="None. Its always been this way.")
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
+  throws IOException {
+    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
+
+    if (identifier == null || identifier.length() == 0) {
+      // the identifier = processID@hostName
+      identifier = ManagementFactory.getRuntimeMXBean().getName();
+    }
+    LOG.info("Process identifier=" + identifier +
+      " connecting to ZooKeeper ensemble=" + quorumServers);
+    this.identifier = identifier;
+    this.id = Bytes.toBytes(identifier);
+
+    this.watcher = watcher;
+    this.sessionTimeout = sessionTimeout;
+    this.quorumServers = quorumServers;
+    this.metrics = new ZKMetrics();
+    // Eager connection attempt. A failure is deliberately ignored because every operation
+    // calls checkZk() again and will retry creating the client.
+    try {checkZk();} catch (Exception x) {/* ignore */}
+  }
+
+  /**
+   * Returns the ZooKeeper client, creating it first if none exists yet. An
+   * {@link IOException} raised while creating the client is logged and turned into a
+   * KeeperException.OperationTimeoutException so that callers can retry.
+   * @return The ZooKeeper connection object
+   * @throws KeeperException if the client could not be created
+   */
+  protected synchronized ZooKeeper checkZk() throws KeeperException {
+    if (this.zk != null) {
+      return this.zk;
+    }
+    try {
+      this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+    } catch (IOException ex) {
+      LOG.warn("Unable to create ZooKeeper Connection", ex);
+      throw new KeeperException.OperationTimeoutException();
+    }
+    return this.zk;
+  }
+
+  /**
+   * Closes the current ZooKeeper client, if there is one, and creates a new one.
+   *
+   * @throws KeeperException if the new client cannot be created
+   * @throws InterruptedException if closing the old client is interrupted
+   * @throws IOException declared on the signature; not thrown by the code visible here
+   */
+  public synchronized void reconnectAfterExpiration()
+        throws IOException, KeeperException, InterruptedException {
+    if (zk != null) {
+      LOG.info("Closing dead ZooKeeper connection, session" +
+        " was: 0x"+Long.toHexString(zk.getSessionId()));
+      zk.close();
+      // reset the ZooKeeper connection
+      zk = null;
+    }
+    // zk is null at this point, so this creates a new client (or throws).
+    checkZk();
+    LOG.info("Recreated a ZooKeeper, session" +
+      " is: 0x"+Long.toHexString(zk.getSessionId()));
+  }
+
+  /**
+   * delete is an idempotent operation. Retry before throwing exception.
+   * A NoNodeException on the first attempt is rethrown to the caller. On a retry it is
+   * taken to mean that an earlier attempt succeeded, and the method returns normally.
+   *
+   * @param path the znode to delete
+   * @param version the expected version, passed through to ZooKeeper
+   * @throws KeeperException if the delete fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public void delete(String path, int version) throws InterruptedException, KeeperException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      boolean isRetry = false; // False for first attempt, true for all retries.
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          checkZk().delete(path, version);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerWriteOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case NONODE:
+              if (isRetry) {
+                LOG.debug("Node " + path + " already deleted. Assuming a " +
+                    "previous attempt succeeded.");
+                return;
+              }
+              LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
+              throw e;
+
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "delete");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "delete");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+        isRetry = true;
+      }
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the znode to check
+   * @param watcher the watcher passed through to ZooKeeper
+   * @return A Stat instance
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          Stat nodeStat = checkZk().exists(path, watcher);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return nodeStat;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the znode to check
+   * @param watch the watch flag passed through to ZooKeeper
+   * @return A Stat instance
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          Stat nodeStat = checkZk().exists(path, watch);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return nodeStat;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "exists");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * Decides whether a failed operation may be attempted again. Returns normally when the
+   * retry counter allows another attempt; otherwise logs an error and rethrows {@code e}.
+   *
+   * @param retryCounter the counter tracking the attempts of the current operation
+   * @param e the exception that caused the current attempt to fail
+   * @param opName operation name used in the error message
+   * @throws KeeperException {@code e} itself, once no retry is allowed any more
+   */
+  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
+      String opName) throws KeeperException {
+    if (retryCounter.shouldRetry()) {
+      LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," +
+          "exception=" + e);
+      return;
+    }
+    LOG.error("ZooKeeper " + opName + " failed after "
+      + retryCounter.getMaxAttempts() + " attempts");
+    throw e;
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the parent znode
+   * @param watcher the watcher passed through to ZooKeeper
+   * @return List of children znodes
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+    throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          List<String> children = checkZk().getChildren(path, watcher);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return children;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the parent znode
+   * @param watch the watch flag passed through to ZooKeeper
+   * @return List of children znodes
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          List<String> children = checkZk().getChildren(path, watch);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return children;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "getChildren");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the znode to read
+   * @param watcher the watcher passed through to ZooKeeper
+   * @param stat passed through to ZooKeeper
+   * @return the znode content after it has been passed through
+   *         {@code ZKMetadata.removeMetaData}
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          byte[] revData = checkZk().getData(path, watcher, stat);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return ZKMetadata.removeMetaData(revData);
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception
+   *
+   * @param path the znode to read
+   * @param watch the watch flag passed through to ZooKeeper
+   * @param stat passed through to ZooKeeper
+   * @return the znode content after it has been passed through
+   *         {@code ZKMetadata.removeMetaData}
+   * @throws KeeperException if the call fails with a non-retryable error, or if the
+   *         retries are exhausted
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          byte[] revData = checkZk().getData(path, watch, stat);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerReadOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return ZKMetadata.removeMetaData(revData);
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "getData");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
+   * Adding an identifier field into the data to check whether
+   * badversion is caused by the result of previous correctly setData
+   *
+   * @param path the znode to write
+   * @param data the payload; it is combined with this process's id by
+   *        {@code ZKMetadata.appendMetaData} before being written
+   * @param version the expected version, passed through to ZooKeeper
+   * @return Stat instance
+   * @throws KeeperException if the call fails with a non-retryable error, if the retries
+   *         are exhausted, or on a BadVersion that cannot be attributed to an earlier
+   *         attempt of this call
+   * @throws InterruptedException if the call or the sleep between retries is interrupted
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      byte[] newData = ZKMetadata.appendMetaData(id, data);
+      boolean isRetry = false;
+      long startTime;
+      while (true) {
+        try {
+          startTime = EnvironmentEdgeManager.currentTime();
+          Stat nodeStat = checkZk().setData(path, newData, version);
+          // Report the measured latency with a floor of 1 ms. Math.min here would clamp
+          // every sample to at most 1 and make the metric meaningless.
+          this.metrics.registerWriteOperationLatency(
+              Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return nodeStat;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "setData");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "setData");
+              break;
+            case BADVERSION:
+              if (isRetry) {
+                // try to verify whether the previous setData success or not
+                try{
+                  Stat stat = new Stat();
+                  startTime = EnvironmentEdgeManager.currentTime();
+                  byte[] revData = checkZk().getData(path, false, stat);
+                  // Same 1 ms floor as for the write latency above.
+                  this.metrics.registerReadOperationLatency(
+                      Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+                  if(Bytes.compareTo(revData, newData) == 0) {
+                    // the bad version is caused by previous successful setData
+                    return stat;
+                  }
+                } catch(KeeperException keeperException){
+                  this.metrics.registerFailedZKCall();
+                  // the ZK is not reliable at this moment. just throwing exception
+                  throw keeperException;
+                }
+              }
+            // Deliberate fall-through: a BadVersion that is not explained by an earlier
+            // attempt of this call is rethrown like any other exception.
+            // throw other exceptions and verified bad version exceptions
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+        isRetry = true;
+      }
+    }
+  }
+
+  /**
+   * getAcl is an idempotent operation. Retry before throwing exception.
+   * Only CONNECTIONLOSS and OPERATIONTIMEOUT are retried; every other error code is rethrown.
+   * @param path znode whose ACLs are read
+   * @param stat passed through to ZooKeeper's getACL
+   * @return list of ACLs
+   * @throws KeeperException for any non-retried error code or when {@code retryOrThrow} gives up
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public List<ACL> getAcl(String path, Stat stat)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          List<ACL> nodeACL = checkZk().getACL(path, stat);
+          // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+          // sample at 1 and make the latency metric meaningless.
+          this.metrics.registerReadOperationLatency(
+            Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return nodeACL;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "getAcl");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "getAcl");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * setAcl is an idempotent operation. Retry before throwing exception.
+   * Only CONNECTIONLOSS and OPERATIONTIMEOUT are retried; every other error code is rethrown.
+   * @param path znode whose ACLs are replaced
+   * @param acls ACLs to install
+   * @param version version passed through to ZooKeeper's setACL
+   * @return Stat instance returned by ZooKeeper's setACL
+   * @throws KeeperException for any non-retried error code or when {@code retryOrThrow} gives up
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public Stat setAcl(String path, List<ACL> acls, int version)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          Stat nodeStat = checkZk().setACL(path, acls, version);
+          // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+          // sample at 1 and make the latency metric meaningless.
+          this.metrics.registerWriteOperationLatency(
+            Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return nodeStat;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "setAcl");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "setAcl");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * Creates a znode, dispatching on the requested {@link CreateMode}.
+   * <p>
+   * A NONSEQUENTIAL create (EPHEMERAL, PERSISTENT) is idempotent and is simply retried; see
+   * {@code createNonSequential} for how a NodeExists raised on a retry is handled.
+   * </p>
+   * <p>
+   * A SEQUENTIAL create (EPHEMERAL_SEQUENTIAL, PERSISTENT_SEQUENTIAL) is NOT idempotent, so an
+   * identifier is added to the path to be able to verify whether a previous attempt
+   * succeeded.
+   * </p>
+   * The metadata is added to {@code data} before either variant is invoked.
+   *
+   * @return Path
+   * @throws IllegalArgumentException for any other create mode
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
+      final byte[] payload = ZKMetadata.appendMetaData(id, data);
+      final boolean sequential;
+      switch (createMode) {
+        case EPHEMERAL:
+        case PERSISTENT:
+          sequential = false;
+          break;
+        case EPHEMERAL_SEQUENTIAL:
+        case PERSISTENT_SEQUENTIAL:
+          sequential = true;
+          break;
+        default:
+          throw new IllegalArgumentException("Unrecognized CreateMode: " +
+              createMode);
+      }
+      if (sequential) {
+        return createSequential(path, payload, acl, createMode);
+      }
+      return createNonSequential(path, payload, acl, createMode);
+    }
+  }
+
+  /**
+   * Creates a non-sequential znode, retrying on CONNECTIONLOSS and OPERATIONTIMEOUT.
+   * <p>
+   * When a retry fails with NODEEXISTS the node may have been created by one of our earlier
+   * attempts, so its data is read back and compared with {@code data}; on a match
+   * {@code path} is returned as a success. On the first attempt, or when the data differs,
+   * the NodeExists exception is rethrown.
+   * </p>
+   * @param path znode path to create
+   * @param data payload to store, exactly as it is written to ZooKeeper
+   * @param acl ACLs for the new node
+   * @param createMode create mode passed through to ZooKeeper
+   * @return the path returned by ZooKeeper, or {@code path} when an earlier attempt is verified
+   */
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    long startTime;
+    while (true) {
+      try {
+        startTime = EnvironmentEdgeManager.currentTime();
+        String nodePath = checkZk().create(path, data, acl, createMode);
+        // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+        // sample at 1 and make the latency metric meaningless.
+        this.metrics.registerWriteOperationLatency(
+          Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+        return nodePath;
+      } catch (KeeperException e) {
+        this.metrics.registerFailedZKCall();
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // If the connection was lost, there is still a possibility that
+              // we have successfully created the node at our previous attempt,
+              // so we read the node and compare.
+              startTime = EnvironmentEdgeManager.currentTime();
+              byte[] currentData = checkZk().getData(path, false, null);
+              this.metrics.registerReadOperationLatency(
+                Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+              if (currentData != null &&
+                  Bytes.compareTo(currentData, data) == 0) {
+                // We successfully created a non-sequential node
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with " +
+                  Bytes.toStringBinary(currentData) + ", could not write " +
+                  Bytes.toStringBinary(data));
+              throw e;
+            }
+            LOG.debug("Node " + path + " already exists");
+            throw e;
+
+          case CONNECTIONLOSS:
+            this.metrics.registerConnectionLossException();
+            retryOrThrow(retryCounter, e, "create");
+            break;
+          case OPERATIONTIMEOUT:
+            this.metrics.registerOperationTimeoutException();
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * Creates a sequential znode, retrying on CONNECTIONLOSS and OPERATIONTIMEOUT.
+   * <p>
+   * Sequential creates are not idempotent, so this instance's {@code identifier} is appended to
+   * {@code path} before the create. On every attempt after the first, the parent is searched
+   * for an existing child carrying that prefix (see {@code findPreviousSequentialNode}); if one
+   * is found it is returned instead of creating another node.
+   * </p>
+   * @param path path prefix of the sequential node
+   * @param data payload to store, exactly as it is written to ZooKeeper
+   * @param acl ACLs for the new node
+   * @param createMode create mode passed through to ZooKeeper
+   * @return path of the node that was created, now or by an earlier attempt
+   */
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        long startTime = EnvironmentEdgeManager.currentTime();
+        String nodePath = checkZk().create(newPath, data, acl, createMode);
+        // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+        // sample at 1 and make the latency metric meaningless.
+        this.metrics.registerWriteOperationLatency(
+          Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+        return nodePath;
+      } catch (KeeperException e) {
+        this.metrics.registerFailedZKCall();
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+            this.metrics.registerConnectionLossException();
+            retryOrThrow(retryCounter, e, "create");
+            break;
+          case OPERATIONTIMEOUT:
+            this.metrics.registerOperationTimeoutException();
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+  /**
+   * Rebuilds the given {@link org.apache.zookeeper.Op}s into the list that is actually handed
+   * to multi. Create and setData ops are re-created with the metadata added to their data,
+   * delete ops are passed through unchanged, and any other op type is rejected.
+   *
+   * @param ops the operations supplied by the caller; may be null
+   * @return the prepared operations, or null when {@code ops} is null
+   * @throws UnsupportedOperationException if an op is not a create, delete or setData
+   */
+  private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+  throws UnsupportedOperationException {
+    if (ops == null) {
+      return null;
+    }
+
+    List<Op> result = new LinkedList<>();
+    for (Op current : ops) {
+      if (current.getType() == ZooDefs.OpCode.create) {
+        CreateRequest request = (CreateRequest) current.toRequestRecord();
+        byte[] withMeta = ZKMetadata.appendMetaData(id, request.getData());
+        result.add(Op.create(request.getPath(), withMeta, request.getAcl(), request.getFlags()));
+        continue;
+      }
+      if (current.getType() == ZooDefs.OpCode.delete) {
+        // a delete carries no data, so there is nothing to add metadata to
+        result.add(current);
+        continue;
+      }
+      if (current.getType() == ZooDefs.OpCode.setData) {
+        SetDataRequest request = (SetDataRequest) current.toRequestRecord();
+        byte[] withMeta = ZKMetadata.appendMetaData(id, request.getData());
+        result.add(Op.setData(request.getPath(), withMeta, request.getVersion()));
+        continue;
+      }
+      throw new UnsupportedOperationException("Unexpected ZKOp type: " + current.getClass().getName());
+    }
+    return result;
+  }
+
+  /**
+   * Run multiple operations in a transactional manner. Retry before throwing exception.
+   * The ops are first passed through {@code prepareZKMulti}, which adds the metadata to create
+   * and setData ops. Only CONNECTIONLOSS and OPERATIONTIMEOUT are retried.
+   * @param ops operations to run
+   * @return the results returned by ZooKeeper's multi
+   * @throws KeeperException for any non-retried error code or when {@code retryOrThrow} gives up
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public List<OpResult> multi(Iterable<Op> ops)
+  throws KeeperException, InterruptedException {
+    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
+      RetryCounter retryCounter = retryCounterFactory.create();
+      Iterable<Op> multiOps = prepareZKMulti(ops);
+      while (true) {
+        try {
+          long startTime = EnvironmentEdgeManager.currentTime();
+          List<OpResult> opResults = checkZk().multi(multiOps);
+          // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+          // sample at 1 and make the latency metric meaningless.
+          this.metrics.registerWriteOperationLatency(
+            Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+          return opResults;
+        } catch (KeeperException e) {
+          this.metrics.registerFailedZKCall();
+          switch (e.code()) {
+            case CONNECTIONLOSS:
+              this.metrics.registerConnectionLossException();
+              retryOrThrow(retryCounter, e, "multi");
+              break;
+            case OPERATIONTIMEOUT:
+              this.metrics.registerOperationTimeoutException();
+              retryOrThrow(retryCounter, e, "multi");
+              break;
+
+            default:
+              throw e;
+          }
+        }
+        retryCounter.sleepUntilNextRetry();
+      }
+    }
+  }
+
+  /**
+   * Looks under the parent of {@code path} for a child whose name starts with the last path
+   * component of {@code path} and that still exists.
+   * @param path full path prefix (parent + "/" + node-name prefix) to search for
+   * @return full path of the first matching child that exists, or null if there is none
+   */
+  private String findPreviousSequentialNode(String path)
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+    long startTime = EnvironmentEdgeManager.currentTime();
+    List<String> nodes = checkZk().getChildren(parent, false);
+    // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+    // sample at 1 and make the latency metric meaningless.
+    this.metrics.registerReadOperationLatency(
+      Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      startTime = EnvironmentEdgeManager.currentTime();
+      Stat stat = checkZk().exists(nodePath, false);
+      this.metrics.registerReadOperationLatency(
+        Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return the session id reported by {@code zk}, or -1 when {@code zk} is null
+   */
+  public synchronized long getSessionId() {
+    if (zk == null) {
+      return -1;
+    }
+    return zk.getSessionId();
+  }
+
+  /**
+   * Closes {@code zk} if it is set; does nothing when {@code zk} is null.
+   */
+  public synchronized void close() throws InterruptedException {
+    if (zk == null) {
+      return;
+    }
+    zk.close();
+  }
+
+  /**
+   * @return the state reported by {@code zk}, or null when {@code zk} is null
+   */
+  public synchronized States getState() {
+    if (zk == null) {
+      return null;
+    }
+    return zk.getState();
+  }
+
+  /**
+   * @return the current value of the {@code zk} field (may be null)
+   */
+  public synchronized ZooKeeper getZooKeeper() {
+    return this.zk;
+  }
+
+  /**
+   * @return the session password reported by {@code zk}, or null when {@code zk} is null
+   */
+  public synchronized byte[] getSessionPasswd() {
+    if (zk == null) {
+      return null;
+    }
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Issues an asynchronous sync for {@code path} on the underlying ZooKeeper handle.
+   * The recorded latency covers only the time spent issuing the call.
+   * @param path znode path to sync
+   * @param cb callback handed to ZooKeeper's sync
+   * @param ctx context object handed to ZooKeeper's sync together with {@code cb}
+   * @throws KeeperException if {@code checkZk()} fails
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
+    long startTime = EnvironmentEdgeManager.currentTime();
+    // Forward the caller's ctx; passing null here silently dropped it.
+    checkZk().sync(path, cb, ctx);
+    // Report the elapsed time with a floor of 1ms. Math.min here would cap every
+    // sample at 1 and make the latency metric meaningless.
+    this.metrics.registerSyncOperationLatency(
+      Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+
+  /**
+   * @return the value of the {@code identifier} field
+   */
+  public String getIdentifier() {
+    return this.identifier;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
new file mode 100644
index 0000000..93545ee
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * Tracks region normalizer state up in ZK
+ */
+public class RegionNormalizerTracker extends ZKNodeTracker {
+  private static final Log LOG = LogFactory.getLog(RegionNormalizerTracker.class);
+
+  /**
+   * Creates a tracker for the region normalizer znode of the given watcher.
+   * @param watcher watcher whose {@code znodePaths.regionNormalizerZNode} is tracked
+   * @param abortable passed through to the {@link ZKNodeTracker} constructor
+   */
+  public RegionNormalizerTracker(ZKWatcher watcher,
+                             Abortable abortable) {
+    super(watcher, watcher.znodePaths.regionNormalizerZNode, abortable);
+  }
+
+  /**
+   * Return true if region normalizer is on, false otherwise.
+   * Missing data counts as "on"; data that cannot be parsed counts as "off".
+   */
+  public boolean isNormalizerOn() {
+    byte [] upData = super.getData(false);
+    try {
+      // if data in ZK is null, use default of on.
+      return upData == null || parseFrom(upData).getNormalizerOn();
+    } catch (DeserializationException dex) {
+      // Log the exception too so the reason for the parse failure is not lost.
+      LOG.error("ZK state for RegionNormalizer could not be parsed "
+        + Bytes.toStringBinary(upData), dex);
+      // return false to be safe.
+      return false;
+    }
+  }
+
+  /**
+   * Set region normalizer on/off. The znode is created if it does not exist yet.
+   * @param normalizerOn whether normalizer should be on or off
+   * @throws KeeperException if writing or creating the znode fails
+   */
+  public void setNormalizerOn(boolean normalizerOn) throws KeeperException {
+    byte [] upData = toByteArray(normalizerOn);
+    try {
+      ZKUtil.setData(watcher, watcher.znodePaths.regionNormalizerZNode, upData);
+    } catch(KeeperException.NoNodeException nne) {
+      ZKUtil.createAndWatch(watcher, watcher.znodePaths.regionNormalizerZNode, upData);
+    }
+    super.nodeDataChanged(watcher.znodePaths.regionNormalizerZNode);
+  }
+
+  /**
+   * Serializes the on/off flag as a RegionNormalizerState protobuf with the PB magic prefix.
+   */
+  private static byte [] toByteArray(boolean isNormalizerOn) {
+    RegionNormalizerProtos.RegionNormalizerState.Builder builder =
+      RegionNormalizerProtos.RegionNormalizerState.newBuilder();
+    builder.setNormalizerOn(isNormalizerOn);
+    return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+  }
+
+  /**
+   * Parses bytes written by {@link #toByteArray(boolean)}.
+   * @param pbBytes PB-magic-prefixed serialized RegionNormalizerState
+   * @return the parsed state
+   * @throws DeserializationException if the magic prefix check or the protobuf merge fails
+   */
+  private static RegionNormalizerProtos.RegionNormalizerState parseFrom(byte [] pbBytes)
+    throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(pbBytes);
+    RegionNormalizerProtos.RegionNormalizerState.Builder builder =
+      RegionNormalizerProtos.RegionNormalizerState.newBuilder();
+    try {
+      // skip the magic prefix and merge only the protobuf payload
+      int magicLen = ProtobufUtil.lengthOfPBMagic();
+      ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
new file mode 100644
index 0000000..4150f54
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Command-line tool that walks the HBase base znode and resets the ACLs of every znode below it.
+ *
+ * You may add the jaas.conf option
+ *    -Djava.security.auth.login.config=/PATH/jaas.conf
+ *
+ * You may also specify -D to set options
+ *    "hbase.zookeeper.quorum"    (it should be in hbase-site.xml)
+ *    "zookeeper.znode.parent"    (it should be in hbase-site.xml)
+ *
+ * Pass -set-acls to set the ACLs; running with no option erases the ACLs.
+ */
+@InterfaceAudience.Private
+public class ZKAclReset extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(ZKAclReset.class);
+
+  /**
+   * Resets the ACLs of {@code znode} and all of its descendants; children are handled before
+   * the node itself.
+   * @param zkw watcher used to list children and reach the ZooKeeper handle
+   * @param znode root of the subtree to process
+   * @param eraseAcls true to install OPEN_ACL_UNSAFE, false to install the ACLs built by
+   *   {@code ZKUtil.createACL}
+   */
+  private static void resetAcls(final ZKWatcher zkw, final String znode,
+                                final boolean eraseAcls) throws Exception {
+    final List<String> kids = ZKUtil.listChildrenNoWatch(zkw, znode);
+    if (kids != null) {
+      for (final String kid : kids) {
+        final String kidPath = ZNodePaths.joinZNode(znode, kid);
+        resetAcls(zkw, kidPath, eraseAcls);
+      }
+    }
+
+    final ZooKeeper rawZk = zkw.getRecoverableZooKeeper().getZooKeeper();
+    if (!eraseAcls) {
+      LOG.info(" - set ACLs for " + znode);
+      rawZk.setACL(znode, ZKUtil.createACL(zkw, znode, true), -1);
+    } else {
+      LOG.info(" - erase ACLs for " + znode);
+      rawZk.setACL(znode, ZooDefs.Ids.OPEN_ACL_UNSAFE, -1);
+    }
+  }
+
+  /**
+   * Opens a {@link ZKWatcher} for {@code conf}, resets the ACLs starting at the base znode and
+   * closes the watcher again.
+   */
+  private static void resetAcls(final Configuration conf, boolean eraseAcls)
+      throws Exception {
+    final ZKWatcher watcher = new ZKWatcher(conf, "ZKAclReset", null);
+    try {
+      final String action = eraseAcls ? "Erase" : "Set";
+      LOG.info(action + " HBase ACLs for " +
+                watcher.getQuorum() + " " + watcher.znodePaths.baseZNode);
+      resetAcls(watcher, watcher.znodePaths.baseZNode, eraseAcls);
+    } finally {
+      watcher.close();
+    }
+  }
+
+  /**
+   * Prints the usage text to stderr and terminates the JVM with exit code 1.
+   */
+  private void printUsageAndExit() {
+    final String toolName = getClass().getName();
+    System.err.printf("Usage: hbase %s [options]%n", toolName);
+    System.err.println(" where [options] are:");
+    System.err.println("  -h|-help                Show this help and exit.");
+    System.err.println("  -set-acls               Setup the hbase znode ACLs for a secure cluster");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println("  To reset the ACLs to the unsecure cluster behavior:");
+    System.err.println("  hbase " + toolName);
+    System.err.println();
+    System.err.println("  To reset the ACLs to the secure cluster behavior:");
+    System.err.println("  hbase " + toolName + " -set-acls");
+    System.exit(1);
+  }
+
+  /**
+   * Parses the arguments and runs the reset. {@code -set-acls} switches from erasing to
+   * setting ACLs; any other argument (including {@code -help}) prints the usage and exits.
+   * @return 0 once the reset has completed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    boolean erase = true;
+
+    for (final String arg : args) {
+      if (!arg.equals("-help") && arg.equals("-set-acls")) {
+        erase = false;
+        continue;
+      }
+      // -help and every unrecognized argument end up here
+      printUsageAndExit();
+    }
+
+    resetAcls(getConf(), erase);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int exitCode = ToolRunner.run(HBaseConfiguration.create(), new ZKAclReset(), args);
+    System.exit(exitCode);
+  }
+}