You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/17 21:20:36 UTC
[04/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper
from hbase-server and hbase-client
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
new file mode 100644
index 0000000..f6c7a2d
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;
+
+/**
+ * Utility class to perform operations (get/wait for/verify/set/delete) on the znode in ZooKeeper
+ * which keeps the hbase:meta region server location.
+ *
+ * <p>Doesn't manage resources passed in (e.g. Connection, ZKWatcher etc).
+ *
+ * <p>Meta region location is set by <code>RegionServerServices</code>.
+ * This class doesn't use ZK watchers, rather accesses ZK directly.
+ *
+ * <p>The only instance state is the {@code stopped} flag, which lets the otherwise unbounded
+ * {@link #waitMetaRegionLocation(ZKWatcher)} wait end once {@link #stop()} is called. The class
+ * is not made a non-instantiable util class with a collection of static methods because that
+ * would be rather hard to mock properly in tests.
+ *
+ * TODO: rewrite using RPC calls to master to find out about hbase:meta.
+ */
+@InterfaceAudience.Private
+public class MetaTableLocator {
+  private static final Log LOG = LogFactory.getLog(MetaTableLocator.class);
+
+  /**
+   * Minimum interval, in millis, between "still waiting" warnings logged by
+   * {@link #waitMetaRegionLocation(ZKWatcher)}.
+   */
+  private static final long WAIT_WARN_INTERVAL_MS = 10000;
+
+  // only needed to allow non-timeout infinite waits to stop when cluster shuts down
+  private volatile boolean stopped = false;
+
+  /**
+   * Checks if the meta region location is available.
+   * @param zkw ZooKeeper watcher to be used
+   * @return true if meta region location is available, false if not
+   */
+  public boolean isLocationAvailable(ZKWatcher zkw) {
+    return getMetaRegionLocation(zkw) != null;
+  }
+
+  /**
+   * @param zkw ZooKeeper watcher to be used
+   * @return meta table regions and their locations.
+   */
+  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
+    return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Returns the meta region for {@code replicaId} paired with its current location.
+   * @param zkw ZooKeeper watcher to be used
+   * @param replicaId replica id of the meta region
+   * @return a single-element list holding the meta region info for {@code replicaId} and its
+   *     server; the server is null if the region is not open or ZooKeeper could not be read
+   */
+  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
+      int replicaId) {
+    ServerName serverName = getMetaRegionLocation(zkw, replicaId);
+    List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
+    list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
+        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
+    return list;
+  }
+
+  /**
+   * @param zkw ZooKeeper watcher to be used
+   * @return List of meta regions
+   */
+  public List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
+    return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * @param zkw ZooKeeper watcher to be used
+   * @param replicaId replica id of the meta region
+   * @return List of meta regions
+   */
+  public List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
+    List<Pair<RegionInfo, ServerName>> result = getMetaRegionsAndLocations(zkw, replicaId);
+    return getListOfRegionInfos(result);
+  }
+
+  /**
+   * Extracts the region infos from a list of (region, server) pairs.
+   * @return the first element of every pair, or an empty list for null/empty input
+   */
+  private List<RegionInfo> getListOfRegionInfos(
+      final List<Pair<RegionInfo, ServerName>> pairs) {
+    if (pairs == null || pairs.isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<RegionInfo> result = new ArrayList<>(pairs.size());
+    for (Pair<RegionInfo, ServerName> pair : pairs) {
+      result.add(pair.getFirst());
+    }
+    return result;
+  }
+
+  /**
+   * Gets the meta region location, if available. Does not block.
+   * @param zkw zookeeper connection to use
+   * @return server name, or null if the region is not open or we failed to get the data.
+   */
+  public ServerName getMetaRegionLocation(final ZKWatcher zkw) {
+    try {
+      RegionState state = getMetaRegionState(zkw);
+      return state.isOpened() ? state.getServerName() : null;
+    } catch (KeeperException ke) {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the meta region location, if available. Does not block.
+   * @param zkw zookeeper connection to use
+   * @param replicaId replica id of the meta region
+   * @return server name, or null if the region is not open or we failed to get the data.
+   */
+  public ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
+    try {
+      RegionState state = getMetaRegionState(zkw, replicaId);
+      return state.isOpened() ? state.getServerName() : null;
+    } catch (KeeperException ke) {
+      return null;
+    }
+  }
+
+  /**
+   * Gets the meta region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting meta region formatted as per
+   *     {@link ServerName}; never null
+   * @throws InterruptedException if interrupted while waiting
+   * @throws NotAllMetaRegionsOnlineException if no location was found within {@code timeout}
+   */
+  public ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
+      throws InterruptedException, NotAllMetaRegionsOnlineException {
+    return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
+  }
+
+  /**
+   * Gets the meta region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param zkw zookeeper connection to use
+   * @param replicaId replica id of the meta region
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting meta region formatted as per
+   *     {@link ServerName}; never null
+   * @throws InterruptedException if interrupted while waiting
+   * @throws NotAllMetaRegionsOnlineException if no location was found within {@code timeout}
+   * @throws IllegalArgumentException if the base znode does not exist (likely a
+   *     'zookeeper.znode.parent' mismatch with the master)
+   * @throws IllegalStateException if ZooKeeper fails while checking the base znode
+   */
+  public ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
+      throws InterruptedException, NotAllMetaRegionsOnlineException {
+    try {
+      if (ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) == -1) {
+        String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
+            + "There could be a mismatch with the one configured in the master.";
+        LOG.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg);
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
+    }
+    ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);
+
+    if (sn == null) {
+      throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
+    }
+
+    return sn;
+  }
+
+  /**
+   * Waits indefinitely for availability of <code>hbase:meta</code>. Used during
+   * cluster startup. Does not verify meta, just that something has been
+   * set up in zk. Returns early, without a location, once {@link #stop()} is called.
+   * A warning is logged roughly every {@link #WAIT_WARN_INTERVAL_MS} while waiting.
+   * @see #waitMetaRegionLocation(ZKWatcher, long)
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void waitMetaRegionLocation(ZKWatcher zkw) throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long lastWarnTime = startTime;
+    while (!stopped) {
+      try {
+        // Never returns null: it either yields a location or throws on timeout.
+        if (waitMetaRegionLocation(zkw, 100) != null) {
+          break;
+        }
+      } catch (NotAllMetaRegionsOnlineException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("hbase:meta still not available, sleeping and retrying." +
+              " Reason: " + e.getMessage());
+        }
+      }
+      // Checked outside the try so it runs after every unsuccessful attempt.
+      long now = System.currentTimeMillis();
+      if (now - lastWarnTime >= WAIT_WARN_INTERVAL_MS) {
+        LOG.warn("Have been waiting for meta to be assigned for " + (now - startTime) + "ms");
+        lastWarnTime = now;
+      }
+    }
+  }
+
+  /**
+   * Verify <code>hbase:meta</code> is deployed and accessible.
+   * @param hConnection connection used to reach the hosting region server
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on zk for meta address (passed through to
+   *     the internal call to {@link #getMetaServerConnection}).
+   * @return True if the <code>hbase:meta</code> location is healthy.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  public boolean verifyMetaRegionLocation(ClusterConnection hConnection,
+      ZKWatcher zkw, final long timeout)
+      throws InterruptedException, IOException {
+    return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Verify <code>hbase:meta</code> is deployed and accessible.
+   * @param connection connection used to reach the hosting region server
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on zk for meta address (passed through to
+   *     the internal call to {@link #getMetaServerConnection}).
+   * @param replicaId replica id of the meta region
+   * @return True if the <code>hbase:meta</code> location is healthy.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public boolean verifyMetaRegionLocation(ClusterConnection connection,
+      ZKWatcher zkw, final long timeout, int replicaId)
+      throws InterruptedException, IOException {
+    AdminProtos.AdminService.BlockingInterface service = null;
+    try {
+      service = getMetaServerConnection(connection, zkw, timeout, replicaId);
+    } catch (NotAllMetaRegionsOnlineException e) {
+      // Pass
+    } catch (ServerNotRunningYetException e) {
+      // Pass -- remote server is not up so can't be carrying root
+    } catch (UnknownHostException e) {
+      // Pass -- server name doesn't resolve so it can't be assigned anything.
+    } catch (RegionServerStoppedException e) {
+      // Pass -- server name sends us to a server that is dying or already dead.
+    }
+    return (service != null) && verifyRegionLocation(connection, service,
+        getMetaRegionLocation(zkw, replicaId), RegionReplicaUtil.getRegionInfoForReplica(
+            RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId).getRegionName());
+  }
+
+  /**
+   * Verify we can connect to <code>hostingServer</code> and that its carrying
+   * <code>regionName</code>.
+   * @param hostingServer Interface to the server hosting <code>regionName</code>
+   * @param address The servername that goes with the <code>metaServer</code>
+   *     Interface. Used for logging.
+   * @param regionName The regionname we are interested in.
+   * @return True if we were able to verify the region located at other side of
+   *     the Interface.
+   * @throws IOException
+   */
+  // TODO: We should be able to get the ServerName from the AdminProtocol
+  // rather than have to pass it in. Its made awkward by the fact that the
+  // HRI is likely a proxy against remote server so the getServerName needs
+  // to be fixed to go to a local method or to a cache before we can do this.
+  private boolean verifyRegionLocation(final ClusterConnection connection,
+      AdminService.BlockingInterface hostingServer, final ServerName address,
+      final byte [] regionName)
+      throws IOException {
+    if (hostingServer == null) {
+      LOG.info("Passed hostingServer is null");
+      return false;
+    }
+    Throwable t;
+    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
+    try {
+      // Try and get regioninfo from the hosting server.
+      return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
+    } catch (ConnectException e) {
+      t = e;
+    } catch (RetriesExhaustedException e) {
+      t = e;
+    } catch (RemoteException e) {
+      IOException ioe = e.unwrapRemoteException();
+      t = ioe;
+    } catch (IOException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof EOFException) {
+        t = cause;
+      } else if (cause != null && cause.getMessage() != null
+          && cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
+        // Case-insensitive, matching the check in getCachedConnection.
+        t = cause;
+      } else {
+        t = e;
+      }
+    }
+    LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) +
+        " at address=" + address + ", exception=" + t.getMessage());
+    return false;
+  }
+
+  /**
+   * Gets a connection to the server hosting meta, as reported by ZooKeeper,
+   * waiting up to the specified timeout for availability.
+   * <p>WARNING: Does not retry. Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
+   * @param connection connection used to obtain the admin interface
+   * @param zkw zookeeper connection to use
+   * @param timeout How long to wait on meta location
+   * @param replicaId replica id of the meta region
+   * @return connection to server hosting meta; may be null (see {@link #getCachedConnection})
+   * @throws InterruptedException
+   * @throws NotAllMetaRegionsOnlineException if timed out waiting
+   * @throws IOException
+   */
+  private AdminService.BlockingInterface getMetaServerConnection(ClusterConnection connection,
+      ZKWatcher zkw, long timeout, int replicaId)
+      throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
+    return getCachedConnection(connection, waitMetaRegionLocation(zkw, replicaId, timeout));
+  }
+
+  /**
+   * @param sn ServerName to get a connection against.
+   * @return The AdminProtocol we got when we connected to <code>sn</code>
+   *     May have come from cache, may not be good, may have been setup by this
+   *     invocation, or may be null.
+   * @throws IOException
+   */
+  private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
+      ServerName sn)
+      throws IOException {
+    if (sn == null) {
+      return null;
+    }
+    AdminService.BlockingInterface service = null;
+    try {
+      service = connection.getAdmin(sn);
+    } catch (RetriesExhaustedException e) {
+      if (e.getCause() instanceof ConnectException) {
+        // Catch this; presume it means the cached connection has gone bad.
+      } else {
+        throw e;
+      }
+    } catch (SocketTimeoutException e) {
+      LOG.debug("Timed out connecting to " + sn);
+    } catch (NoRouteToHostException e) {
+      LOG.debug("Connecting to " + sn, e);
+    } catch (SocketException e) {
+      LOG.debug("Exception connecting to " + sn);
+    } catch (UnknownHostException e) {
+      LOG.debug("Unknown host exception connecting to " + sn);
+    } catch (FailedServerException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Server " + sn + " is in failed server list.");
+      }
+    } catch (IOException ioe) {
+      Throwable cause = ioe.getCause();
+      if (ioe instanceof ConnectException) {
+        // Catch. Connect refused.
+      } else if (cause instanceof EOFException) {
+        // Catch. Other end disconnected us.
+      } else if (cause != null && cause.getMessage() != null &&
+          cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset")) {
+        // Catch. Connection reset.
+      } else {
+        throw ioe;
+      }
+    }
+    return service;
+  }
+
+  /**
+   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
+   * specified server address.
+   * @param zookeeper zookeeper reference
+   * @param serverName The server hosting <code>hbase:meta</code>
+   * @param state The region transition state
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setMetaLocation(ZKWatcher zookeeper,
+      ServerName serverName, RegionState.State state) throws KeeperException {
+    setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
+  }
+
+  /**
+   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
+   * specified server address, creating the znode if it does not exist yet.
+   * A null {@code serverName} is logged and ignored.
+   * @param zookeeper zookeeper reference
+   * @param serverName The server hosting <code>hbase:meta</code>
+   * @param replicaId replica id of the meta region
+   * @param state The region transition state
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setMetaLocation(ZKWatcher zookeeper,
+      ServerName serverName, int replicaId, RegionState.State state) throws KeeperException {
+    if (serverName == null) {
+      LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
+      return;
+    }
+    LOG.info("Setting hbase:meta (replicaId=" + replicaId + ") location in ZooKeeper as " +
+        serverName);
+    // Make the MetaRegionServer pb and then get its bytes and save this as
+    // the znode content.
+    MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
+        .setServer(ProtobufUtil.toServerName(serverName))
+        .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
+        .setState(state.convert()).build();
+    byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
+    try {
+      ZKUtil.setData(zookeeper,
+          zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
+    } catch (KeeperException.NoNodeException nne) {
+      if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
+        LOG.debug("META region location doesn't exist, create it");
+      } else {
+        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
+            ", create it");
+      }
+      ZKUtil.createAndWatch(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId), data);
+    }
+  }
+
+  /**
+   * Load the meta region state from the meta server ZNode.
+   */
+  public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
+    return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Load the meta region state from the meta server ZNode.
+   * @param zkw zookeeper connection to use
+   * @param replicaId replica id of the meta region
+   * @return regionstate; OFFLINE with a null server if no server name could be read
+   * @throws KeeperException on ZooKeeper errors or if the znode content cannot be parsed
+   */
+  public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
+      throws KeeperException {
+    RegionState.State state = RegionState.State.OPEN;
+    ServerName serverName = null;
+    try {
+      byte[] data = ZKUtil.getData(zkw, zkw.znodePaths.getZNodeForReplica(replicaId));
+      if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
+        try {
+          int prefixLen = ProtobufUtil.lengthOfPBMagic();
+          ZooKeeperProtos.MetaRegionServer rl =
+              ZooKeeperProtos.MetaRegionServer.PARSER.parseFrom
+                  (data, prefixLen, data.length - prefixLen);
+          if (rl.hasState()) {
+            state = RegionState.State.convert(rl.getState());
+          }
+          HBaseProtos.ServerName sn = rl.getServer();
+          serverName = ServerName.valueOf(
+              sn.getHostName(), sn.getPort(), sn.getStartCode());
+        } catch (InvalidProtocolBufferException e) {
+          // Keep the parse failure as the cause so it is not lost.
+          throw new DeserializationException("Unable to parse meta region location", e);
+        }
+      } else {
+        // old style of meta region location?
+        serverName = ProtobufUtil.parseServerNameFrom(data);
+      }
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag; the state falls back to OFFLINE below.
+      Thread.currentThread().interrupt();
+    }
+    if (serverName == null) {
+      state = RegionState.State.OFFLINE;
+    }
+    return new RegionState(
+        RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
+            replicaId),
+        state, serverName);
+  }
+
+  /**
+   * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
+   * @param zookeeper zookeeper reference
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public void deleteMetaLocation(ZKWatcher zookeeper)
+      throws KeeperException {
+    deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
+  }
+
+  /**
+   * Deletes the location of the given <code>hbase:meta</code> replica in ZooKeeper.
+   * A missing znode is not an error.
+   * @param zookeeper zookeeper reference
+   * @param replicaId replica id of the meta region
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
+      throws KeeperException {
+    if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
+      LOG.info("Deleting hbase:meta region location in ZooKeeper");
+    } else {
+      LOG.info("Deleting hbase:meta for " + replicaId + " region location in ZooKeeper");
+    }
+    try {
+      // Just delete the node. Don't need any watches.
+      ZKUtil.deleteNode(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId));
+    } catch (KeeperException.NoNodeException nne) {
+      // Has already been deleted
+    }
+  }
+
+  /**
+   * Wait until the primary meta region is available. Get the secondary
+   * locations as well but don't block for those.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait for the primary, in millis
+   * @param conf currently unused
+   * @return the primary location followed by one entry (possibly null) per secondary replica
+   *     znode, or null if we timed out waiting for the primary.
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public List<ServerName> blockUntilAvailable(final ZKWatcher zkw,
+      final long timeout, Configuration conf)
+      throws InterruptedException {
+    int numReplicasConfigured = 1;
+
+    List<ServerName> servers = new ArrayList<>();
+    // Make the blocking call first so that we do the wait to know
+    // the znodes are all in place or timeout.
+    ServerName server = blockUntilAvailable(zkw, timeout);
+    if (server == null) {
+      return null;
+    }
+    servers.add(server);
+
+    try {
+      List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
+      numReplicasConfigured = metaReplicaNodes.size();
+    } catch (KeeperException e) {
+      LOG.warn("Got ZK exception", e);
+    }
+    for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
+      // return all replica locations for the meta
+      servers.add(getMetaRegionLocation(zkw, replicaId));
+    }
+    return servers;
+  }
+
+  /**
+   * Wait until the meta region is available and is not in transition.
+   * @param zkw zookeeper connection to use
+   * @param timeout maximum time to wait, in millis
+   * @return ServerName or null if we timed out.
+   * @throws InterruptedException
+   */
+  public ServerName blockUntilAvailable(final ZKWatcher zkw,
+      final long timeout)
+      throws InterruptedException {
+    return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
+  }
+
+  /**
+   * Wait until the meta region is available and is not in transition.
+   * The location is polled every {@link HConstants#SOCKET_RETRY_WAIT_MS} (or less, so that the
+   * last check happens at the deadline) until found or {@code timeout} elapses.
+   * @param zkw zookeeper connection to use
+   * @param replicaId replica id of the meta region
+   * @param timeout maximum time to wait, in millis; 0 means check once without waiting
+   * @return ServerName or null if we timed out.
+   * @throws InterruptedException if interrupted while sleeping
+   * @throws IllegalArgumentException if {@code timeout} is negative or {@code zkw} is null
+   */
+  public ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
+      final long timeout)
+      throws InterruptedException {
+    if (timeout < 0) {
+      throw new IllegalArgumentException("timeout must be >= 0, was " + timeout);
+    }
+    if (zkw == null) {
+      throw new IllegalArgumentException("zkw must not be null");
+    }
+    long startTime = System.currentTimeMillis();
+    while (true) {
+      ServerName sn = getMetaRegionLocation(zkw, replicaId);
+      long remaining = timeout - (System.currentTimeMillis() - startTime);
+      if (sn != null || remaining <= 0) {
+        return sn;
+      }
+      // Sleep no longer than the time left, so short timeouts still wait and re-check
+      // instead of returning immediately.
+      Thread.sleep(Math.min(remaining, HConstants.SOCKET_RETRY_WAIT_MS));
+    }
+  }
+
+  /**
+   * Stop working. Sets the stopped flag so that {@link #waitMetaRegionLocation(ZKWatcher)}
+   * returns once its current attempt finishes; no thread is interrupted.
+   */
+  public void stop() {
+    if (!stopped) {
+      LOG.debug("Stopping MetaTableLocator");
+      stopped = true;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..ef643bf
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -0,0 +1,472 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
+ * of redoing it, we should contribute updates to their code which let us more
+ * easily access testing helper objects.
+ */
+@InterfaceAudience.Public
+public class MiniZooKeeperCluster {
+ private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+ // Tick time handed to each ZooKeeperServer by startup() unless setTickTime() set a positive value.
+ private static final int TICK_TIME = 2000;
+ // Fallback for connectionTimeout when the configuration key read in the constructor is unset.
+ private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
+ // Timeout passed to waitForServerUp() when startup() checks that each server answers 'stat'.
+ private int connectionTimeout;
+
+ // Set to true at the end of a successful startup().
+ private boolean started;
+
+ /** The default port. If zero, we use a random port. */
+ private int defaultClientPort = 0;
+
+ // startup() appends to these two lists together, one entry per started ZK server.
+ private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+ private List<ZooKeeperServer> zooKeeperServers;
+ // Client ports, possibly pre-seeded via addClientPort(); entries <= 0 are treated as unset.
+ private List<Integer> clientPortList;
+
+ // Index of the active server in the lists above (-1 until startup() sets it to 0).
+ private int activeZKServerIndex;
+ // Tick time override; values <= 0 mean TICK_TIME is used.
+ private int tickTime = 0;
+
+ private Configuration configuration;
+
+  /**
+   * Creates a mini cluster backed by a freshly created default {@link Configuration}.
+   */
+  public MiniZooKeeperCluster() {
+    this(new Configuration());
+  }
+
+  /**
+   * Creates a mini cluster that reads its settings from {@code configuration}.
+   * No server is started until one of the {@code startup} methods is called.
+   *
+   * @param configuration source of session/connection settings
+   */
+  public MiniZooKeeperCluster(Configuration configuration) {
+    this.configuration = configuration;
+    this.started = false;
+    this.activeZKServerIndex = -1;
+    this.standaloneServerFactoryList = new ArrayList<>();
+    this.zooKeeperServers = new ArrayList<>();
+    this.clientPortList = new ArrayList<>();
+    String timeoutKey = HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster";
+    this.connectionTimeout = configuration.getInt(timeoutKey, DEFAULT_CONNECTION_TIMEOUT);
+  }
+
+  /**
+   * Appends a client port to the list of ports the servers will use.
+   *
+   * @param clientPort the port to append
+   */
+  public void addClientPort(int clientPort) {
+    this.clientPortList.add(clientPort);
+  }
+
+  /**
+   * Returns the client port list. This is the internal list itself, not a copy.
+   *
+   * @return the client port list
+   */
+  @VisibleForTesting
+  public List<Integer> getClientPortList() {
+    return this.clientPortList;
+  }
+
+  /**
+   * Tells whether the client port list has a positive port at {@code index}.
+   *
+   * @param index position in the client port list
+   * @return false if the list is too short or the port there is not positive
+   */
+  private boolean hasValidClientPortInList(int index) {
+    if (index >= clientPortList.size()) {
+      return false;
+    }
+    return clientPortList.get(index) > 0;
+  }
+
+ public void setDefaultClientPort(int clientPort) {
+ if (clientPort <= 0) {
+ throw new IllegalArgumentException("Invalid default ZK client port: "
+ + clientPort);
+ }
+ this.defaultClientPort = clientPort;
+ }
+
+  /**
+   * Selects a ZK client port that is not already in the client port list.
+   *
+   * @param seedPort the seed port to start with; -1 means first time.
+   * @return a port not already present in the client port list. It may still be bound by some
+   *     other process; startup() handles that case via BindException.
+   */
+  private int selectClientPort(int seedPort) {
+    int returnClientPort = seedPort + 1;
+    if (returnClientPort == 0) {
+      // If the new port is invalid, find one - starting with the default client port.
+      // If the default client port is not specified, start with a random port in 49152..65279,
+      // inside the 49152-65535 range that cannot be registered with IANA and is intended for
+      // dynamic allocation (see http://bit.ly/dynports).
+      if (defaultClientPort > 0) {
+        returnClientPort = defaultClientPort;
+      } else {
+        returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
+      }
+    }
+    // Make sure that the port is not already used by one of our servers.
+    while (clientPortList.contains(returnClientPort)) {
+      returnClientPort++;
+    }
+    return returnClientPort;
+  }
+
+  /**
+   * Overrides the tick time used by servers started afterwards; values &lt;= 0 keep the default.
+   *
+   * @param tickTime tick time to hand to each ZooKeeperServer
+   */
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime;
+  }
+
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size()-1;
+ }
+
+  /**
+   * @return number of ZooKeeper servers currently tracked by this cluster
+   */
+  public int getZooKeeperServerNum() {
+    return this.zooKeeperServers.size();
+  }
+
+ // / XXX: From o.a.zk.t.ClientBase
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+  /**
+   * Starts one server per configured client port, or a single server if none are configured.
+   *
+   * @param baseDir directory under which the servers' data directories are created
+   * @return see {@link #startup(File, int)}
+   */
+  public int startup(File baseDir) throws IOException, InterruptedException {
+    // At least one ZK server is needed for testing.
+    int numZooKeeperServers = Math.max(1, clientPortList.size());
+    return startup(baseDir, numZooKeeperServers);
+  }
+
+ /**
+ * @param baseDir
+ * @param numZooKeeperServers
+ * @return ClientPort server bound to, -1 if there was a
+ * binding problem and we couldn't pick another port.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+ InterruptedException {
+ if (numZooKeeperServers <= 0)
+ return -1;
+
+ setupTestEnv();
+ shutdown();
+
+ int tentativePort = -1; // the seed port
+ int currentClientPort;
+
+ // running all the ZK servers
+ for (int i = 0; i < numZooKeeperServers; i++) {
+ File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
+ createDir(dir);
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
+ }
+
+ // Set up client port - if we have already had a list of valid ports, use it.
+ if (hasValidClientPortInList(i)) {
+ currentClientPort = clientPortList.get(i);
+ } else {
+ tentativePort = selectClientPort(tentativePort); // update the seed
+ currentClientPort = tentativePort;
+ }
+
+ ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
+ server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1));
+ server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1));
+ NIOServerCnxnFactory standaloneServerFactory;
+ while (true) {
+ try {
+ standaloneServerFactory = new NIOServerCnxnFactory();
+ standaloneServerFactory.configure(
+ new InetSocketAddress(currentClientPort),
+ configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+ } catch (BindException e) {
+ LOG.debug("Failed binding ZK Server to client port: " +
+ currentClientPort, e);
+ // We're told to use some port but it's occupied, fail
+ if (hasValidClientPortInList(i)) {
+ return -1;
+ }
+ // This port is already in use, try to use another.
+ tentativePort = selectClientPort(tentativePort);
+ currentClientPort = tentativePort;
+ continue;
+ }
+ break;
+ }
+
+ // Start up this ZK server
+ standaloneServerFactory.startup(server);
+ // Runs a 'stat' against the servers.
+ if (!waitForServerUp(currentClientPort, connectionTimeout)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ // We have selected a port as a client port. Update clientPortList if necessary.
+ if (clientPortList.size() <= i) { // it is not in the list, add the port
+ clientPortList.add(currentClientPort);
+ }
+ else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
+ clientPortList.remove(i);
+ clientPortList.add(i, currentClientPort);
+ }
+
+ standaloneServerFactoryList.add(standaloneServerFactory);
+ zooKeeperServers.add(server);
+ }
+
+ // set the first one to be active ZK; Others are backups
+ activeZKServerIndex = 0;
+ started = true;
+ int clientPort = clientPortList.get(activeZKServerIndex);
+ LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
+ "on client port=" + clientPort);
+ return clientPort;
+ }
+
+ /**
+  * Ensures {@code dir} exists, creating it and any missing parent directories.
+  *
+  * @throws IOException if the directory cannot be created, or if creating it is
+  *   denied by a SecurityException (which is attached as the cause)
+  */
+ private void createDir(File dir) throws IOException {
+   try {
+     // mkdirs() reports failure only through its return value, so check it. Re-test with
+     // isDirectory() so that another party creating the dir concurrently is not an error.
+     if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory()) {
+       throw new IOException("creating dir: " + dir);
+     }
+   } catch (SecurityException e) {
+     throw new IOException("creating dir: " + dir, e);
+   }
+ }
+
+ /**
+  * Stops every ZK server of this cluster and closes its ZK database.
+  * If the cluster had been started, also resets the active index and clears the
+  * client port list.
+  *
+  * @throws IOException if a server still accepts connections on its client port
+  *   after the connection timeout
+  */
+ public void shutdown() throws IOException {
+   // Stop each connection factory and wait until its client port goes quiet.
+   int portIndex = 0;
+   for (NIOServerCnxnFactory factory : standaloneServerFactoryList) {
+     final int port = clientPortList.get(portIndex++);
+     factory.shutdown();
+     if (!waitForServerDown(port, connectionTimeout)) {
+       throw new IOException("Waiting for shutdown of standalone server");
+     }
+   }
+   standaloneServerFactoryList.clear();
+
+   // ZookeeperServer does not close its ZKDatabase, so close each one explicitly.
+   for (int j = 0; j < zooKeeperServers.size(); j++) {
+     zooKeeperServers.get(j).getZKDatabase().close();
+   }
+   zooKeeperServers.clear();
+
+   if (!started) {
+     return;
+   }
+   // clear everything
+   started = false;
+   activeZKServerIndex = 0;
+   clientPortList.clear();
+   LOG.info("Shutdown MiniZK cluster with all ZK servers");
+ }
+
+ /**
+  * Kills the currently active ZK server and makes the next remaining server the active one.
+  *
+  * @return the client port of the newly active backup server, or -1 if the cluster is
+  *   not started, no server is left to kill, or no backup remains after the kill
+  * @throws IOException if the killed server still accepts connections after the timeout
+  * @throws InterruptedException if interrupted while shutting the server down
+  */
+ public int killCurrentActiveZooKeeperServer() throws IOException,
+     InterruptedException {
+   // Also bail out when no server exists at the active index. Once the last server has been
+   // killed, the lists are empty and the get() calls below would throw
+   // IndexOutOfBoundsException instead of reporting "no server" with -1.
+   if (!started || activeZKServerIndex < 0
+       || activeZKServerIndex >= standaloneServerFactoryList.size()) {
+     return -1;
+   }
+
+   // Shutdown the current active one
+   NIOServerCnxnFactory standaloneServerFactory =
+       standaloneServerFactoryList.get(activeZKServerIndex);
+   int clientPort = clientPortList.get(activeZKServerIndex);
+
+   standaloneServerFactory.shutdown();
+   if (!waitForServerDown(clientPort, connectionTimeout)) {
+     throw new IOException("Waiting for shutdown of standalone server");
+   }
+
+   zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+   // remove the current active zk server
+   standaloneServerFactoryList.remove(activeZKServerIndex);
+   clientPortList.remove(activeZKServerIndex);
+   zooKeeperServers.remove(activeZKServerIndex);
+   LOG.info("Kill the current active ZK servers in the cluster " +
+       "on client port: " + clientPort);
+
+   if (standaloneServerFactoryList.isEmpty()) {
+     // there is no backup servers;
+     return -1;
+   }
+   clientPort = clientPortList.get(activeZKServerIndex);
+   LOG.info("Activate a backup zk server in the cluster " +
+       "on client port: " + clientPort);
+   // return the next back zk server's port
+   return clientPort;
+ }
+
+ /**
+  * Kills one backup ZK server: the one immediately after the active server.
+  * Does nothing if the cluster is not started or has no backup server.
+  *
+  * @throws IOException if the backup still accepts connections after the timeout
+  * @throws InterruptedException if interrupted while shutting the server down
+  */
+ public void killOneBackupZooKeeperServer() throws IOException,
+     InterruptedException {
+   final boolean noBackupAvailable = !started || activeZKServerIndex < 0
+       || standaloneServerFactoryList.size() <= 1;
+   if (noBackupAvailable) {
+     return;
+   }
+
+   final int backupIndex = activeZKServerIndex + 1;
+   final NIOServerCnxnFactory backupFactory = standaloneServerFactoryList.get(backupIndex);
+   final int backupPort = clientPortList.get(backupIndex);
+
+   // Shut down the backup (not the active) server and wait for its port to close.
+   backupFactory.shutdown();
+   if (!waitForServerDown(backupPort, connectionTimeout)) {
+     throw new IOException("Waiting for shutdown of standalone server");
+   }
+   zooKeeperServers.get(backupIndex).getZKDatabase().close();
+
+   // Forget the backup server in all three parallel lists.
+   standaloneServerFactoryList.remove(backupIndex);
+   clientPortList.remove(backupIndex);
+   zooKeeperServers.remove(backupIndex);
+   LOG.info("Kill one backup ZK servers in the cluster " +
+       "on client port: " + backupPort);
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerDown(int port, long timeout) throws IOException {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = new Socket("localhost", port);
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+ } finally {
+ sock.close();
+ }
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+ }
+ return false;
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ /**
+  * Polls {@code localhost:port} every 250 ms with ZooKeeper's 'stat' command until the
+  * first reply line starts with "Zookeeper version:".
+  *
+  * @return true once the server answers 'stat'; false if it has not done so within
+  *   {@code timeout} ms
+  * @throws InterruptedIOException if interrupted while sleeping between attempts
+  */
+ private static boolean waitForServerUp(int port, long timeout) throws IOException {
+   long start = System.currentTimeMillis();
+   while (true) {
+     // try-with-resources closes the reader and the socket independently of each other, so a
+     // failure while closing one can no longer skip closing the other.
+     try (Socket sock = new Socket("localhost", port)) {
+       OutputStream outstream = sock.getOutputStream();
+       outstream.write("stat".getBytes());
+       outstream.flush();
+
+       try (BufferedReader reader =
+           new BufferedReader(new InputStreamReader(sock.getInputStream()))) {
+         String line = reader.readLine();
+         if (line != null && line.startsWith("Zookeeper version:")) {
+           return true;
+         }
+       }
+     } catch (IOException e) {
+       // ignore as this is expected
+       LOG.info("server localhost:" + port + " not up " + e);
+     }
+
+     if (System.currentTimeMillis() > start + timeout) {
+       break;
+     }
+     try {
+       Thread.sleep(250);
+     } catch (InterruptedException e) {
+       throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+     }
+   }
+   return false;
+ }
+
+ /**
+  * @return the client port of the active ZK server, or -1 if the active index does not
+  *   point into the client port list
+  */
+ public int getClientPort() {
+   final boolean activeIndexValid =
+       activeZKServerIndex >= 0 && activeZKServerIndex < clientPortList.size();
+   if (!activeIndexValid) {
+     return -1;
+   }
+   return clientPortList.get(activeZKServerIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
new file mode 100644
index 0000000..da7d176
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Placeholder for a watcher that might be triggered before the real watcher instance exists.
+ * <p>
+ * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern),
+ * and the watcher passed to the constructor might be called back by the event thread
+ * before you get the instance of {@code ZooKeeper} from the constructor.
+ * If your watcher calls methods of {@code ZooKeeper},
+ * pass this placeholder to the constructor of the {@code ZooKeeper},
+ * create your watcher using the instance of {@code ZooKeeper},
+ * and then call the method {@code PendingWatcher.prepare}.
+ */
+class PendingWatcher implements Watcher {
+  private final InstancePending<Watcher> pending = new InstancePending<>();
+
+  /**
+   * Forwards {@code event} to the watcher registered through {@link #prepare(Watcher)}.
+   * As described on {@link #prepare(Watcher)}, the calling thread is blocked until that
+   * registration has happened.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    pending.get().process(event);
+  }
+
+  /**
+   * Associates the real watcher that processes events.
+   * This method should be called once, and {@code watcher} must be non-null.
+   * It should be called as soon as possible, because event processing invoked by the
+   * ZooKeeper event thread is blocked, uninterruptibly, until this method is called.
+   *
+   * @param watcher the watcher that receives all events; must not be null
+   * @throws NullPointerException if {@code watcher} is null
+   */
+  void prepare(Watcher watcher) {
+    // Enforce the documented contract in the caller's thread instead of failing later,
+    // when process() dereferences the registered watcher.
+    pending.prepare(java.util.Objects.requireNonNull(watcher, "watcher"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
new file mode 100644
index 0000000..d6c11af
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -0,0 +1,810 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.htrace.core.TraceScope;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
+ */
+@InterfaceAudience.Private
+public class RecoverableZooKeeper {
+ private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+ // the actual ZooKeeper client instance; created lazily by checkZk() and reset to null
+ // by reconnectAfterExpiration() so the next checkZk() builds a new one
+ private ZooKeeper zk;
+ // Creates a fresh RetryCounter for every operation (limits are set in the constructor)
+ private final RetryCounterFactory retryCounterFactory;
+ // An identifier of this process in the cluster; defaults to the JVM runtime name
+ // (processID@hostName) when the caller supplies none
+ private final String identifier;
+ // Bytes.toBytes(identifier); embedded into written data by ZKMetadata.appendMetaData so
+ // that retried writes can recognize data written by this process
+ private final byte[] id;
+ // Connection parameters used by checkZk() whenever a new ZooKeeper client is created
+ private Watcher watcher;
+ private int sessionTimeout;
+ private String quorumServers;
+ // Receives latency samples and failure counts for the ZooKeeper calls made here
+ private final ZKMetricsListener metrics;
+
+ /**
+  * Creates a RecoverableZooKeeper without an explicit process identifier, so the
+  * identifier falls back to the JVM runtime name.
+  */
+ public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+     Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
+     throws IOException {
+   this(quorumServers, sessionTimeout, watcher,
+       maxRetries, retryIntervalMillis, maxSleepTime, (String) null);
+ }
+
+ /**
+  * Creates a RecoverableZooKeeper and makes a best-effort attempt to connect right away.
+  *
+  * @param identifier identifier of this process; when null or empty, the JVM runtime name
+  *   (processID@hostName) is used
+  * @param maxRetries number of retries per operation (the retry factory gets maxRetries + 1)
+  */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
+     justification="None. Its always been this way.")
+ public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+     Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
+     throws IOException {
+   // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
+   this.retryCounterFactory =
+       new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
+
+   if (identifier == null || identifier.isEmpty()) {
+     // the identifier = processID@hostName
+     identifier = ManagementFactory.getRuntimeMXBean().getName();
+   }
+   LOG.info("Process identifier=" + identifier +
+       " connecting to ZooKeeper ensemble=" + quorumServers);
+   this.identifier = identifier;
+   this.id = Bytes.toBytes(identifier);
+
+   this.watcher = watcher;
+   this.sessionTimeout = sessionTimeout;
+   this.quorumServers = quorumServers;
+   this.metrics = new ZKMetrics();
+   // Best-effort eager connect: every operation calls checkZk() again (and retries on its
+   // OperationTimeoutException), so a failure here must not fail construction. Log it
+   // instead of discarding it silently.
+   try {
+     checkZk();
+   } catch (Exception x) {
+     LOG.debug("Initial ZooKeeper connection attempt failed; will retry on first use", x);
+   }
+ }
+
+ /**
+  * Try to create a ZooKeeper connection if none exists yet. An IOException raised while
+  * creating the client is turned into a KeeperException.OperationTimeoutException (with the
+  * IOException as its cause) so callers can retry it like any other timeout.
+  *
+  * @return The created ZooKeeper connection object
+  * @throws KeeperException OperationTimeoutException if the client could not be created
+  */
+ protected synchronized ZooKeeper checkZk() throws KeeperException {
+   if (this.zk == null) {
+     try {
+       this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+     } catch (IOException ex) {
+       LOG.warn("Unable to create ZooKeeper Connection", ex);
+       KeeperException timeout = new KeeperException.OperationTimeoutException();
+       // Preserve the root cause for callers that log or inspect it.
+       timeout.initCause(ex);
+       throw timeout;
+     }
+   }
+   return zk;
+ }
+
+ /**
+  * Closes the current ZooKeeper client (if any) and creates a new one, which starts a new
+  * session.
+  *
+  * @throws KeeperException if the new client cannot be created (see {@link #checkZk()})
+  * @throws InterruptedException propagated from closing the old client
+  */
+ public synchronized void reconnectAfterExpiration()
+     throws IOException, KeeperException, InterruptedException {
+   if (zk != null) {
+     LOG.info("Closing dead ZooKeeper connection, session" +
+         " was: 0x"+Long.toHexString(zk.getSessionId()));
+     try {
+       zk.close();
+     } finally {
+       // reset the ZooKeeper connection even if close() was interrupted; otherwise
+       // checkZk() would keep handing out the dead client
+       zk = null;
+     }
+   }
+   checkZk();
+   LOG.info("Recreated a ZooKeeper, session" +
+       " is: 0x"+Long.toHexString(zk.getSessionId()));
+ }
+
+ /**
+  * delete is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows.
+  * If a retry finds the node already gone, the earlier attempt is assumed to have
+  * succeeded and the call returns normally; a NoNodeException on the first attempt
+  * is rethrown.
+  */
+ public void delete(String path, int version) throws InterruptedException, KeeperException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     boolean isRetry = false; // False for first attempt, true for all retries.
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         checkZk().delete(path, version);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerWriteOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case NONODE:
+             if (isRetry) {
+               LOG.debug("Node " + path + " already deleted. Assuming a " +
+                   "previous attempt succeeded.");
+               return;
+             }
+             LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
+             throw e;
+
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "delete");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "delete");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+       isRetry = true;
+     }
+   }
+ }
+
+ /**
+  * exists is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watcher watcher passed straight through to ZooKeeper's exists call
+  * @return the Stat returned by ZooKeeper for {@code path}
+  */
+ public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         Stat nodeStat = checkZk().exists(path, watcher);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return nodeStat;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "exists");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "exists");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * exists is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watch passed straight through to ZooKeeper's exists call
+  * @return the Stat returned by ZooKeeper for {@code path}
+  */
+ public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         Stat nodeStat = checkZk().exists(path, watch);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return nodeStat;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "exists");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "exists");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * Decides the fate of a retriable failure of {@code opName}. Returns normally (after a
+  * debug log) when the retry counter allows another attempt; otherwise logs an error and
+  * rethrows {@code e}.
+  */
+ private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
+     String opName) throws KeeperException {
+   if (retryCounter.shouldRetry()) {
+     LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," +
+         "exception=" + e);
+     return;
+   }
+   LOG.error("ZooKeeper " + opName + " failed after "
+       + retryCounter.getMaxAttempts() + " attempts");
+   throw e;
+ }
+
+ /**
+  * getChildren is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watcher watcher passed straight through to ZooKeeper's getChildren call
+  * @return List of children znodes
+  */
+ public List<String> getChildren(String path, Watcher watcher)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         List<String> children = checkZk().getChildren(path, watcher);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return children;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "getChildren");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "getChildren");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * getChildren is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watch passed straight through to ZooKeeper's getChildren call
+  * @return List of children znodes
+  */
+ public List<String> getChildren(String path, boolean watch)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         List<String> children = checkZk().getChildren(path, watch);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return children;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "getChildren");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "getChildren");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * getData is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watcher watcher passed straight through to ZooKeeper's getData call
+  * @param stat filled in by ZooKeeper with the node's Stat
+  * @return Data, with the identifier metadata removed by ZKMetadata.removeMetaData
+  */
+ public byte[] getData(String path, Watcher watcher, Stat stat)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         byte[] revData = checkZk().getData(path, watcher, stat);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return ZKMetadata.removeMetaData(revData);
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "getData");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "getData");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * getData is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param watch passed straight through to ZooKeeper's getData call
+  * @param stat filled in by ZooKeeper with the node's Stat
+  * @return Data, with the identifier metadata removed by ZKMetadata.removeMetaData
+  */
+ public byte[] getData(String path, boolean watch, Stat stat)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         byte[] revData = checkZk().getData(path, watch, stat);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return ZKMetadata.removeMetaData(revData);
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "getData");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "getData");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * setData is NOT an idempotent operation: a retry may fail with BadVersion even though an
+  * earlier attempt succeeded. To detect that case, the data is tagged with this process's
+  * identifier metadata. When a retry fails with BADVERSION, the node's current contents are
+  * read back. If they equal what this call wrote, the earlier attempt is taken as successful
+  * and its Stat is returned. Otherwise the BadVersionException is rethrown.
+  *
+  * @return Stat instance
+  */
+ public Stat setData(String path, byte[] data, int version)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     byte[] newData = ZKMetadata.appendMetaData(id, data);
+     boolean isRetry = false;
+     long startTime;
+     while (true) {
+       try {
+         startTime = EnvironmentEdgeManager.currentTime();
+         Stat nodeStat = checkZk().setData(path, newData, version);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerWriteOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return nodeStat;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "setData");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "setData");
+             break;
+           case BADVERSION:
+             if (isRetry) {
+               // try to verify whether the previous setData success or not
+               try {
+                 Stat stat = new Stat();
+                 startTime = EnvironmentEdgeManager.currentTime();
+                 byte[] revData = checkZk().getData(path, false, stat);
+                 this.metrics.registerReadOperationLatency(
+                     Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+                 if (Bytes.compareTo(revData, newData) == 0) {
+                   // the bad version is caused by previous successful setData
+                   return stat;
+                 }
+               } catch (KeeperException keeperException) {
+                 this.metrics.registerFailedZKCall();
+                 // the ZK is not reliable at this moment. just throwing exception
+                 throw keeperException;
+               }
+             }
+             // Intentional fall-through: a bad version that was not caused by our own
+             // earlier write is rethrown like any other exception.
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+       isRetry = true;
+     }
+   }
+ }
+
+ /**
+  * getAcl is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @param stat filled in by ZooKeeper with the node's Stat
+  * @return list of ACLs
+  */
+ public List<ACL> getAcl(String path, Stat stat)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         List<ACL> nodeACL = checkZk().getACL(path, stat);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerReadOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return nodeACL;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "getAcl");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "getAcl");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * setAcl is an idempotent operation. Retry before throwing exception.
+  * Connection loss and operation timeouts are retried while the retry budget allows;
+  * any other KeeperException is rethrown immediately.
+  *
+  * @return the Stat returned by ZooKeeper's setACL call for {@code path}
+  */
+ public Stat setAcl(String path, List<ACL> acls, int version)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         Stat nodeStat = checkZk().setACL(path, acls, version);
+         // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+         this.metrics.registerWriteOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return nodeStat;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "setAcl");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "setAcl");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * Creates a znode whose data is tagged with this process's identifier metadata.
+  * <p>
+  * EPHEMERAL and PERSISTENT creates are retried on connection loss or operation timeout.
+  * If a retry hits NodeExists and the stored data equals what this call wrote, the
+  * earlier attempt is treated as successful. Any other NodeExists is rethrown.
+  * </p>
+  * <p>
+  * EPHEMERAL_SEQUENTIAL and PERSISTENT_SEQUENTIAL creates are NOT idempotent. The
+  * identifier is therefore appended to the path, and each retry first looks for a node
+  * created by a previous attempt.
+  * </p>
+  *
+  * @return Path
+  * @throws IllegalArgumentException for any other CreateMode
+  */
+ public String create(String path, byte[] data, List<ACL> acl,
+     CreateMode createMode)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
+     final byte[] taggedData = ZKMetadata.appendMetaData(id, data);
+     final boolean sequential;
+     switch (createMode) {
+       case EPHEMERAL:
+       case PERSISTENT:
+         sequential = false;
+         break;
+       case EPHEMERAL_SEQUENTIAL:
+       case PERSISTENT_SEQUENTIAL:
+         sequential = true;
+         break;
+       default:
+         throw new IllegalArgumentException("Unrecognized CreateMode: " +
+             createMode);
+     }
+     return sequential
+         ? createSequential(path, taggedData, acl, createMode)
+         : createNonSequential(path, taggedData, acl, createMode);
+   }
+ }
+
+ /**
+  * Creates a non-sequential znode, retrying on connection loss or operation timeout.
+  * If a retry fails with NodeExists, the node's current data is read back. When it equals
+  * {@code data}, an earlier attempt is taken to have succeeded and {@code path} is
+  * returned; otherwise the NodeExistsException is rethrown. NodeExists on the first
+  * attempt is always rethrown.
+  */
+ private String createNonSequential(String path, byte[] data, List<ACL> acl,
+     CreateMode createMode) throws KeeperException, InterruptedException {
+   RetryCounter retryCounter = retryCounterFactory.create();
+   boolean isRetry = false; // False for first attempt, true for all retries.
+   long startTime;
+   while (true) {
+     try {
+       startTime = EnvironmentEdgeManager.currentTime();
+       String nodePath = checkZk().create(path, data, acl, createMode);
+       // Elapsed time floored at 1 ms; Math.min would clamp every sample to at most 1 ms.
+       this.metrics.registerWriteOperationLatency(
+           Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+       return nodePath;
+     } catch (KeeperException e) {
+       this.metrics.registerFailedZKCall();
+       switch (e.code()) {
+         case NODEEXISTS:
+           if (isRetry) {
+             // If the connection was lost, there is still a possibility that
+             // we have successfully created the node at our previous attempt,
+             // so we read the node and compare.
+             startTime = EnvironmentEdgeManager.currentTime();
+             byte[] currentData = checkZk().getData(path, false, null);
+             this.metrics.registerReadOperationLatency(
+                 Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+             if (currentData != null &&
+                 Bytes.compareTo(currentData, data) == 0) {
+               // We successfully created a non-sequential node
+               return path;
+             }
+             LOG.error("Node " + path + " already exists with " +
+                 Bytes.toStringBinary(currentData) + ", could not write " +
+                 Bytes.toStringBinary(data));
+             throw e;
+           }
+           LOG.debug("Node " + path + " already exists");
+           throw e;
+
+         case CONNECTIONLOSS:
+           this.metrics.registerConnectionLossException();
+           retryOrThrow(retryCounter, e, "create");
+           break;
+         case OPERATIONTIMEOUT:
+           this.metrics.registerOperationTimeoutException();
+           retryOrThrow(retryCounter, e, "create");
+           break;
+
+         default:
+           throw e;
+       }
+     }
+     retryCounter.sleepUntilNextRetry();
+     isRetry = true;
+   }
+ }
+
+ /**
+  * Creates a sequential znode whose name is {@code path} followed by this client's
+  * identifier (ZooKeeper appends the sequence number), retrying on CONNECTIONLOSS and
+  * OPERATIONTIMEOUT according to {@code retryCounter}.
+  *
+  * Before every attempt except the first, it looks for a node left by an earlier attempt
+  * that may have succeeded before the connection dropped (see findPreviousSequentialNode).
+  * If one is found, that node is returned instead of creating a duplicate.
+  *
+  * @param path path prefix for the sequential node
+  * @param data payload to store
+  * @param acl ACLs for the new znode
+  * @param createMode EPHEMERAL_SEQUENTIAL or PERSISTENT_SEQUENTIAL
+  * @return the full path of the created (or previously created) node
+  * @throws KeeperException for any non-retryable code, or when retryOrThrow gives up
+  * @throws InterruptedException if interrupted while talking to ZooKeeper or sleeping
+  */
+ private String createSequential(String path, byte[] data,
+     List<ACL> acl, CreateMode createMode)
+     throws KeeperException, InterruptedException {
+   RetryCounter retryCounter = retryCounterFactory.create();
+   boolean first = true;
+   String newPath = path + this.identifier;
+   while (true) {
+     try {
+       if (!first) {
+         // Check if we succeeded on a previous attempt
+         String previousResult = findPreviousSequentialNode(newPath);
+         if (previousResult != null) {
+           return previousResult;
+         }
+       }
+       first = false;
+       long startTime = EnvironmentEdgeManager.currentTime();
+       String nodePath = checkZk().create(newPath, data, acl, createMode);
+       // 1ms floor rather than a 1ms cap, so the latency metric reflects real durations.
+       this.metrics.registerWriteOperationLatency(
+           Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+       return nodePath;
+     } catch (KeeperException e) {
+       this.metrics.registerFailedZKCall();
+       switch (e.code()) {
+         case CONNECTIONLOSS:
+           this.metrics.registerConnectionLossException();
+           retryOrThrow(retryCounter, e, "create");
+           break;
+         case OPERATIONTIMEOUT:
+           this.metrics.registerOperationTimeoutException();
+           retryOrThrow(retryCounter, e, "create");
+           break;
+
+         default:
+           throw e;
+       }
+     }
+     retryCounter.sleepUntilNextRetry();
+   }
+ }
+ /**
+  * Converts the caller's {@link org.apache.zookeeper.Op}s into the instances actually passed
+  * to ZooKeeper#multi. Create and setData ops are rebuilt so that their payload goes through
+  * ZKMetadata.appendMetaData. Delete and version-check ops carry no payload and are passed
+  * through unchanged.
+  *
+  * @param ops operations to prepare; may be null
+  * @return the prepared operations in the original order, or null if {@code ops} is null
+  * @throws UnsupportedOperationException if an op of any other type is encountered
+  */
+ private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+     throws UnsupportedOperationException {
+   if (ops == null) {
+     return null;
+   }
+
+   List<Op> preparedOps = new ArrayList<>();
+   for (Op op : ops) {
+     if (op.getType() == ZooDefs.OpCode.create) {
+       CreateRequest create = (CreateRequest) op.toRequestRecord();
+       preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
+           create.getAcl(), create.getFlags()));
+     } else if (op.getType() == ZooDefs.OpCode.delete
+         || op.getType() == ZooDefs.OpCode.check) {
+       // no data payload, so no need to appendMetaData
+       preparedOps.add(op);
+     } else if (op.getType() == ZooDefs.OpCode.setData) {
+       SetDataRequest setData = (SetDataRequest) op.toRequestRecord();
+       preparedOps.add(Op.setData(setData.getPath(), ZKMetadata.appendMetaData(id, setData.getData()),
+           setData.getVersion()));
+     } else {
+       throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
+     }
+   }
+   return preparedOps;
+ }
+
+ /**
+  * Run multiple operations in a transactional manner. Retry before throwing exception.
+  *
+  * The ops are first rewritten by prepareZKMulti (metadata appended to create/setData
+  * payloads). The multi call is then retried on CONNECTIONLOSS and OPERATIONTIMEOUT
+  * according to the retry counter.
+  *
+  * @param ops operations to submit together
+  * @return the per-operation results returned by ZooKeeper
+  * @throws KeeperException for any non-retryable code, or when retryOrThrow gives up
+  * @throws InterruptedException if interrupted while talking to ZooKeeper or sleeping
+  */
+ public List<OpResult> multi(Iterable<Op> ops)
+     throws KeeperException, InterruptedException {
+   try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
+     RetryCounter retryCounter = retryCounterFactory.create();
+     Iterable<Op> multiOps = prepareZKMulti(ops);
+     while (true) {
+       try {
+         long startTime = EnvironmentEdgeManager.currentTime();
+         List<OpResult> opResults = checkZk().multi(multiOps);
+         // 1ms floor rather than a 1ms cap, so the latency metric reflects real durations.
+         this.metrics.registerWriteOperationLatency(
+             Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+         return opResults;
+       } catch (KeeperException e) {
+         this.metrics.registerFailedZKCall();
+         switch (e.code()) {
+           case CONNECTIONLOSS:
+             this.metrics.registerConnectionLossException();
+             retryOrThrow(retryCounter, e, "multi");
+             break;
+           case OPERATIONTIMEOUT:
+             this.metrics.registerOperationTimeoutException();
+             retryOrThrow(retryCounter, e, "multi");
+             break;
+
+           default:
+             throw e;
+         }
+       }
+       retryCounter.sleepUntilNextRetry();
+     }
+   }
+ }
+
+ /**
+  * Looks for a node left by an earlier, possibly interrupted, sequential create.
+  * Lists the children of the parent of {@code path}, keeps those whose name starts with the
+  * last component of {@code path}, and returns the first of them that still exists.
+  *
+  * @param path the full path (parent + node-name prefix) used for the sequential create
+  * @return the full path of a matching existing node, or null if none is found
+  * @throws KeeperException if listing the parent or checking a child fails
+  * @throws InterruptedException if interrupted while talking to ZooKeeper
+  */
+ private String findPreviousSequentialNode(String path)
+     throws KeeperException, InterruptedException {
+   int lastSlashIdx = path.lastIndexOf('/');
+   assert(lastSlashIdx != -1);
+   String parent = path.substring(0, lastSlashIdx);
+   String nodePrefix = path.substring(lastSlashIdx + 1);
+   long startTime = EnvironmentEdgeManager.currentTime();
+   List<String> nodes = checkZk().getChildren(parent, false);
+   // 1ms floor rather than a 1ms cap, so the latency metric reflects real durations.
+   this.metrics.registerReadOperationLatency(
+       Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+   List<String> matching = filterByPrefix(nodes, nodePrefix);
+   for (String node : matching) {
+     String nodePath = parent + "/" + node;
+     startTime = EnvironmentEdgeManager.currentTime();
+     Stat stat = checkZk().exists(nodePath, false);
+     this.metrics.registerReadOperationLatency(
+         Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+     if (stat != null) {
+       return nodePath;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * @return the session id of the underlying ZooKeeper handle, or -1 when there is no handle
+  */
+ public synchronized long getSessionId() {
+   if (zk == null) {
+     return -1;
+   }
+   return zk.getSessionId();
+ }
+
+ /**
+  * Closes the underlying ZooKeeper handle if one exists; otherwise does nothing.
+  * The {@code zk} reference itself is left unchanged.
+  */
+ public synchronized void close() throws InterruptedException {
+   if (zk == null) {
+     return;
+   }
+   zk.close();
+ }
+
+ /**
+  * @return the state of the underlying ZooKeeper handle, or null when there is no handle
+  */
+ public synchronized States getState() {
+   if (zk != null) {
+     return zk.getState();
+   }
+   return null;
+ }
+
+ /**
+  * @return the raw ZooKeeper handle wrapped by this object; may be null
+  */
+ public synchronized ZooKeeper getZooKeeper() {
+   return this.zk;
+ }
+
+ /**
+  * @return the session password of the underlying ZooKeeper handle, or null when there is
+  *     no handle
+  */
+ public synchronized byte[] getSessionPasswd() {
+   if (zk != null) {
+     return zk.getSessionPasswd();
+   }
+   return null;
+ }
+
+ /**
+  * Issues an asynchronous ZooKeeper sync on {@code path}.
+  *
+  * @param path znode path to sync
+  * @param cb callback invoked by ZooKeeper when the sync completes
+  * @param ctx caller context handed back to {@code cb}. It was previously dropped, with
+  *     null passed instead.
+  * @throws KeeperException if checkZk() fails to provide a ZooKeeper handle
+  */
+ public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
+   long startTime = EnvironmentEdgeManager.currentTime();
+   checkZk().sync(path, cb, ctx);
+   // NOTE(review): sync is asynchronous, so this measures dispatch time only, not the
+   // completion latency of the sync itself.
+   this.metrics.registerSyncOperationLatency(
+       Math.max(EnvironmentEdgeManager.currentTime() - startTime, 1));
+ }
+
+ /**
+  * Returns, in their original order, the elements of {@code nodes} that start with at least
+  * one of {@code prefixes}. A node matching several prefixes is still added only once per
+  * occurrence in {@code nodes}.
+  *
+  * @param nodes the nodes to filter
+  * @param prefixes the prefixes to include in the result
+  * @return a new list of every element that starts with one of the prefixes
+  */
+ private static List<String> filterByPrefix(List<String> nodes,
+     String... prefixes) {
+   List<String> matches = new ArrayList<>();
+   for (String node : nodes) {
+     boolean hit = false;
+     for (int i = 0; i < prefixes.length && !hit; i++) {
+       hit = node.startsWith(prefixes[i]);
+     }
+     if (hit) {
+       matches.add(node);
+     }
+   }
+   return matches;
+ }
+
+ /**
+  * @return this client's identifier string (createSequential appends it to the requested
+  *     path when naming sequential znodes)
+  */
+ public String getIdentifier() {
+   return this.identifier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
new file mode 100644
index 0000000..93545ee
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * Tracks the region normalizer on/off switch stored in ZooKeeper under
+ * {@code znodePaths.regionNormalizerZNode}.
+ */
+public class RegionNormalizerTracker extends ZKNodeTracker {
+  private static final Log LOG = LogFactory.getLog(RegionNormalizerTracker.class);
+
+  /**
+   * @param watcher ZooKeeper watcher; its regionNormalizerZNode is the tracked node
+   * @param abortable passed through to {@link ZKNodeTracker}
+   */
+  public RegionNormalizerTracker(ZKWatcher watcher,
+      Abortable abortable) {
+    super(watcher, watcher.znodePaths.regionNormalizerZNode, abortable);
+  }
+
+  /**
+   * Return true if region normalizer is on, false otherwise.
+   * No data in ZK means "on" (the default). Data that cannot be parsed means "off".
+   */
+  public boolean isNormalizerOn() {
+    byte [] upData = super.getData(false);
+    try {
+      // if data in ZK is null, use default of on.
+      return upData == null || parseFrom(upData).getNormalizerOn();
+    } catch (DeserializationException dex) {
+      // Log the cause too, so the reason the znode content is unreadable is not lost.
+      LOG.error("ZK state for RegionNormalizer could not be parsed "
+          + Bytes.toStringBinary(upData), dex);
+      // return false to be safe.
+      return false;
+    }
+  }
+
+  /**
+   * Set region normalizer on/off.
+   * Overwrites the znode's data, creating (and watching) the znode if it does not exist yet,
+   * then notifies this tracker through nodeDataChanged.
+   *
+   * @param normalizerOn whether normalizer should be on or off
+   * @throws KeeperException if the ZooKeeper write or create fails
+   */
+  public void setNormalizerOn(boolean normalizerOn) throws KeeperException {
+    byte [] upData = toByteArray(normalizerOn);
+    try {
+      ZKUtil.setData(watcher, watcher.znodePaths.regionNormalizerZNode, upData);
+    } catch(KeeperException.NoNodeException nne) {
+      ZKUtil.createAndWatch(watcher, watcher.znodePaths.regionNormalizerZNode, upData);
+    }
+    super.nodeDataChanged(watcher.znodePaths.regionNormalizerZNode);
+  }
+
+  /**
+   * Serializes the switch as a RegionNormalizerState protobuf with the PB magic prefix.
+   */
+  private byte [] toByteArray(boolean isNormalizerOn) {
+    RegionNormalizerProtos.RegionNormalizerState.Builder builder =
+        RegionNormalizerProtos.RegionNormalizerState.newBuilder();
+    builder.setNormalizerOn(isNormalizerOn);
+    return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+  }
+
+  /**
+   * Parses bytes produced by {@link #toByteArray(boolean)}: checks the PB magic prefix,
+   * then merges the remaining bytes into a RegionNormalizerState.
+   *
+   * @throws DeserializationException if the prefix check fails or the protobuf cannot be read
+   */
+  private RegionNormalizerProtos.RegionNormalizerState parseFrom(byte [] pbBytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(pbBytes);
+    RegionNormalizerProtos.RegionNormalizerState.Builder builder =
+        RegionNormalizerProtos.RegionNormalizerState.newBuilder();
+    try {
+      int magicLen = ProtobufUtil.lengthOfPBMagic();
+      ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+    return builder.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
new file mode 100644
index 0000000..4150f54
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * You may add the jaas.conf option
+ * -Djava.security.auth.login.config=/PATH/jaas.conf
+ *
+ * You may also specify -D to set options
+ * "hbase.zookeeper.quorum" (it should be in hbase-site.xml)
+ * "zookeeper.znode.parent" (it should be in hbase-site.xml)
+ *
+ * Use -set-acls to set the ACLs, no option to erase ACLs
+ */
+@InterfaceAudience.Private
+public class ZKAclReset extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(ZKAclReset.class);
+
+ private static void resetAcls(final ZKWatcher zkw, final String znode,
+ final boolean eraseAcls) throws Exception {
+ List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
+ if (children != null) {
+ for (String child: children) {
+ resetAcls(zkw, ZNodePaths.joinZNode(znode, child), eraseAcls);
+ }
+ }
+
+ ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
+ if (eraseAcls) {
+ LOG.info(" - erase ACLs for " + znode);
+ zk.setACL(znode, ZooDefs.Ids.OPEN_ACL_UNSAFE, -1);
+ } else {
+ LOG.info(" - set ACLs for " + znode);
+ zk.setACL(znode, ZKUtil.createACL(zkw, znode, true), -1);
+ }
+ }
+
+ private static void resetAcls(final Configuration conf, boolean eraseAcls)
+ throws Exception {
+ ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null);
+ try {
+ LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " +
+ zkw.getQuorum() + " " + zkw.znodePaths.baseZNode);
+ resetAcls(zkw, zkw.znodePaths.baseZNode, eraseAcls);
+ } finally {
+ zkw.close();
+ }
+ }
+
+ private void printUsageAndExit() {
+ System.err.printf("Usage: hbase %s [options]%n", getClass().getName());
+ System.err.println(" where [options] are:");
+ System.err.println(" -h|-help Show this help and exit.");
+ System.err.println(" -set-acls Setup the hbase znode ACLs for a secure cluster");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To reset the ACLs to the unsecure cluster behavior:");
+ System.err.println(" hbase " + getClass().getName());
+ System.err.println();
+ System.err.println(" To reset the ACLs to the secure cluster behavior:");
+ System.err.println(" hbase " + getClass().getName() + " -set-acls");
+ System.exit(1);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ boolean eraseAcls = true;
+
+ for (int i = 0; i < args.length; ++i) {
+ if (args[i].equals("-help")) {
+ printUsageAndExit();
+ } else if (args[i].equals("-set-acls")) {
+ eraseAcls = false;
+ } else {
+ printUsageAndExit();
+ }
+ }
+
+ resetAcls(getConf(), eraseAcls);
+ return(0);
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(HBaseConfiguration.create(), new ZKAclReset(), args));
+ }
+}