You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC
svn commit: r1449950 [25/35] - in /hbase/trunk: ./ hbase-client/
hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/
hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/
hbase-client/src/main/java/org/apache/hadoop/ h...
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,672 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests; they
+ * can be reissued with the same results.
+ * (Although the delete may throw a NoNodeException on reissue, its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling: application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * <p>
+ * Non-empty data written through this class (setData, create, and the
+ * create/setData ops of multi) is prefixed with a metadata header identifying
+ * this process; getData strips that header again before returning.
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance; replaced by reconnectAfterExpiration()
+  volatile private ZooKeeper zk;
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  private final String identifier;
+  // byte form of identifier, embedded in the metadata header of every write
+  private final byte[] id;
+  // connection parameters, kept so reconnectAfterExpiration() can build a
+  // fresh ZooKeeper client
+  private final Watcher watcher;
+  private final int sessionTimeout;
+  private final String quorumServers;
+  // source of the random salt written after the id in each metadata header
+  private final Random salter;
+
+  // The metadata attached to each piece of data has the
+  // format:
+  // <magic> 1-byte constant
+  // <id length> 4-byte big-endian integer (length of next field)
+  // <id> identifier bytes of this process followed by a random long salt
+  // It is prepended to the data supplied by the user.
+
+  // the magic number is to be backward compatible
+  private static final byte MAGIC = (byte) 0XFF;
+  private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
+  private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
+  private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
+
+  /**
+   * Same as the six-argument constructor with a null identifier, i.e. the
+   * process identifier defaults to the JVM runtime name.
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+      throws IOException {
+    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
+        null);
+  }
+
+  /**
+   * Creates the underlying ZooKeeper client immediately.
+   * @param quorumServers connect string handed to ZooKeeper
+   * @param sessionTimeout session timeout handed to ZooKeeper
+   * @param watcher default watcher handed to ZooKeeper
+   * @param maxRetries retry limit, passed to RetryCounterFactory
+   * @param retryIntervalMillis retry interval, passed to RetryCounterFactory
+   * @param identifier identifier of this process; when null or empty the
+   *     RuntimeMXBean name (processID@hostName) is used
+   * @throws IOException if the ZooKeeper client cannot be created
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
+      throws IOException {
+    this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+    this.retryCounterFactory =
+        new RetryCounterFactory(maxRetries, retryIntervalMillis);
+
+    if (identifier == null || identifier.length() == 0) {
+      // the identifier = processID@hostName
+      identifier = ManagementFactory.getRuntimeMXBean().getName();
+    }
+    LOG.info("The identifier of this process is " + identifier);
+    this.identifier = identifier;
+    this.id = Bytes.toBytes(identifier);
+
+    this.watcher = watcher;
+    this.sessionTimeout = sessionTimeout;
+    this.quorumServers = quorumServers;
+    salter = new SecureRandom();
+  }
+
+  /**
+   * Closes the current ZooKeeper client and replaces it with a new one built
+   * from the connection parameters given at construction time.
+   */
+  public void reconnectAfterExpiration()
+      throws IOException, InterruptedException {
+    LOG.info("Closing dead ZooKeeper connection, session" +
+        " was: 0x"+Long.toHexString(zk.getSessionId()));
+    zk.close();
+    this.zk = new ZooKeeper(this.quorumServers,
+        this.sessionTimeout, this.watcher);
+    LOG.info("Recreated a ZooKeeper, session" +
+        " is: 0x"+Long.toHexString(zk.getSessionId()));
+  }
+
+  /**
+   * delete is an idempotent operation. Retry before throwing exception.
+   * A NoNodeException on the first attempt is rethrown to the caller. One
+   * seen on a retry is swallowed, because an earlier attempt whose reply was
+   * lost may already have removed the node.
+   */
+  public void delete(String path, int version)
+      throws InterruptedException, KeeperException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NONODE:
+            if (isRetry) {
+              LOG.info("Node " + path + " already deleted. Assuming a " +
+                  "previous attempt succeeded.");
+              return;
+            }
+            LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
+            throw e;
+
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "delete");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception
+   * @return A Stat instance
+   */
+  public Stat exists(String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "exists");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * exists is an idempotent operation. Retry before throwing exception
+   * @return A Stat instance
+   */
+  public Stat exists(String path, boolean watch)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "exists");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Logs a possibly transient failure and rethrows it once the retry budget
+   * is exhausted; otherwise returns so the caller can sleep and retry.
+   */
+  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
+      String opName) throws KeeperException {
+    LOG.warn("Possibly transient ZooKeeper exception: " + e);
+    if (!retryCounter.shouldRetry()) {
+      LOG.error("ZooKeeper " + opName + " failed after "
+          + retryCounter.getMaxRetries() + " retries");
+      throw e;
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception
+   * @return List of children znodes
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getChildren");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getChildren is an idempotent operation. Retry before throwing exception
+   * @return List of children znodes
+   */
+  public List<String> getChildren(String path, boolean watch)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getChildren");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception
+   * @return Data, with this class's metadata header removed
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watcher, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getData");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * getData is an idempotent operation. Retry before throwing exception
+   * @return Data, with this class's metadata header removed
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        byte[] revData = zk.getData(path, watch, stat);
+        return this.removeMetaData(revData);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "getData");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
+   * Adding an identifier field into the data to check whether
+   * badversion is caused by the result of previous correctly setData
+   * @return Stat instance
+   */
+  public Stat setData(String path, byte[] data, int version)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    // Computed once so every retry writes the same salted payload.
+    byte[] newData = appendMetaData(data);
+    boolean isRetry = false;
+    while (true) {
+      try {
+        return zk.setData(path, newData, version);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "setData");
+            break;
+          case BADVERSION:
+            if (isRetry) {
+              // A previous attempt may have been applied before its reply was
+              // lost, bumping the version. If the stored bytes equal our
+              // salted payload the write is ours. Any KeeperException from
+              // this check propagates: ZK is not reliable at this moment.
+              // A null payload carries no identity, so it is never verified.
+              Stat stat = new Stat();
+              byte[] revData = zk.getData(path, false, stat);
+              if (newData != null && Bytes.equals(revData, newData)) {
+                // the bad version is caused by previous successful setData
+                return stat;
+              }
+            }
+            // intentional fall-through: throw unverified bad version exceptions
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  /**
+   * <p>
+   * NONSEQUENTIAL create is idempotent operation.
+   * Retry before throwing exceptions.
+   * A NodeExists exception seen on a retry is not thrown back to the
+   * application if the existing node holds exactly the data this call wrote.
+   * </p>
+   * <p>
+   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
+   * identifier to the path to verify, whether the previous one is successful
+   * or not.
+   * </p>
+   *
+   * @return Path
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    byte[] newData = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, newData, acl, createMode);
+
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, newData, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " +
+            createMode);
+    }
+  }
+
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // If the connection was lost, there is still a possibility that
+              // we have successfully created the node at our previous attempt,
+              // so we read the node and compare (null-safe: data may be null).
+              byte[] currentData = zk.getData(path, false, null);
+              if (currentData != null && Bytes.equals(currentData, data)) {
+                // We successfully created a non-sequential node
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with " +
+                  Bytes.toStringBinary(currentData) + ", could not write " +
+                  Bytes.toStringBinary(data));
+              throw e;
+            }
+            LOG.info("Node " + path + " already exists and this is not a " +
+                "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    // The identifier in the node name lets a retry find our earlier node.
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Convert the caller's ZooKeeper {@link Op}s into the Op instances actually
+   * passed to multi, so that create and setData payloads get the same
+   * metadata header (appendMetaData) as the single-operation methods.
+   * Delete and check ops carry no data and are passed through unchanged.
+   * @return the prepared ops, or null if ops is null
+   * @throws UnsupportedOperationException for any other op type
+   */
+  private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+      throws UnsupportedOperationException {
+    if (ops == null) return null;
+
+    List<Op> preparedOps = new LinkedList<Op>();
+    for (Op op : ops) {
+      if (op.getType() == ZooDefs.OpCode.create) {
+        CreateRequest create = (CreateRequest)op.toRequestRecord();
+        preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
+            create.getAcl(), create.getFlags()));
+      } else if (op.getType() == ZooDefs.OpCode.delete
+          || op.getType() == ZooDefs.OpCode.check) {
+        // no data, so no need to appendMetaData
+        preparedOps.add(op);
+      } else if (op.getType() == ZooDefs.OpCode.setData) {
+        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
+        preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
+            setData.getVersion()));
+      } else {
+        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
+      }
+    }
+    return preparedOps;
+  }
+
+  /**
+   * Run multiple operations in a transactional manner. Retry before throwing exception
+   */
+  public List<OpResult> multi(Iterable<Op> ops)
+      throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    Iterable<Op> multiOps = prepareZKMulti(ops);
+    while (true) {
+      try {
+        return zk.multi(multiOps);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "multi");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+
+  /**
+   * Looks under the parent of path for an existing child whose name starts
+   * with the last path component.
+   * @return the full path of the first such child, or null if none exists
+   */
+  private String findPreviousSequentialNode(String path)
+      throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Strips the metadata header written by appendMetaData.
+   * Data that is null, empty, does not start with the magic byte, or whose
+   * header is truncated/inconsistent is treated as not written by this class
+   * and returned unchanged.
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if (data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if (magic != MAGIC) {
+      return data;
+    }
+    // Too short to even hold the id-length field.
+    if (data.length < MAGIC_SIZE + ID_LENGTH_SIZE) {
+      LOG.warn("Data starts with metadata magic but is too short; returning as-is");
+      return data;
+    }
+
+    int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
+    int maxIdLength = data.length - MAGIC_SIZE - ID_LENGTH_SIZE;
+    // Compare against the remaining length rather than summing offsets, so a
+    // huge idLength cannot overflow.
+    if (idLength < 0 || idLength > maxIdLength) {
+      LOG.warn("Data starts with metadata magic but has invalid id length "
+          + idLength + "; returning as-is");
+      return data;
+    }
+    int dataLength = maxIdLength - idLength;
+    int dataOffset = MAGIC_SIZE + ID_LENGTH_SIZE + idLength;
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+    return newData;
+  }
+
+  /**
+   * Prefixes data with MAGIC, the id length, this process's id and a fresh
+   * random salt. Null or empty data is returned unchanged.
+   */
+  private byte[] appendMetaData(byte[] data) {
+    if (data == null || data.length == 0) {
+      return data;
+    }
+    byte[] salt = Bytes.toBytes(salter.nextLong());
+    int idLength = id.length + salt.length;
+    byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, idLength);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+    return newData;
+  }
+
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  public States getState() {
+    return zk.getState();
+  }
+
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Asynchronous sync of the given path. The callback and context are
+   * forwarded to ZooKeeper#sync, which invokes cb with ctx on completion.
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    this.zk.sync(path, cb, ctx);
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<String>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+
+  public String getIdentifier() {
+    return identifier;
+  }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the root region server location node in zookeeper.
+ * Root region location is set by <code>RegionServerServices</code>.
+ * This class has a watcher on the root location and notices changes.
+ */
+@InterfaceAudience.Private
+public class RootRegionTracker extends ZooKeeperNodeTracker {
+  /**
+   * Creates a root region location tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher watcher whose rootServerZNode is tracked
+   * @param abortable passed through to {@link ZooKeeperNodeTracker}
+   */
+  public RootRegionTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+    super(watcher, watcher.rootServerZNode, abortable);
+  }
+
+  /**
+   * Checks if the root region location is available.
+   * @return true if root region location is available, false if not
+   */
+  public boolean isLocationAvailable() {
+    return super.getData(true) != null;
+  }
+
+  /**
+   * Gets the root region location, if available. Does not block. Sets a watcher.
+   * @return server name or null if we failed to get the data.
+   * @throws InterruptedException
+   */
+  public ServerName getRootRegionLocation() throws InterruptedException {
+    return parseServerNameOrNull(super.getData(true));
+  }
+
+  /**
+   * Gets the root region location, if available. Does not block. Does not set
+   * a watcher (in this regard it differs from {@link #getRootRegionLocation()}).
+   * @param zkw zookeeper watcher used to read the root server znode
+   * @return server name or null if we failed to get the data.
+   * @throws KeeperException on a zookeeper error, or (converted) when the
+   *     znode content cannot be deserialized
+   */
+  public static ServerName getRootRegionLocation(final ZooKeeperWatcher zkw)
+      throws KeeperException {
+    try {
+      return ServerName.parseFrom(ZKUtil.getData(zkw, zkw.rootServerZNode));
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * Gets the root region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting root region formatted as per
+   * {@link ServerName}, or null if none available
+   * @throws InterruptedException if interrupted while waiting
+   * @throws IllegalArgumentException if the base znode is not available,
+   *     which usually means 'zookeeper.znode.parent' is misconfigured
+   */
+  public ServerName waitRootRegionLocation(long timeout)
+      throws InterruptedException {
+    if (!checkIfBaseNodeAvailable()) {
+      String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
+          + "There could be a mismatch with the one configured in the master.";
+      LOG.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg);
+    }
+    return parseServerNameOrNull(super.blockUntilAvailable(timeout, true));
+  }
+
+  /**
+   * Sets the location of <code>-ROOT-</code> in ZooKeeper to the
+   * specified server address.
+   * @param zookeeper zookeeper reference
+   * @param location The server hosting <code>-ROOT-</code>
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setRootLocation(ZooKeeperWatcher zookeeper,
+      final ServerName location)
+      throws KeeperException {
+    LOG.info("Setting ROOT region location in ZooKeeper as " + location);
+    // Make the RootRegionServer pb and then get its bytes and save this as
+    // the znode content.
+    byte [] data = toByteArray(location);
+    try {
+      ZKUtil.createAndWatch(zookeeper, zookeeper.rootServerZNode, data);
+    } catch(KeeperException.NodeExistsException nee) {
+      // The znode is already there: overwrite its content instead.
+      LOG.debug("ROOT region location already existed, updated location");
+      ZKUtil.setData(zookeeper, zookeeper.rootServerZNode, data);
+    }
+  }
+
+  /**
+   * Build up the znode content.
+   * @param sn What to put into the znode.
+   * @return The content of the root-region-server znode
+   */
+  static byte [] toByteArray(final ServerName sn) {
+    // ZNode content is a pb message preceded by some pb magic.
+    HBaseProtos.ServerName pbsn =
+        HBaseProtos.ServerName.newBuilder().setHostName(sn.getHostname()).
+            setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
+    ZooKeeperProtos.RootRegionServer pbrsr =
+        ZooKeeperProtos.RootRegionServer.newBuilder().setServer(pbsn).build();
+    return ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
+  }
+
+  /**
+   * Deletes the location of <code>-ROOT-</code> in ZooKeeper.
+   * A missing znode is not an error.
+   * @param zookeeper zookeeper reference
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void deleteRootLocation(ZooKeeperWatcher zookeeper)
+      throws KeeperException {
+    LOG.info("Unsetting ROOT region location in ZooKeeper");
+    try {
+      // Just delete the node. Don't need any watches.
+      ZKUtil.deleteNode(zookeeper, zookeeper.rootServerZNode);
+    } catch(KeeperException.NoNodeException nne) {
+      // Has already been deleted; deliberately ignored.
+    }
+  }
+
+  /**
+   * Wait until the root region is available.
+   * @param zkw zookeeper watcher used to read the root server znode
+   * @param timeout maximum time to wait, passed to ZKUtil.blockUntilAvailable
+   * @return ServerName or null if we timed out.
+   * @throws InterruptedException
+   */
+  public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
+      final long timeout)
+      throws InterruptedException {
+    byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.rootServerZNode, timeout);
+    if (data == null) return null;
+    return parseServerNameOrNull(data);
+  }
+
+  /**
+   * Parses root-region-server znode content into a ServerName.
+   * @param data raw znode content, passed straight to ServerName.parseFrom
+   * @return the parsed ServerName, or null (after logging a warning) when the
+   *     content cannot be deserialized
+   */
+  private static ServerName parseServerNameOrNull(final byte [] data) {
+    try {
+      return ServerName.parseFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse", e);
+      return null;
+    }
+  }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,963 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+// We should not be importing EventType here, nor RegionTransition, etc. This class should
+// deal only with zk and bytes.
+
+/**
+ * Utility class for doing region assignment in ZooKeeper. This class extends
+ * stuff done in {@link ZKUtil} to cover specific assignment operations.
+ * <p>
+ * Contains only static methods and constants.
+ * <p>
+ * Used by both the Master and RegionServer.
+ * <p>
+ * All valid transitions outlined below:
+ * <p>
+ * <b>MASTER</b>
+ * <ol>
+ * <li>
+ * Master creates an unassigned node as OFFLINE.
+ * - Cluster startup and table enabling.
+ * </li>
+ * <li>
+ * Master forces an existing unassigned node to OFFLINE.
+ * - RegionServer failure.
+ * - Allows transitions from all states to OFFLINE.
+ * </li>
+ * <li>
+ * Master deletes an unassigned node that was in an OPENED state.
+ * - Normal region transitions. Besides cluster startup, no other deletions
+ * of unassigned nodes are allowed.
+ * </li>
+ * <li>
+ * Master deletes all unassigned nodes regardless of state.
+ * - Cluster startup before any assignment happens.
+ * </li>
+ * </ol>
+ * <p>
+ * <b>REGIONSERVER</b>
+ * <ol>
+ * <li>
+ * RegionServer creates an unassigned node as CLOSING.
+ * - All region closes will do this in response to a CLOSE RPC from Master.
+ * - A node can never be transitioned to CLOSING, only created.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from CLOSING to CLOSED.
+ * - Normal region closes. CAS operation.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from OFFLINE to OPENING.
+ * - All region opens will do this in response to an OPEN RPC from the Master.
+ * - Normal region opens. CAS operation.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from OPENING to OPENED.
+ * - Normal region opens. CAS operation.
+ * </li>
+ * </ol>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKAssign {
+ private static final Log LOG = LogFactory.getLog(ZKAssign.class);
+
+ /**
+ * Gets the full path node name for the unassigned node for the specified
+ * region.
+ * @param zkw zk reference
+ * @param regionName region name
+ * @return full path node name
+ */
+ public static String getNodeName(ZooKeeperWatcher zkw, String regionName) {
+ return ZKUtil.joinZNode(zkw.assignmentZNode, regionName);
+ }
+
+ /**
+ * Gets the region name from the full path node name of an unassigned node.
+ * @param path full zk path
+ * @return region name
+ */
+ public static String getRegionName(ZooKeeperWatcher zkw, String path) {
+ return path.substring(zkw.assignmentZNode.length()+1);
+ }
+
+ // Master methods
+
+ /**
+ * Creates a new unassigned node in the OFFLINE state for the specified region.
+ *
+ * <p>Does not transition nodes from other states. If a node already exists
+ * for this region, a {@link NodeExistsException} will be thrown.
+ *
+ * <p>Sets a watcher on the unassigned region node if the method is successful.
+ *
+ * <p>This method should only be used during cluster startup and the enabling
+ * of a table.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as offline
+ * @param serverName server transition will happen on
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void createNodeOffline(ZooKeeperWatcher zkw, HRegionInfo region,
+ ServerName serverName)
+ throws KeeperException, KeeperException.NodeExistsException {
+ createNodeOffline(zkw, region, serverName, EventType.M_ZK_REGION_OFFLINE);
+ }
+
+ public static void createNodeOffline(ZooKeeperWatcher zkw, HRegionInfo region,
+ ServerName serverName, final EventType event)
+ throws KeeperException, KeeperException.NodeExistsException {
+ LOG.debug(zkw.prefix("Creating unassigned node for " +
+ region.getEncodedName() + " in OFFLINE state"));
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(event, region.getRegionName(), serverName);
+ String node = getNodeName(zkw, region.getEncodedName());
+ ZKUtil.createAndWatch(zkw, node, rt.toByteArray());
+ }
+
+ /**
+ * Creates an unassigned node in the OFFLINE state for the specified region.
+ * <p>
+ * Runs asynchronously. Depends on no pre-existing znode.
+ *
+ * <p>Sets a watcher on the unassigned region node.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as offline
+ * @param serverName server transition will happen on
+ * @param cb
+ * @param ctx
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void asyncCreateNodeOffline(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName,
+ final AsyncCallback.StringCallback cb, final Object ctx)
+ throws KeeperException {
+ LOG.debug(zkw.prefix("Async create of unassigned node for " +
+ region.getEncodedName() + " with OFFLINE state"));
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(
+ EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
+ String node = getNodeName(zkw, region.getEncodedName());
+ ZKUtil.asyncCreate(zkw, node, rt.toByteArray(), cb, ctx);
+ }
+
+ /**
+ * Creates or force updates an unassigned node to the OFFLINE state for the
+ * specified region.
+ * <p>
+ * Attempts to create the node but if it exists will force it to transition to
+ * and OFFLINE state.
+ *
+ * <p>Sets a watcher on the unassigned region node if the method is
+ * successful.
+ *
+ * <p>This method should be used when assigning a region.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as offline
+ * @param serverName server transition will happen on
+ * @return the version of the znode created in OFFLINE state, -1 if
+ * unsuccessful.
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName) throws KeeperException {
+ LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
+ region.getEncodedName() + " with OFFLINE state"));
+ RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE,
+ region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
+ byte [] data = rt.toByteArray();
+ String node = getNodeName(zkw, region.getEncodedName());
+ zkw.sync(node);
+ int version = ZKUtil.checkExists(zkw, node);
+ if (version == -1) {
+ return ZKUtil.createAndWatch(zkw, node, data);
+ } else {
+ boolean setData = false;
+ try {
+ setData = ZKUtil.setData(zkw, node, data, version);
+ // Setdata throws KeeperException which aborts the Master. So we are
+ // catching it here.
+ // If just before setting the znode to OFFLINE if the RS has made any
+ // change to the
+ // znode state then we need to return -1.
+ } catch (KeeperException kpe) {
+ LOG.info("Version mismatch while setting the node to OFFLINE state.");
+ return -1;
+ }
+ if (!setData) {
+ return -1;
+ } else {
+ // We successfully forced to OFFLINE, reset watch and handle if
+ // the state changed in between our set and the watch
+ byte [] bytes = ZKAssign.getData(zkw, region.getEncodedName());
+ rt = getRegionTransition(bytes);
+ if (rt.getEventType() != EventType.M_ZK_REGION_OFFLINE) {
+ // state changed, need to process
+ return -1;
+ }
+ }
+ }
+ return version + 1;
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the OPENED state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used during normal region transitions when a region
+ * finishes successfully opening. This is the Master acknowledging completion
+ * of the specified regions transition.
+ *
+ * @param zkw zk reference
+ * @param encodedRegionName opened region to be deleted from zk
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteOpenedNode(ZooKeeperWatcher zkw,
+ String encodedRegionName)
+ throws KeeperException, KeeperException.NoNodeException {
+ return deleteNode(zkw, encodedRegionName, EventType.RS_ZK_REGION_OPENED);
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the OFFLINE state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used during master failover when the regions on an RS
+ * that has died are all set to OFFLINE before being processed.
+ *
+ * @param zkw zk reference
+ * @param encodedRegionName closed region to be deleted from zk
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteOfflineNode(ZooKeeperWatcher zkw,
+ String encodedRegionName)
+ throws KeeperException, KeeperException.NoNodeException {
+ return deleteNode(zkw, encodedRegionName, EventType.M_ZK_REGION_OFFLINE);
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the CLOSED state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used during table disables when a region finishes
+ * successfully closing. This is the Master acknowledging completion
+ * of the specified regions transition to being closed.
+ *
+ * @param zkw zk reference
+ * @param encodedRegionName closed region to be deleted from zk
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteClosedNode(ZooKeeperWatcher zkw,
+ String encodedRegionName)
+ throws KeeperException, KeeperException.NoNodeException {
+ return deleteNode(zkw, encodedRegionName, EventType.RS_ZK_REGION_CLOSED);
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the CLOSING state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used during table disables when a region finishes
+ * successfully closing. This is the Master acknowledging completion
+ * of the specified regions transition to being closed.
+ *
+ * @param zkw zk reference
+ * @param region closing region to be deleted from zk
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteClosingNode(ZooKeeperWatcher zkw,
+ HRegionInfo region)
+ throws KeeperException, KeeperException.NoNodeException {
+ String encodedRegionName = region.getEncodedName();
+ return deleteNode(zkw, encodedRegionName, EventType.M_ZK_REGION_CLOSING);
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the specified state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used when a region finishes opening/closing.
+ * The Master acknowledges completion
+ * of the specified regions transition to being closed/opened.
+ *
+ * @param zkw zk reference
+ * @param encodedRegionName region to be deleted from zk
+ * @param expectedState state region must be in for delete to complete
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
+ EventType expectedState)
+ throws KeeperException, KeeperException.NoNodeException {
+ return deleteNode(zkw, encodedRegionName, expectedState, -1);
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the specified state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used when a region finishes opening/closing.
+ * The Master acknowledges completion
+ * of the specified regions transition to being closed/opened.
+ *
+ * @param zkw zk reference
+ * @param encodedRegionName region to be deleted from zk
+ * @param expectedState state region must be in for delete to complete
+ * @param expectedVersion of the znode that is to be deleted.
+ * If expectedVersion need not be compared while deleting the znode
+ * pass -1
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
+ EventType expectedState, int expectedVersion)
+ throws KeeperException, KeeperException.NoNodeException {
+ LOG.debug(zkw.prefix("Deleting existing unassigned " +
+ "node for " + encodedRegionName + " that is in expected state " + expectedState));
+ String node = getNodeName(zkw, encodedRegionName);
+ zkw.sync(node);
+ Stat stat = new Stat();
+ byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
+ if (bytes == null) {
+ // If it came back null, node does not exist.
+ throw KeeperException.create(Code.NONODE);
+ }
+ RegionTransition rt = getRegionTransition(bytes);
+ EventType et = rt.getEventType();
+ if (!et.equals(expectedState)) {
+ LOG.warn(zkw.prefix("Attempting to delete unassigned node " + encodedRegionName + " in " +
+ expectedState + " state but node is in " + et + " state"));
+ return false;
+ }
+ if (expectedVersion != -1
+ && stat.getVersion() != expectedVersion) {
+ LOG.warn("The node " + encodedRegionName + " we are trying to delete is not" +
+ " the expected one. Got a version mismatch");
+ return false;
+ }
+ if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) {
+ LOG.warn(zkw.prefix("Attempting to delete " +
+ "unassigned node " + encodedRegionName + " in " + expectedState +
+ " state but after verifying state, we got a version mismatch"));
+ return false;
+ }
+ LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " +
+ encodedRegionName + " in expected state " + expectedState));
+ return true;
+ }
+
+ /**
+ * Deletes all unassigned nodes regardless of their state.
+ *
+ * <p>No watchers are set.
+ *
+ * <p>This method is used by the Master during cluster startup to clear out
+ * any existing state from other cluster runs.
+ *
+ * @param zkw zk reference
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void deleteAllNodes(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ LOG.debug(zkw.prefix("Deleting any existing unassigned nodes"));
+ ZKUtil.deleteChildrenRecursively(zkw, zkw.assignmentZNode);
+ }
+
+ /**
+ * Creates a new unassigned node in the CLOSING state for the specified
+ * region.
+ *
+ * <p>Does not transition nodes from any states. If a node already exists
+ * for this region, a {@link NodeExistsException} will be thrown.
+ *
+ * <p>If creation is successful, returns the version number of the CLOSING
+ * node created.
+ *
+ * <p>Set a watch.
+ *
+ * <p>This method should only be used by a Master when initiating a
+ * close of a region before sending a close request to the region server.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as closing
+ * @param serverName server transition will happen on
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static int createNodeClosing(ZooKeeperWatcher zkw, HRegionInfo region,
+ ServerName serverName)
+ throws KeeperException, KeeperException.NodeExistsException {
+ LOG.debug(zkw.prefix("Creating unassigned node for " +
+ region.getEncodedName() + " in a CLOSING state"));
+ RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING,
+ region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
+ String node = getNodeName(zkw, region.getEncodedName());
+ return ZKUtil.createAndWatch(zkw, node, rt.toByteArray());
+ }
+
+ // RegionServer methods
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the CLOSING state to be in the CLOSED state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns -1. If the transition
+ * is successful, the version of the node after transition is returned.
+ *
+ * <p>This method can fail and return false for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in CLOSING state</li>
+ * <li>After verifying CLOSING state, update fails because of wrong version
+ * (someone else already transitioned the node)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when initiating a
+ * close of a region after receiving a CLOSE RPC from the Master.
+ *
+ * @param zkw zk reference
+ * @param region region to be transitioned to closed
+ * @param serverName server transition happens on
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int transitionNodeClosed(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName, int expectedVersion)
+ throws KeeperException {
+ return transitionNode(zkw, region, serverName,
+ EventType.M_ZK_REGION_CLOSING,
+ EventType.RS_ZK_REGION_CLOSED, expectedVersion);
+ }
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the OFFLINE state to be in the OPENING state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns -1. If the transition
+ * is successful, the version of the node written as OPENING is returned.
+ *
+ * <p>This method can fail and return -1 for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in OFFLINE state</li>
+ * <li>After verifying OFFLINE state, update fails because of wrong version
+ * (someone else already transitioned the node)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when initiating an
+ * open of a region after receiving an OPEN RPC from the Master.
+ *
+ * @param zkw zk reference
+ * @param region region to be transitioned to opening
+ * @param serverName server transition happens on
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int transitionNodeOpening(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName)
+ throws KeeperException {
+ return transitionNodeOpening(zkw, region, serverName,
+ EventType.M_ZK_REGION_OFFLINE);
+ }
+
+ public static int transitionNodeOpening(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName, final EventType beginState)
+ throws KeeperException {
+ return transitionNode(zkw, region, serverName, beginState,
+ EventType.RS_ZK_REGION_OPENING, -1);
+ }
+
+ /**
+ * Retransitions an existing unassigned node for the specified region which is
+ * currently in the OPENING state to be in the OPENING state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns -1. If the transition
+ * is successful, the version of the node rewritten as OPENING is returned.
+ *
+ * <p>This method can fail and return -1 for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in OPENING state</li>
+ * <li>After verifying OPENING state, update fails because of wrong version
+ * (someone else already transitioned the node)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when initiating an
+ * open of a region after receiving an OPEN RPC from the Master.
+ *
+ * @param zkw zk reference
+ * @param region region to be transitioned to opening
+ * @param serverName server transition happens on
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int retransitionNodeOpening(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName, int expectedVersion)
+ throws KeeperException {
+ return transitionNode(zkw, region, serverName,
+ EventType.RS_ZK_REGION_OPENING,
+ EventType.RS_ZK_REGION_OPENING, expectedVersion);
+ }
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the OPENING state to be in the OPENED state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns -1. If the transition
+ * is successful, the version of the node after transition is returned.
+ *
+ * <p>This method can fail and return false for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in OPENING state</li>
+ * <li>After verifying OPENING state, update fails because of wrong version
+ * (this should never actually happen since an RS only does this transition
+ * following a transition to OPENING. if two RS are conflicting, one would
+ * fail the original transition to OPENING and not this transition)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when completing the
+ * open of a region.
+ *
+ * @param zkw zk reference
+ * @param region region to be transitioned to opened
+ * @param serverName server transition happens on
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int transitionNodeOpened(ZooKeeperWatcher zkw,
+ HRegionInfo region, ServerName serverName, int expectedVersion)
+ throws KeeperException {
+ return transitionNode(zkw, region, serverName,
+ EventType.RS_ZK_REGION_OPENING,
+ EventType.RS_ZK_REGION_OPENED, expectedVersion);
+ }
+
+ /**
+ *
+ * @param zkw zk reference
+ * @param region region to be closed
+ * @param expectedVersion expected version of the znode
+ * @return true if the znode exists, has the right version and the right state. False otherwise.
+ * @throws KeeperException
+ */
+ public static boolean checkClosingState(ZooKeeperWatcher zkw, HRegionInfo region,
+ int expectedVersion) throws KeeperException {
+
+ final String encoded = getNodeName(zkw, region.getEncodedName());
+ zkw.sync(encoded);
+
+ // Read existing data of the node
+ Stat stat = new Stat();
+ byte[] existingBytes = ZKUtil.getDataNoWatch(zkw, encoded, stat);
+
+ if (existingBytes == null) {
+ LOG.warn(zkw.prefix("Attempt to check the " +
+ "closing node for " + encoded +
+ ". The node does not exist"));
+ return false;
+ }
+
+ if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
+ LOG.warn(zkw.prefix("Attempt to check the " +
+ "closing node for " + encoded +
+ ". The node existed but was version " + stat.getVersion() +
+ " not the expected version " + expectedVersion));
+ return false;
+ }
+
+ RegionTransition rt = getRegionTransition(existingBytes);
+
+ if (!EventType.M_ZK_REGION_CLOSING.equals(rt.getEventType())) {
+ LOG.warn(zkw.prefix("Attempt to check the " +
+ "closing node for " + encoded +
+ ". The node existed but was in an unexpected state: " + rt.getEventType()));
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Method that actually performs unassigned node transitions.
+ *
+ * <p>Attempts to transition the unassigned node for the specified region
+ * from the expected state to the state in the specified transition data.
+ *
+ * <p>Method first reads existing data and verifies it is in the expected
+ * state. If the node does not exist or the node is not in the expected
+ * state, the method returns -1. If the transition is successful, the
+ * version number of the node following the transition is returned.
+ *
+ * <p>If the read state is what is expected, it attempts to write the new
+ * state and data into the node. When doing this, it includes the expected
+ * version (determined when the existing state was verified) to ensure that
+ * only one transition is successful. If there is a version mismatch, the
+ * method returns -1.
+ *
+ * <p>If the write is successful, no watch is set and the method returns true.
+ *
+ * @param zkw zk reference
+ * @param region region to be transitioned to opened
+ * @param serverName server transition happens on
+ * @param endState state to transition node to if all checks pass
+ * @param beginState state the node must currently be in to do transition
+ * @param expectedVersion expected version of data before modification, or -1
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
+ ServerName serverName, EventType beginState, EventType endState,
+ int expectedVersion)
+ throws KeeperException {
+ return transitionNode(zkw, region, serverName, beginState, endState, expectedVersion, null);
+ }
+
+ public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
+ ServerName serverName, EventType beginState, EventType endState,
+ int expectedVersion, final byte [] payload)
+ throws KeeperException {
+ String encoded = region.getEncodedName();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(zkw.prefix("Attempting to transition node " +
+ HRegionInfo.prettyPrint(encoded) +
+ " from " + beginState.toString() + " to " + endState.toString()));
+ }
+
+ String node = getNodeName(zkw, encoded);
+ zkw.sync(node);
+
+ // Read existing data of the node
+ Stat stat = new Stat();
+ byte [] existingBytes = ZKUtil.getDataNoWatch(zkw, node, stat);
+ if (existingBytes == null) {
+ // Node no longer exists. Return -1. It means unsuccessful transition.
+ return -1;
+ }
+ RegionTransition rt = getRegionTransition(existingBytes);
+
+ // Verify it is the expected version
+ if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
+ LOG.warn(zkw.prefix("Attempt to transition the " +
+ "unassigned node for " + encoded +
+ " from " + beginState + " to " + endState + " failed, " +
+ "the node existed but was version " + stat.getVersion() +
+ " not the expected version " + expectedVersion));
+ return -1;
+ } else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE)
+ && endState.equals(EventType.RS_ZK_REGION_OPENING)
+ && expectedVersion == -1 && stat.getVersion() != 0) {
+ // the below check ensures that double assignment doesnot happen.
+ // When the node is created for the first time then the expected version
+ // that is passed will be -1 and the version in znode will be 0.
+ // In all other cases the version in znode will be > 0.
+ LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for "
+ + encoded + " from " + beginState + " to " + endState + " failed, "
+ + "the node existed but was version " + stat.getVersion()
+ + " not the expected version " + expectedVersion));
+ return -1;
+ }
+
+ // Verify it is in expected state
+ EventType et = rt.getEventType();
+ if (!et.equals(beginState)) {
+ String existingServer = (rt.getServerName() == null)
+ ? "<unknown>" : rt.getServerName().toString();
+ LOG.warn(zkw.prefix("Attempt to transition the unassigned node for " + encoded
+ + " from " + beginState + " to " + endState + " failed, the node existed but"
+ + " was in the state " + et + " set by the server " + existingServer));
+ return -1;
+ }
+
+ // Write new data, ensuring data has not changed since we last read it
+ try {
+ rt = RegionTransition.createRegionTransition(
+ endState, region.getRegionName(), serverName, payload);
+ if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
+ LOG.warn(zkw.prefix("Attempt to transition the " +
+ "unassigned node for " + encoded +
+ " from " + beginState + " to " + endState + " failed, " +
+ "the node existed and was in the expected state but then when " +
+ "setting data we got a version mismatch"));
+ return -1;
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(zkw.prefix("Successfully transitioned node " + encoded +
+ " from " + beginState + " to " + endState));
+ }
+ return stat.getVersion() + 1;
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn(zkw.prefix("Attempt to transition the " +
+ "unassigned node for " + encoded +
+ " from " + beginState + " to " + endState + " failed, " +
+ "the node existed and was in the expected state but then when " +
+ "setting data it no longer existed"));
+ return -1;
+ }
+ }
+
+ private static RegionTransition getRegionTransition(final byte [] bytes) throws KeeperException {
+ try {
+ return RegionTransition.parseFrom(bytes);
+ } catch (DeserializationException e) {
+ // Convert to a zk exception for now. Otherwise have to change API
+ throw ZKUtil.convert(e);
+ }
+ }
+
+  /**
+   * Reads the unassigned znode for a region and leaves a watch on it.
+   *
+   * <p>{@code pathOrRegionName} may be an absolute znode path (anything starting
+   * with {@code /}) or a region name, which is turned into a path by
+   * {@link #getPath(ZooKeeperWatcher, String)}.
+   *
+   * <p>Returns null if the region does not currently have a node. A watch is
+   * set on the node if the node exists.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @return znode content, or null when there is no node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getData(ZooKeeperWatcher zkw,
+      String pathOrRegionName)
+      throws KeeperException {
+    return ZKUtil.getDataAndWatch(zkw, getPath(zkw, pathOrRegionName));
+  }
+
+  /**
+   * Reads the unassigned znode for a region, leaves a watch on it, and fills
+   * in the caller-supplied {@link Stat} (for example, to learn the version).
+   *
+   * <p>{@code pathOrRegionName} may be an absolute znode path (anything starting
+   * with {@code /}) or a region name, which is turned into a path by
+   * {@link #getPath(ZooKeeperWatcher, String)}.
+   *
+   * <p>Returns null if the region does not currently have a node. A watch is
+   * set on the node if the node exists.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @param stat object to populate the version.
+   * @return znode content, or null when there is no node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataAndWatch(ZooKeeperWatcher zkw,
+      String pathOrRegionName, Stat stat)
+      throws KeeperException {
+    final String znode = getPath(zkw, pathOrRegionName);
+    final byte [] content = ZKUtil.getDataAndWatch(zkw, znode, stat);
+    return content;
+  }
+
+  /**
+   * Reads the unassigned znode for a region without leaving a watch, filling
+   * in the caller-supplied {@link Stat}.
+   *
+   * <p>{@code pathOrRegionName} may be an absolute znode path (anything starting
+   * with {@code /}) or a region name, which is turned into a path by
+   * {@link #getPath(ZooKeeperWatcher, String)}.
+   *
+   * <p>Returns null if the region does not currently have a node.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @param stat object to store node info into on getData call
+   * @return znode content, or null when there is no node
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataNoWatch(ZooKeeperWatcher zkw,
+      String pathOrRegionName, Stat stat)
+      throws KeeperException {
+    return ZKUtil.getDataNoWatch(zkw, getPath(zkw, pathOrRegionName), stat);
+  }
+
+  /**
+   * Resolves a region name or znode path to a znode path.
+   *
+   * @param zkw zk reference, used to build the node name when a region name is given
+   * @param pathOrRegionName either an absolute znode path (starts with {@code /}),
+   *   returned unchanged, or a region name passed to {@code getNodeName}
+   * @return Path to znode
+   */
+  public static String getPath(final ZooKeeperWatcher zkw, final String pathOrRegionName) {
+    if (pathOrRegionName.startsWith("/")) {
+      // Already a full path; nothing to resolve.
+      return pathOrRegionName;
+    }
+    return getNodeName(zkw, pathOrRegionName);
+  }
+
+  /**
+   * Looks up the version of a region's unassigned znode.
+   *
+   * @param zkw zk reference
+   * @param region region's info; its encoded name selects the znode
+   * @return the version of the znode, -1 if it doesn't exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int getVersion(ZooKeeperWatcher zkw, HRegionInfo region)
+      throws KeeperException {
+    return ZKUtil.checkExists(zkw, getNodeName(zkw, region.getEncodedName()));
+  }
+
+  /**
+   * Deletes a region's assignment node regardless of its current state.
+   * <p>
+   * Fails silently even if the node does not exist at all.
+   *
+   * @param watcher zk reference
+   * @param regionInfo region whose encoded name identifies the node to delete
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void deleteNodeFailSilent(ZooKeeperWatcher watcher,
+      HRegionInfo regionInfo)
+      throws KeeperException {
+    ZKUtil.deleteNodeFailSilent(watcher,
+        getNodeName(watcher, regionInfo.getEncodedName()));
+  }
+
+  /**
+   * Blocks until there are no nodes left under the assignment (regions in
+   * transition) znode, polling every 100 ms and logging the remaining children
+   * at debug level on each pass.
+   * <p>
+   * Used in testing only.
+   *
+   * @param zkw zk reference
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public static void blockUntilNoRIT(ZooKeeperWatcher zkw)
+      throws KeeperException, InterruptedException {
+    while (ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
+      List<String> children =
+          ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
+      // Iterating an empty list is a no-op, so only a null result needs guarding.
+      if (children != null) {
+        for (String child : children) {
+          LOG.debug("ZK RIT -> " + child);
+        }
+      }
+      Thread.sleep(100);
+    }
+  }
+
+ /**
+ * Blocks until there is at least one node in regions in transition.
+ * <p>
+ * Used in testing only.
+ * @param zkw zk reference
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public static void blockUntilRIT(ZooKeeperWatcher zkw)
+ throws KeeperException, InterruptedException {
+ while (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
+ List<String> znodes =
+ ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
+ if (znodes == null || znodes.isEmpty()) {
+ LOG.debug("No RIT in ZK");
+ }
+ Thread.sleep(100);
+ }
+ }
+
+  /**
+   * Renders bytes presumed to be a serialized {@link RegionTransition}.
+   *
+   * @param znodeBytes raw znode content
+   * @return the transition's string form, or an empty string if the bytes do
+   *   not deserialize
+   */
+  static String toString(final byte[] znodeBytes) {
+    // This method should not exist. Used by ZKUtil stringifying RegionTransition. Have the
+    // method in here so RegionTransition does not leak into ZKUtil.
+    RegionTransition transition;
+    try {
+      transition = RegionTransition.parseFrom(znodeBytes);
+    } catch (DeserializationException ignored) {
+      // Best-effort stringification for diagnostics: unparseable data renders as "".
+      return "";
+    }
+    return transition.toString();
+  }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Publishes and synchronizes a unique identifier specific to a given HBase
+ * cluster. The stored identifier is read from the file system by the active
+ * master on startup, and is subsequently available to all watchers (including
+ * clients).
+ */
+@InterfaceAudience.Private
+public class ZKClusterId {
+ private ZooKeeperWatcher watcher;
+ private Abortable abortable;
+ private String id;
+
+ public ZKClusterId(ZooKeeperWatcher watcher, Abortable abortable) {
+ this.watcher = watcher;
+ this.abortable = abortable;
+ }
+
+ public boolean hasId() {
+ return getId() != null;
+ }
+
+ public String getId() {
+ try {
+ if (id == null) {
+ id = readClusterIdZNode(watcher);
+ }
+ } catch (KeeperException ke) {
+ abortable.abort("Unexpected exception from ZooKeeper reading cluster ID",
+ ke);
+ }
+ return id;
+ }
+
+ public static String readClusterIdZNode(ZooKeeperWatcher watcher)
+ throws KeeperException {
+ if (ZKUtil.checkExists(watcher, watcher.clusterIdZNode) != -1) {
+ byte [] data = ZKUtil.getData(watcher, watcher.clusterIdZNode);
+ if (data != null) {
+ try {
+ return ClusterId.parseFrom(data).toString();
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+ }
+ return null;
+ }
+
+ public static void setClusterId(ZooKeeperWatcher watcher, ClusterId id)
+ throws KeeperException {
+ ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray());
+ }
+}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Utility methods for reading and building the ZooKeeper configuration.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKConfig {
+  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+
+  private static final String VARIABLE_START = "${";
+  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+  private static final String VARIABLE_END = "}";
+  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Parses the corresponding config options from the HBase XML configs
+   * and generates the appropriate ZooKeeper properties.
+   *
+   * <p>If {@link HConstants#HBASE_CONFIG_READ_ZOOKEEPER_CONFIG} is true and a
+   * {@link HConstants#ZOOKEEPER_CONFIG_NAME} resource is on the classpath, that
+   * file is parsed instead (deprecated). If it cannot be read, the XML
+   * configuration is used.
+   *
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG,
+        false)) {
+      LOG.warn(
+          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
+          " file for ZK properties " +
+          "has been deprecated. Please instead place all ZK related HBase " +
+          "configuration under the hbase-site.xml, using prefixes " +
+          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
+          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+          "' to false");
+      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+      // it and grab its configuration properties.
+      ClassLoader cl = HQuorumPeer.class.getClassLoader();
+      final InputStream inputStream =
+          cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+      if (inputStream != null) {
+        try {
+          return parseZooCfg(conf, inputStream);
+        } catch (IOException e) {
+          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+              ", loading from XML files", e);
+        } finally {
+          // parseZooCfg leaves the stream open (Properties.load does not close
+          // it); we opened it here, so we are responsible for closing it.
+          closeQuietly(inputStream);
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Skipped reading ZK properties file '" +
+            HConstants.ZOOKEEPER_CONFIG_NAME +
+            "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+            "' was not set to true");
+      }
+    }
+
+    // Otherwise, use the configuration options from HBase's XML files.
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    for (Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+        String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+        String value = entry.getValue();
+        // If the value has variables substitutions, need to do a get.
+        if (value.contains(VARIABLE_START)) {
+          value = conf.get(key);
+        }
+        zkProperties.put(zkKey, value);
+      }
+    }
+
+    // If clientPort is not set, assign the default. Store it as a String:
+    // Properties.getProperty only returns String values, so a numeric default
+    // stored via put() would be invisible to getProperty callers.
+    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+      zkProperties.put(HConstants.CLIENT_PORT_STR,
+          String.valueOf(HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+        HConstants.LOCALHOST);
+    for (int i = 0; i < serverHosts.length; ++i) {
+      String serverHost = serverHosts[i];
+      String address = serverHost + ":" + peerPort + ":" + leaderPort;
+      String key = "server." + i;
+      zkProperties.put(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /** Closes a stream, logging (not propagating) any failure to close. */
+  private static void closeQuietly(InputStream in) {
+    try {
+      in.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close " + HConstants.ZOOKEEPER_CONFIG_NAME + " stream", e);
+    }
+  }
+
+  /**
+   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+   * Every {@code ${name}} reference in a value is replaced by the JVM system
+   * property {@code name}, or failing that by {@code conf.get(name)}; text
+   * outside the references is preserved. The stream is not closed.
+   * This method is used for testing so we can pass our own InputStream.
+   * @param conf HBaseConfiguration to use for injecting variables.
+   * @param inputStream InputStream to read from.
+   * @return Properties parsed from config stream with variables substituted.
+   * @throws IOException if anything goes wrong parsing config, a variable is
+   *   unterminated or undefined, or a server.X entry resolves to localhost in a
+   *   distributed setup
+   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
+   * availability.
+   */
+  @Deprecated
+  public static Properties parseZooCfg(Configuration conf,
+      InputStream inputStream) throws IOException {
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+    } catch (IOException e) {
+      final String msg = "fail to read properties from "
+          + HConstants.ZOOKEEPER_CONFIG_NAME;
+      LOG.fatal(msg, e);
+      throw new IOException(msg, e);
+    }
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String value = entry.getValue().toString().trim();
+      String key = entry.getKey().toString().trim();
+      String resolved = substituteVariables(conf, value);
+      // Special case for 'hbase.cluster.distributed' property being 'true'.
+      // Check the resolved value so a variable expanding to localhost is caught too.
+      if (key.startsWith("server.")) {
+        boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED,
+            HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
+        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED
+            && resolved.startsWith(HConstants.LOCALHOST)) {
+          String msg = "The server in zoo.cfg cannot be set to localhost " +
+              "in a fully-distributed setup because it won't be reachable. " +
+              "See \"Getting Started\" for more information.";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+      }
+      properties.setProperty(key, resolved);
+    }
+    return properties;
+  }
+
+  /**
+   * Expands every {@code ${name}} reference in {@code value}, preferring a JVM
+   * system property and falling back to {@code conf}. Literal text before,
+   * between and after references is copied through unchanged.
+   * @throws IOException if a reference has no end marker or names an undefined variable
+   */
+  private static String substituteVariables(Configuration conf, String value)
+      throws IOException {
+    StringBuilder newValue = new StringBuilder(value.length());
+    int copyFrom = 0;
+    int varStart = value.indexOf(VARIABLE_START);
+    while (varStart != -1) {
+      int varEnd = value.indexOf(VARIABLE_END, varStart);
+      if (varEnd == -1) {
+        String msg = "variable at " + varStart + " has no end marker";
+        LOG.fatal(msg);
+        throw new IOException(msg);
+      }
+      String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
+
+      String substituteValue = System.getProperty(variable);
+      if (substituteValue == null) {
+        substituteValue = conf.get(variable);
+      }
+      if (substituteValue == null) {
+        String msg = "variable " + variable + " not set in system property "
+            + "or hbase configs";
+        LOG.fatal(msg);
+        throw new IOException(msg);
+      }
+
+      // Keep the literal text between the previous reference (or the start) and this one.
+      newValue.append(value, copyFrom, varStart);
+      newValue.append(substituteValue);
+
+      copyFrom = varEnd + VARIABLE_END_LENGTH;
+      varStart = value.indexOf(VARIABLE_START, copyFrom);
+    }
+    newValue.append(value, copyFrom, value.length());
+    return newValue.toString();
+  }
+
+  /**
+   * Return the ZK Quorum servers string given zk properties returned by
+   * makeZKProps.
+   * @param properties ZooKeeper properties holding {@code clientPort} and
+   *   {@code server.X=host[:peerPort:leaderPort]} entries
+   * @return comma-separated {@code host:clientPort} list, or null if no
+   *   servers, no resolvable server, or no clientPort is configured
+   */
+  public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      } else if (key.startsWith("server.")) {
+        // Values normally look like host:peerPort:leaderPort; tolerate a bare host.
+        int colon = value.indexOf(':');
+        String host = (colon == -1) ? value : value.substring(0, colon);
+        servers.add(host);
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          InetAddress.getByName(host);
+          anyValid = true;
+        } catch (UnknownHostException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    // Checked first: with no servers at all, anyValid is necessarily false, so
+    // this more specific message would otherwise never be reached.
+    if (servers.isEmpty()) {
+      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
+          "HBase must have a ZooKeeper cluster configured for its " +
+          "operation. Ensure that you've configured '" +
+          HConstants.ZOOKEEPER_QUORUM + "' properly.");
+      return null;
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration.
+   * @param conf HBase configuration; converted with {@link #makeZKProps(Configuration)}
+   * @return Quorum servers string, or null if it cannot be built
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    return getZKQuorumServersString(makeZKProps(conf));
+  }
+}