You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC

svn commit: r1449950 [25/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,672 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A zookeeper that can handle 'recoverable' errors.
+ * To handle recoverable errors, developers need to realize that there are two
+ * classes of requests: idempotent and non-idempotent requests. Read requests
+ * and unconditional sets and deletes are examples of idempotent requests, they
+ * can be reissued with the same results.
+ * (Although, the delete may throw a NoNodeException on reissue its effect on
+ * the ZooKeeper state is the same.) Non-idempotent requests need special
+ * handling, application and library writers need to keep in mind that they may
+ * need to encode information in the data or name of znodes to detect
+ * retries. A simple example is a create that uses a sequence flag.
+ * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
+ * loss exception, that process will reissue another
+ * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
+ * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
+ * that x-109 was the result of the previous create, so the process actually
+ * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
+ * when doing the create. If the process is using an id of 352, before reissuing
+ * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
+ * "x-352-109", "x-333-110". The process will know that the original create
+ * succeeded and the znode it created is "x-352-109".
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RecoverableZooKeeper {
+  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
+  // the actual ZooKeeper client instance
+  // (reassigned by reconnectAfterExpiration(), which swaps in a new client)
+  volatile private ZooKeeper zk;
+  // Produces a fresh RetryCounter for every public operation.
+  private final RetryCounterFactory retryCounterFactory;
+  // An identifier of this process in the cluster
+  // (the constructor falls back to the RuntimeMXBean name when none is given)
+  private final String identifier;
+  // The identifier converted with Bytes.toBytes; written into the metadata header.
+  private final byte[] id;
+  // Connection parameters, kept so reconnectAfterExpiration() can build a new client.
+  private Watcher watcher;
+  private int sessionTimeout;
+  private String quorumServers;
+  // Source of the random long that appendMetaData() stores after the id.
+  private final Random salter;
+
+  // The metadata attached to each piece of data has the
+  // format:
+  //   <magic> 1-byte constant
+  //   <id length> 4-byte big-endian integer (length of next field)
+  //   <id> identifier corresponding uniquely to this process
+  // It is prepended to the data supplied by the user.
+  // Note: appendMetaData() writes the id bytes followed by a salt, and the
+  // <id length> it records covers both.
+
+  // the magic number is to be backward compatible
+  private static final byte MAGIC =(byte) 0XFF;
+  private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
+  private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
+  private static final int ID_LENGTH_SIZE =  Bytes.SIZEOF_INT;
+
+  /**
+   * Creates a recoverable client without an explicit process identifier.
+   * Delegates to the six-argument constructor with a <code>null</code>
+   * identifier, which makes that constructor pick a default one.
+   *
+   * @param quorumServers passed to the ZooKeeper constructor
+   * @param sessionTimeout passed to the ZooKeeper constructor
+   * @param watcher passed to the ZooKeeper constructor
+   * @param maxRetries passed to the {@link RetryCounterFactory}
+   * @param retryIntervalMillis passed to the {@link RetryCounterFactory}
+   * @throws IOException if the ZooKeeper client cannot be created
+   */
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis)
+  throws IOException {
+    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
+        null);
+  }
+
+  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
+      Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
+  throws IOException {
+    this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+    this.retryCounterFactory =
+      new RetryCounterFactory(maxRetries, retryIntervalMillis);
+
+    if (identifier == null || identifier.length() == 0) {
+      // the identifier = processID@hostName
+      identifier = ManagementFactory.getRuntimeMXBean().getName();
+    }
+    LOG.info("The identifier of this process is " + identifier);
+    this.identifier = identifier;
+    this.id = Bytes.toBytes(identifier);
+
+    this.watcher = watcher;
+    this.sessionTimeout = sessionTimeout;
+    this.quorumServers = quorumServers;
+    salter = new SecureRandom();
+  }
+
+  /**
+   * Closes the current ZooKeeper client and replaces it with a new one built
+   * from the quorum, session timeout and watcher given at construction time.
+   * The session id is logged before the close and again after the new client
+   * has been created.
+   *
+   * @throws IOException if the new ZooKeeper client cannot be created
+   * @throws InterruptedException if closing the old client is interrupted
+   */
+  public void reconnectAfterExpiration()
+        throws IOException, InterruptedException {
+    String oldSession = Long.toHexString(zk.getSessionId());
+    LOG.info("Closing dead ZooKeeper connection, session was: 0x" + oldSession);
+    zk.close();
+
+    this.zk = new ZooKeeper(this.quorumServers, this.sessionTimeout, this.watcher);
+
+    String newSession = Long.toHexString(zk.getSessionId());
+    LOG.info("Recreated a ZooKeeper, session is: 0x" + newSession);
+  }
+
+  /**
+   * Deletes the znode at <code>path</code>, retrying on connection loss,
+   * session expiry and operation timeout until the retry counter is used up.
+   * <p>
+   * A NoNodeException raised by the first attempt is rethrown to the caller.
+   * When it is raised by a retry, an earlier attempt is assumed to have
+   * removed the node and the method returns normally.
+   * </p>
+   */
+  public void delete(String path, int version)
+  throws InterruptedException, KeeperException {
+    final RetryCounter retries = retryCounterFactory.create();
+    boolean retrying = false; // stays false only for the very first attempt
+    for (;;) {
+      try {
+        zk.delete(path, version);
+        return;
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case NONODE:
+            if (!retrying) {
+              LOG.warn("Node " + path + " already deleted, retry=" + retrying);
+              throw ke;
+            }
+            LOG.info("Node " + path + " already deleted. Assuming a " +
+                "previous attempt succeeded.");
+            return;
+
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "delete");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+      retrying = true;
+    }
+  }
+
+  /**
+   * Calls <code>ZooKeeper.exists(path, watcher)</code>, retrying on connection
+   * loss, session expiry and operation timeout. Any other KeeperException is
+   * rethrown immediately.
+   * @return the Stat returned by the ZooKeeper client
+   */
+  public Stat exists(String path, Watcher watcher)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return zk.exists(path, watcher);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "exists");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Calls <code>ZooKeeper.exists(path, watch)</code>, retrying on connection
+   * loss, session expiry and operation timeout. Any other KeeperException is
+   * rethrown immediately.
+   * @return the Stat returned by the ZooKeeper client
+   */
+  public Stat exists(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return zk.exists(path, watch);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "exists");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Decides whether a failed operation may be attempted again.
+   * Logs the exception as possibly transient, then returns normally when the
+   * counter still allows a retry; otherwise logs an error and rethrows
+   * <code>e</code>.
+   *
+   * @param retryCounter counter tracking the attempts of the current operation
+   * @param e the exception raised by the failed attempt
+   * @param opName operation name, used only in the error message
+   * @throws KeeperException <code>e</code> itself, when no retry is allowed
+   */
+  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
+      String opName) throws KeeperException {
+    LOG.warn("Possibly transient ZooKeeper exception: " + e);
+    if (retryCounter.shouldRetry()) {
+      return;
+    }
+    LOG.error("ZooKeeper " + opName + " failed after "
+      + retryCounter.getMaxRetries() + " retries");
+    throw e;
+  }
+
+  /**
+   * Calls <code>ZooKeeper.getChildren(path, watcher)</code>, retrying on
+   * connection loss, session expiry and operation timeout. Any other
+   * KeeperException is rethrown immediately.
+   * @return List of children znodes
+   */
+  public List<String> getChildren(String path, Watcher watcher)
+    throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return zk.getChildren(path, watcher);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "getChildren");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Calls <code>ZooKeeper.getChildren(path, watch)</code>, retrying on
+   * connection loss, session expiry and operation timeout. Any other
+   * KeeperException is rethrown immediately.
+   * @return List of children znodes
+   */
+  public List<String> getChildren(String path, boolean watch)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return zk.getChildren(path, watch);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "getChildren");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Reads the znode through <code>ZooKeeper.getData(path, watcher, stat)</code>
+   * and strips the metadata header with {@link #removeMetaData(byte[])}.
+   * Retries on connection loss, session expiry and operation timeout; any
+   * other KeeperException is rethrown immediately.
+   * @return Data without the metadata header
+   */
+  public byte[] getData(String path, Watcher watcher, Stat stat)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        final byte[] raw = zk.getData(path, watcher, stat);
+        return removeMetaData(raw);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "getData");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Reads the znode through <code>ZooKeeper.getData(path, watch, stat)</code>
+   * and strips the metadata header with {@link #removeMetaData(byte[])}.
+   * Retries on connection loss, session expiry and operation timeout; any
+   * other KeeperException is rethrown immediately.
+   * @return Data without the metadata header
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    for (;;) {
+      try {
+        final byte[] raw = zk.getData(path, watch, stat);
+        return removeMetaData(raw);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "getData");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  /**
+   * Writes <code>data</code>, prefixed with this client's metadata header, to
+   * the znode at <code>path</code>.
+   * <p>
+   * setData is not idempotent: a retry after a write that actually succeeded
+   * fails with BadVersion. When BadVersion is seen on a retry, the znode is
+   * read back and compared with the bytes this call tried to write; if they
+   * match, the earlier attempt is taken to have succeeded and the Stat from
+   * that read is returned. Otherwise the BadVersion exception is rethrown.
+   * </p>
+   * @return Stat instance
+   */
+  public Stat setData(String path, byte[] data, int version)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    final byte[] payload = appendMetaData(data);
+    boolean retrying = false;
+    for (;;) {
+      try {
+        return zk.setData(path, payload, version);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "setData");
+            break;
+          case BADVERSION:
+            if (retrying) {
+              // Check whether the stored bytes are the ones we tried to write.
+              // A KeeperException from this read propagates to the caller.
+              final Stat current = new Stat();
+              final byte[] stored = zk.getData(path, false, current);
+              if (Bytes.compareTo(stored, payload) == 0) {
+                // the bad version is caused by previous successful setData
+                return current;
+              }
+            }
+            // Not our own write, or a first attempt: fall through and rethrow.
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+      retrying = true;
+    }
+  }
+
+  /**
+   * Creates a znode whose content is <code>data</code> prefixed with this
+   * client's metadata header.
+   * <p>
+   * EPHEMERAL and PERSISTENT creates are retried; a NodeExists failure is
+   * rethrown unless it occurs on a retry and the existing node holds exactly
+   * the bytes this call tried to write.
+   * </p>
+   * <p>
+   * SEQUENTIAL creates are not idempotent, so the process identifier is
+   * appended to the path and, before each retry, the parent is searched for a
+   * node left behind by an earlier attempt.
+   * </p>
+   *
+   * @return Path
+   * @throws IllegalArgumentException for any other create mode
+   */
+  public String create(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    final byte[] payload = appendMetaData(data);
+    switch (createMode) {
+      case EPHEMERAL_SEQUENTIAL:
+      case PERSISTENT_SEQUENTIAL:
+        return createSequential(path, payload, acl, createMode);
+
+      case EPHEMERAL:
+      case PERSISTENT:
+        return createNonSequential(path, payload, acl, createMode);
+
+      default:
+        throw new IllegalArgumentException("Unrecognized CreateMode: " +
+            createMode);
+    }
+  }
+
+  private String createNonSequential(String path, byte[] data, List<ACL> acl,
+      CreateMode createMode) throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean isRetry = false; // False for first attempt, true for all retries.
+    while (true) {
+      try {
+        return zk.create(path, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case NODEEXISTS:
+            if (isRetry) {
+              // If the connection was lost, there is still a possibility that
+              // we have successfully created the node at our previous attempt,
+              // so we read the node and compare.
+              byte[] currentData = zk.getData(path, false, null);
+              if (currentData != null &&
+                  Bytes.compareTo(currentData, data) == 0) {
+                // We successfully created a non-sequential node
+                return path;
+              }
+              LOG.error("Node " + path + " already exists with " +
+                  Bytes.toStringBinary(currentData) + ", could not write " +
+                  Bytes.toStringBinary(data));
+              throw e;
+            }
+            LOG.info("Node " + path + " already exists and this is not a " +
+                "retry");
+            throw e;
+
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+      isRetry = true;
+    }
+  }
+
+  private String createSequential(String path, byte[] data,
+      List<ACL> acl, CreateMode createMode)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    boolean first = true;
+    String newPath = path+this.identifier;
+    while (true) {
+      try {
+        if (!first) {
+          // Check if we succeeded on a previous attempt
+          String previousResult = findPreviousSequentialNode(newPath);
+          if (previousResult != null) {
+            return previousResult;
+          }
+        }
+        first = false;
+        return zk.create(newPath, data, acl, createMode);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "create");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+      retryCounter.useRetry();
+    }
+  }
+  /**
+   * Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
+   * instances to actually pass to multi (need to do this in order to appendMetaData).
+   */
+  private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+  throws UnsupportedOperationException {
+    if(ops == null) return null;
+
+    List<Op> preparedOps = new LinkedList<Op>();
+    for (Op op : ops) {
+      if (op.getType() == ZooDefs.OpCode.create) {
+        CreateRequest create = (CreateRequest)op.toRequestRecord();
+        preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
+          create.getAcl(), create.getFlags()));
+      } else if (op.getType() == ZooDefs.OpCode.delete) {
+        // no need to appendMetaData for delete
+        preparedOps.add(op);
+      } else if (op.getType() == ZooDefs.OpCode.setData) {
+        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
+        preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
+          setData.getVersion()));
+      } else {
+        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
+      }
+    }
+    return preparedOps;
+  }
+
+  /**
+   * Submits the operations through <code>ZooKeeper.multi</code> after passing
+   * them through prepareZKMulti. Retries on connection loss, session expiry
+   * and operation timeout; any other KeeperException is rethrown immediately.
+   * The same prepared operations are resubmitted on every retry.
+   *
+   * @return the results returned by the ZooKeeper client
+   */
+  public List<OpResult> multi(Iterable<Op> ops)
+  throws KeeperException, InterruptedException {
+    final RetryCounter retries = retryCounterFactory.create();
+    final Iterable<Op> prepared = prepareZKMulti(ops);
+    for (;;) {
+      try {
+        return zk.multi(prepared);
+      } catch (KeeperException ke) {
+        switch (ke.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            // Rethrows once no retries are left; otherwise we loop again.
+            retryOrThrow(retries, ke, "multi");
+            break;
+
+          default:
+            throw ke;
+        }
+      }
+      retries.sleepUntilNextRetry();
+      retries.useRetry();
+    }
+  }
+
+  private String findPreviousSequentialNode(String path)
+    throws KeeperException, InterruptedException {
+    int lastSlashIdx = path.lastIndexOf('/');
+    assert(lastSlashIdx != -1);
+    String parent = path.substring(0, lastSlashIdx);
+    String nodePrefix = path.substring(lastSlashIdx+1);
+
+    List<String> nodes = zk.getChildren(parent, false);
+    List<String> matching = filterByPrefix(nodes, nodePrefix);
+    for (String node : matching) {
+      String nodePath = parent + "/" + node;
+      Stat stat = zk.exists(nodePath, false);
+      if (stat != null) {
+        return nodePath;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Strips the metadata header (magic byte, id length, id) from data read
+   * from a znode.
+   * <p>
+   * The data is returned untouched when it is <code>null</code> or empty,
+   * when it does not start with the magic byte, or when it starts with the
+   * magic byte but is too short to hold the header it would announce. The
+   * last case covers values that merely happen to begin with 0xFF; without
+   * the check they would fail with an unchecked exception.
+   * </p>
+   *
+   * @param data bytes as stored in the znode; may be <code>null</code>
+   * @return the user data without the header, or <code>data</code> itself
+   *   when no well-formed header is present
+   */
+  public byte[] removeMetaData(byte[] data) {
+    if(data == null || data.length == 0) {
+      return data;
+    }
+    // check the magic data; to be backward compatible
+    byte magic = data[0];
+    if(magic != MAGIC) {
+      return data;
+    }
+
+    // The fixed part of the header must fit before the id length is read.
+    final int fixedHeaderLength = MAGIC_SIZE + ID_LENGTH_SIZE;
+    if (data.length < fixedHeaderLength) {
+      return data;
+    }
+
+    int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
+    // A negative or overlong id length cannot come from appendMetaData.
+    if (idLength < 0 || idLength > data.length - fixedHeaderLength) {
+      return data;
+    }
+
+    int dataOffset = fixedHeaderLength + idLength;
+    int dataLength = data.length - dataOffset;
+
+    byte[] newData = new byte[dataLength];
+    System.arraycopy(data, dataOffset, newData, 0, dataLength);
+    return newData;
+  }
+
+  private byte[] appendMetaData(byte[] data) {
+    if(data == null || data.length == 0){
+      return data;
+    }
+    byte[] salt = Bytes.toBytes(salter.nextLong());
+    int idLength = id.length + salt.length;
+    byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
+    int pos = 0;
+    pos = Bytes.putByte(newData, pos, MAGIC);
+    pos = Bytes.putInt(newData, pos, idLength);
+    pos = Bytes.putBytes(newData, pos, id, 0, id.length);
+    pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
+    pos = Bytes.putBytes(newData, pos, data, 0, data.length);
+    return newData;
+  }
+
+  /**
+   * @return the session id reported by the current ZooKeeper client
+   */
+  public long getSessionId() {
+    return zk.getSessionId();
+  }
+
+  /**
+   * Closes the current ZooKeeper client.
+   * @throws InterruptedException if thrown by the client's close
+   */
+  public void close() throws InterruptedException {
+    zk.close();
+  }
+
+  /**
+   * @return the state reported by the current ZooKeeper client
+   */
+  public States getState() {
+    return zk.getState();
+  }
+
+  /**
+   * @return the current ZooKeeper client; calls made on it directly bypass
+   *   the retry handling and metadata header of this class. Note that
+   *   reconnectAfterExpiration() replaces the client with a new instance.
+   */
+  public ZooKeeper getZooKeeper() {
+    return zk;
+  }
+
+  /**
+   * @return the session password reported by the current ZooKeeper client
+   */
+  public byte[] getSessionPasswd() {
+    return zk.getSessionPasswd();
+  }
+
+  /**
+   * Issues an asynchronous sync for <code>path</code> on the current
+   * ZooKeeper client. No retry is attempted.
+   *
+   * @param path the znode path to sync
+   * @param cb callback handed to the ZooKeeper client
+   * @param ctx context object handed to the ZooKeeper client together with
+   *   the callback
+   */
+  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
+    // Forward the caller's callback and context; passing nulls here would
+    // mean the caller is never told that the sync completed.
+    this.zk.sync(path, cb, ctx);
+  }
+
+  /**
+   * Filters the given node list by the given prefixes.
+   * This method is all-inclusive--if any element in the node list starts
+   * with any of the given prefixes, then it is included in the result.
+   *
+   * @param nodes the nodes to filter
+   * @param prefixes the prefixes to include in the result
+   * @return list of every element that starts with one of the prefixes
+   */
+  private static List<String> filterByPrefix(List<String> nodes,
+      String... prefixes) {
+    List<String> lockChildren = new ArrayList<String>();
+    for (String child : nodes){
+      for (String prefix : prefixes){
+        if (child.startsWith(prefix)){
+          lockChildren.add(child);
+          break;
+        }
+      }
+    }
+    return lockChildren;
+  }
+
+  /**
+   * @return the process identifier chosen in the constructor; it is the one
+   *   written into the metadata header and appended to sequential node paths
+   */
+  public String getIdentifier() {
+    return identifier;
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the root region server location node in zookeeper.
+ * Root region location is set by <code>RegionServerServices</code>.
+ * This class has a watcher on the root location and notices changes.
+ */
+@InterfaceAudience.Private
+public class RootRegionTracker extends ZooKeeperNodeTracker {
+  /**
+   * Creates a root region location tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher zookeeper watcher; its <code>rootServerZNode</code> is the
+   *   node handed to the superclass for tracking
+   * @param abortable passed through to the superclass constructor
+   */
+  public RootRegionTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+    super(watcher, watcher.rootServerZNode, abortable);
+  }
+
+  /**
+   * Tells whether the tracked root location node currently yields data.
+   * @return true if <code>super.getData(true)</code> returns non-null data,
+   *   false otherwise
+   */
+  public boolean isLocationAvailable() {
+    final byte[] data = super.getData(true);
+    return data != null;
+  }
+
+  /**
+   * Gets the root region location, if available.  Does not block.  Sets a watcher.
+   * @return server name or null if we failed to get the data.
+   * @throws InterruptedException
+   */
+  public ServerName getRootRegionLocation() throws InterruptedException {
+    try {
+      return ServerName.parseFrom(super.getData(true));
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse", e);
+      return null;
+    }
+  }
+
+  /**
+   * Gets the root region location by reading the root server znode through
+   * <code>ZKUtil.getData</code> (in this regard it differs from
+   * {@link #getRootRegionLocation()}, which reads through the tracker).
+   * A parse failure is converted with <code>ZKUtil.convert</code> and thrown.
+   * @param zkw watcher whose <code>rootServerZNode</code> is read
+   * @return the server name parsed from the znode content
+   * @throws KeeperException on a zookeeper error or if the content cannot be
+   *   parsed
+   */
+  public static ServerName getRootRegionLocation(final ZooKeeperWatcher zkw)
+  throws KeeperException {
+    try {
+      final byte[] content = ZKUtil.getData(zkw, zkw.rootServerZNode);
+      return ServerName.parseFrom(content);
+    } catch (DeserializationException de) {
+      throw ZKUtil.convert(de);
+    }
+  }
+
+  /**
+   * Gets the root region location, if available, and waits for up to the
+   * specified timeout if not immediately available.
+   * Given the zookeeper notification could be delayed, we will try to
+   * get the latest data.
+   * @param timeout maximum time to wait, in millis
+   * @return server name for server hosting root region formatted as per
+   * {@link ServerName}, or null if none available
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public ServerName waitRootRegionLocation(long timeout)
+  throws InterruptedException {
+    if (false == checkIfBaseNodeAvailable()) {
+      String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
+          + "There could be a mismatch with the one configured in the master.";
+      LOG.error(errorMsg);
+      throw new IllegalArgumentException(errorMsg);
+    }
+    try {
+      return ServerName.parseFrom(super.blockUntilAvailable(timeout, true));
+    } catch (DeserializationException e) {
+      LOG.warn("Failed parse", e);
+      return null;
+    }
+  }
+
+  /**
+   * Records the server hosting <code>-ROOT-</code> in ZooKeeper.
+   * The location is serialized with {@link #toByteArray(ServerName)}. The
+   * znode is created first; if it already exists, its data is overwritten
+   * instead.
+   * @param zookeeper zookeeper reference
+   * @param location The server hosting <code>-ROOT-</code>
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void setRootLocation(ZooKeeperWatcher zookeeper,
+      final ServerName location)
+  throws KeeperException {
+    LOG.info("Setting ROOT region location in ZooKeeper as " + location);
+    // The znode content is the serialized form of the server name.
+    final byte[] content = toByteArray(location);
+    final String znode = zookeeper.rootServerZNode;
+    try {
+      ZKUtil.createAndWatch(zookeeper, znode, content);
+    } catch (KeeperException.NodeExistsException alreadyThere) {
+      LOG.debug("ROOT region location already existed, updated location");
+      ZKUtil.setData(zookeeper, zookeeper.rootServerZNode, content);
+    }
+  }
+
+  /**
+   * Build up the znode content.
+   * @param sn What to put into the znode.
+   * @return The content of the root-region-server znode
+   */
+  static byte [] toByteArray(final ServerName sn) {
+    // ZNode content is a pb message preceeded by some pb magic.
+    HBaseProtos.ServerName pbsn =
+      HBaseProtos.ServerName.newBuilder().setHostName(sn.getHostname()).
+      setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
+    ZooKeeperProtos.RootRegionServer pbrsr =
+      ZooKeeperProtos.RootRegionServer.newBuilder().setServer(pbsn).build();
+    return ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
+  }
+
+  /**
+   * Deletes the location of <code>-ROOT-</code> in ZooKeeper.
+   * A missing node is not an error: the NoNodeException is swallowed.
+   * @param zookeeper zookeeper reference
+   * @throws KeeperException unexpected zookeeper exception
+   */
+  public static void deleteRootLocation(ZooKeeperWatcher zookeeper)
+  throws KeeperException {
+    LOG.info("Unsetting ROOT region location in ZooKeeper");
+    final String znode = zookeeper.rootServerZNode;
+    try {
+      // Just delete the node.  Don't need any watches.
+      ZKUtil.deleteNode(zookeeper, znode);
+    } catch (KeeperException.NoNodeException ignored) {
+      // Deliberately ignored: the node has already been deleted.
+    }
+  }
+
+  /**
+   * Waits through <code>ZKUtil.blockUntilAvailable</code> for the root server
+   * znode to yield data and parses it into a ServerName.
+   * @param zkw watcher whose <code>rootServerZNode</code> is waited on
+   * @param timeout passed to <code>ZKUtil.blockUntilAvailable</code>
+   * @return ServerName, or null if no data was returned or it could not be
+   *   parsed (a parse failure is logged at WARN).
+   * @throws InterruptedException
+   */
+  public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
+      final long timeout)
+  throws InterruptedException {
+    final byte[] content =
+      ZKUtil.blockUntilAvailable(zkw, zkw.rootServerZNode, timeout);
+    if (content != null) {
+      try {
+        return ServerName.parseFrom(content);
+      } catch (DeserializationException de) {
+        LOG.warn("Failed parse", de);
+      }
+    }
+    return null;
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,963 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+// We should not be importing this Type here, nor a RegionTransition, etc.  This class should be
+// about zk and bytes only.
+
+/**
+ * Utility class for doing region assignment in ZooKeeper.  This class extends
+ * stuff done in {@link ZKUtil} to cover specific assignment operations.
+ * <p>
+ * Contains only static methods and constants.
+ * <p>
+ * Used by both the Master and RegionServer.
+ * <p>
+ * All valid transitions outlined below:
+ * <p>
+ * <b>MASTER</b>
+ * <ol>
+ *   <li>
+ *     Master creates an unassigned node as OFFLINE.
+ *     - Cluster startup and table enabling.
+ *   </li>
+ *   <li>
+ *     Master forces an existing unassigned node to OFFLINE.
+ *     - RegionServer failure.
+ *     - Allows transitions from all states to OFFLINE.
+ *   </li>
+ *   <li>
+ *     Master deletes an unassigned node that was in an OPENED state.
+ *     - Normal region transitions.  Besides cluster startup, no other deletions
+ *     of unassigned nodes are allowed.
+ *   </li>
+ *   <li>
+ *     Master deletes all unassigned nodes regardless of state.
+ *     - Cluster startup before any assignment happens.
+ *   </li>
+ * </ol>
+ * <p>
+ * <b>REGIONSERVER</b>
+ * <ol>
+ *   <li>
+ *     RegionServer creates an unassigned node as CLOSING.
+ *     - All region closes will do this in response to a CLOSE RPC from Master.
+ *     - A node can never be transitioned to CLOSING, only created.
+ *   </li>
+ *   <li>
+ *     RegionServer transitions an unassigned node from CLOSING to CLOSED.
+ *     - Normal region closes.  CAS operation.
+ *   </li>
+ *   <li>
+ *     RegionServer transitions an unassigned node from OFFLINE to OPENING.
+ *     - All region opens will do this in response to an OPEN RPC from the Master.
+ *     - Normal region opens.  CAS operation.
+ *   </li>
+ *   <li>
+ *     RegionServer transitions an unassigned node from OPENING to OPENED.
+ *     - Normal region opens.  CAS operation.
+ *   </li>
+ * </ol>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKAssign {
+  private static final Log LOG = LogFactory.getLog(ZKAssign.class);
+
+  /**
+   * Gets the full path node name for the unassigned node for the specified
+   * region.
+   * @param zkw zk reference
+   * @param regionName region name
+   * @return full path node name
+   */
+  public static String getNodeName(ZooKeeperWatcher zkw, String regionName) {
+    // The unassigned znode for a region is a child of the assignment znode.
+    final String assignmentParent = zkw.assignmentZNode;
+    return ZKUtil.joinZNode(assignmentParent, regionName);
+  }
+
+  /**
+   * Gets the region name from the full path node name of an unassigned node.
+   * @param path full zk path
+   * @return region name
+   */
+  public static String getRegionName(ZooKeeperWatcher zkw, String path) {
+    // Drop the assignment znode prefix plus the single character that follows it
+    // (presumably the znode path separator).
+    final int regionNameStart = zkw.assignmentZNode.length() + 1;
+    return path.substring(regionNameStart);
+  }
+
+  // Master methods
+
+  /**
+   * Creates a new unassigned node in the OFFLINE state for the specified region.
+   *
+   * <p>Does not transition nodes from other states.  If a node already exists
+   * for this region, a {@link NodeExistsException} will be thrown.
+   *
+   * <p>Sets a watcher on the unassigned region node if the method is successful.
+   *
+   * <p>This method should only be used during cluster startup and the enabling
+   * of a table.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as offline
+   * @param serverName server transition will happen on
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static void createNodeOffline(ZooKeeperWatcher zkw, HRegionInfo region,
+      ServerName serverName)
+  throws KeeperException, KeeperException.NodeExistsException {
+    // Delegate to the general overload with the master's OFFLINE event.
+    final EventType offlineEvent = EventType.M_ZK_REGION_OFFLINE;
+    createNodeOffline(zkw, region, serverName, offlineEvent);
+  }
+
+  /**
+   * Creates a new unassigned node for the specified region carrying the given
+   * event, and sets a watch on it via {@code ZKUtil.createAndWatch}.
+   *
+   * @param zkw zk reference
+   * @param region region to create the unassigned node for
+   * @param serverName server transition will happen on
+   * @param event event type recorded in the node's transition data
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static void createNodeOffline(ZooKeeperWatcher zkw, HRegionInfo region,
+      ServerName serverName, final EventType event)
+  throws KeeperException, KeeperException.NodeExistsException {
+    final String encodedName = region.getEncodedName();
+    LOG.debug(zkw.prefix("Creating unassigned node for " + encodedName + " in OFFLINE state"));
+    // The znode content is the serialized transition for this region/server pair.
+    final RegionTransition transition =
+      RegionTransition.createRegionTransition(event, region.getRegionName(), serverName);
+    final String znode = getNodeName(zkw, encodedName);
+    ZKUtil.createAndWatch(zkw, znode, transition.toByteArray());
+  }
+
+  /**
+   * Creates an unassigned node in the OFFLINE state for the specified region.
+   * <p>
+   * Runs asynchronously.  Depends on no pre-existing znode.
+   *
+   * <p>Sets a watcher on the unassigned region node.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as offline
+   * @param serverName server transition will happen on
+   * @param cb callback handed to the asynchronous create
+   * @param ctx context object handed to the asynchronous create
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static void asyncCreateNodeOffline(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName,
+      final AsyncCallback.StringCallback cb, final Object ctx)
+  throws KeeperException {
+    final String encodedName = region.getEncodedName();
+    LOG.debug(zkw.prefix("Async create of unassigned node for " + encodedName +
+      " with OFFLINE state"));
+    // Build the OFFLINE transition that becomes the znode content.
+    final RegionTransition offline = RegionTransition.createRegionTransition(
+      EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
+    final String znode = getNodeName(zkw, encodedName);
+    // The callback and its context are handed straight to the async create.
+    ZKUtil.asyncCreate(zkw, znode, offline.toByteArray(), cb, ctx);
+  }
+
+  /**
+   * Creates or force updates an unassigned node to the OFFLINE state for the
+   * specified region.
+   * <p>
+   * Attempts to create the node but if it exists will force it to transition to
+   * an OFFLINE state.
+   *
+   * <p>Sets a watcher on the unassigned region node if the method is
+   * successful.
+   *
+   * <p>This method should be used when assigning a region.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as offline
+   * @param serverName server transition will happen on
+   * @return the version of the znode created in OFFLINE state, -1 if
+   *         unsuccessful.
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName) throws KeeperException {
+    String encoded = region.getEncodedName();
+    LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
+      encoded + " with OFFLINE state"));
+    RegionTransition rt = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_OFFLINE,
+      region.getRegionName(), serverName, HConstants.EMPTY_BYTE_ARRAY);
+    byte [] data = rt.toByteArray();
+    String node = getNodeName(zkw, encoded);
+    zkw.sync(node);
+    int version = ZKUtil.checkExists(zkw, node);
+    if (version == -1) {
+      // No znode yet: create it and hand back whatever version the create reports.
+      return ZKUtil.createAndWatch(zkw, node, data);
+    }
+
+    // The znode exists: overwrite it, conditional on the version just read, so
+    // that a concurrent change by a RegionServer makes the write fail.
+    try {
+      if (!ZKUtil.setData(zkw, node, data, version)) {
+        return -1;
+      }
+    } catch (KeeperException kpe) {
+      // A KeeperException here would otherwise abort the Master, so it is turned
+      // into a -1 result.  Any KeeperException lands here, not only a version
+      // mismatch, so keep the cause in the log.
+      LOG.info(zkw.prefix("Failed setting unassigned node for " + encoded +
+        " to OFFLINE state (version mismatch or other ZooKeeper error)"), kpe);
+      return -1;
+    }
+
+    // We successfully forced to OFFLINE, reset watch and handle if
+    // the state changed in between our set and the watch
+    byte [] bytes = ZKAssign.getData(zkw, encoded);
+    if (bytes == null) {
+      // NOTE(review): getData is not visible here; this guard assumes it can
+      // return null when the node vanished after our write.  Treat that as an
+      // unsuccessful force instead of parsing null.
+      return -1;
+    }
+    rt = getRegionTransition(bytes);
+    if (rt.getEventType() != EventType.M_ZK_REGION_OFFLINE) {
+      // state changed, need to process
+      return -1;
+    }
+    // A successful conditional set bumps the znode version by one.
+    return version + 1;
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the OPENED state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used during normal region transitions when a region
+   * finishes successfully opening.  This is the Master acknowledging completion
+   * of the specified regions transition.
+   *
+   * @param zkw zk reference
+   * @param encodedRegionName opened region to be deleted from zk
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteOpenedNode(ZooKeeperWatcher zkw,
+      String encodedRegionName)
+  throws KeeperException, KeeperException.NoNodeException {
+    // Only a node currently in the OPENED state qualifies for deletion.
+    final EventType requiredState = EventType.RS_ZK_REGION_OPENED;
+    return deleteNode(zkw, encodedRegionName, requiredState);
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the OFFLINE state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used during master failover when the regions on an RS
+   * that has died are all set to OFFLINE before being processed.
+   *
+   * @param zkw zk reference
+   * @param encodedRegionName offline region to be deleted from zk
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteOfflineNode(ZooKeeperWatcher zkw,
+      String encodedRegionName)
+  throws KeeperException, KeeperException.NoNodeException {
+    // Only a node currently in the OFFLINE state qualifies for deletion.
+    final EventType requiredState = EventType.M_ZK_REGION_OFFLINE;
+    return deleteNode(zkw, encodedRegionName, requiredState);
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the CLOSED state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used during table disables when a region finishes
+   * successfully closing.  This is the Master acknowledging completion
+   * of the specified regions transition to being closed.
+   *
+   * @param zkw zk reference
+   * @param encodedRegionName closed region to be deleted from zk
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteClosedNode(ZooKeeperWatcher zkw,
+      String encodedRegionName)
+  throws KeeperException, KeeperException.NoNodeException {
+    // Only a node currently in the CLOSED state qualifies for deletion.
+    final EventType requiredState = EventType.RS_ZK_REGION_CLOSED;
+    return deleteNode(zkw, encodedRegionName, requiredState);
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the CLOSING state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used during table disables when a region finishes
+   * successfully closing.  This is the Master acknowledging completion
+   * of the specified regions transition to being closed.
+   *
+   * @param zkw zk reference
+   * @param region closing region to be deleted from zk
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteClosingNode(ZooKeeperWatcher zkw,
+      HRegionInfo region)
+  throws KeeperException, KeeperException.NoNodeException {
+    // The unassigned node is looked up by the region's encoded name and must
+    // currently be in the CLOSING state.
+    return deleteNode(zkw, region.getEncodedName(), EventType.M_ZK_REGION_CLOSING);
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the specified state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used when a region finishes opening/closing.
+   * The Master acknowledges completion
+   * of the specified regions transition to being closed/opened.
+   *
+   * @param zkw zk reference
+   * @param encodedRegionName region to be deleted from zk
+   * @param expectedState state region must be in for delete to complete
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
+      EventType expectedState)
+  throws KeeperException, KeeperException.NoNodeException {
+    // -1 tells the versioned overload not to compare znode versions.
+    final int anyVersion = -1;
+    return deleteNode(zkw, encodedRegionName, expectedState, anyVersion);
+  }
+
+  /**
+   * Deletes an existing unassigned node that is in the specified state for the
+   * specified region.
+   *
+   * <p>If a node does not already exist for this region, a
+   * {@link NoNodeException} will be thrown.
+   *
+   * <p>No watcher is set whether this succeeds or not.
+   *
+   * <p>Returns false if the node was not in the proper state but did exist.
+   *
+   * <p>This method is used when a region finishes opening/closing.
+   * The Master acknowledges completion
+   * of the specified regions transition to being closed/opened.
+   *
+   * @param zkw zk reference
+   * @param encodedRegionName region to be deleted from zk
+   * @param expectedState state region must be in for delete to complete
+   * @param expectedVersion of the znode that is to be deleted.
+   *        If expectedVersion need not be compared while deleting the znode
+   *        pass -1
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NoNodeException if node does not exist
+   */
+  public static boolean deleteNode(ZooKeeperWatcher zkw, String encodedRegionName,
+      EventType expectedState, int expectedVersion)
+  throws KeeperException, KeeperException.NoNodeException {
+    LOG.debug(zkw.prefix("Deleting existing unassigned node for " + encodedRegionName +
+      " that is in expected state " + expectedState));
+    final String znode = getNodeName(zkw, encodedRegionName);
+    zkw.sync(znode);
+
+    // Fetch the current content together with its Stat (for the version).
+    final Stat znodeStat = new Stat();
+    final byte [] content = ZKUtil.getDataNoWatch(zkw, znode, znodeStat);
+    if (content == null) {
+      // A null read means the node is not there.
+      throw KeeperException.create(Code.NONODE);
+    }
+
+    // Refuse to delete a node that is not in the state the caller expects.
+    final EventType currentState = getRegionTransition(content).getEventType();
+    if (!currentState.equals(expectedState)) {
+      LOG.warn(zkw.prefix("Attempting to delete unassigned node " + encodedRegionName +
+        " in " + expectedState + " state but node is in " + currentState + " state"));
+      return false;
+    }
+
+    // Compare versions only when the caller supplied one (-1 means "any").
+    final boolean versionMatters = expectedVersion != -1;
+    if (versionMatters && znodeStat.getVersion() != expectedVersion) {
+      LOG.warn("The node " + encodedRegionName + " we are trying to delete is not" +
+        " the expected one. Got a version mismatch");
+      return false;
+    }
+
+    // Delete conditionally on the version that was just read.
+    final boolean deleted = ZKUtil.deleteNode(zkw, znode, znodeStat.getVersion());
+    if (deleted) {
+      LOG.debug(zkw.prefix("Successfully deleted unassigned node for region " +
+        encodedRegionName + " in expected state " + expectedState));
+      return true;
+    }
+    LOG.warn(zkw.prefix("Attempting to delete unassigned node " + encodedRegionName +
+      " in " + expectedState +
+      " state but after verifying state, we got a version mismatch"));
+    return false;
+  }
+
+  /**
+   * Deletes all unassigned nodes regardless of their state.
+   *
+   * <p>No watchers are set.
+   *
+   * <p>This method is used by the Master during cluster startup to clear out
+   * any existing state from other cluster runs.
+   *
+   * @param zkw zk reference
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static void deleteAllNodes(ZooKeeperWatcher zkw)
+  throws KeeperException {
+    LOG.debug(zkw.prefix("Deleting any existing unassigned nodes"));
+    // Every unassigned node lives under the assignment znode.
+    final String assignmentParent = zkw.assignmentZNode;
+    ZKUtil.deleteChildrenRecursively(zkw, assignmentParent);
+  }
+
+  /**
+   * Creates a new unassigned node in the CLOSING state for the specified
+   * region.
+   *
+   * <p>Does not transition nodes from any states.  If a node already exists
+   * for this region, a {@link NodeExistsException} will be thrown.
+   *
+   * <p>If creation is successful, returns the version number of the CLOSING
+   * node created.
+   *
+   * <p>Set a watch.
+   *
+   * <p>This method should only be used by a Master when initiating a
+   * close of a region before sending a close request to the region server.
+   *
+   * @param zkw zk reference
+   * @param region region to be created as closing
+   * @param serverName server transition will happen on
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static int createNodeClosing(ZooKeeperWatcher zkw, HRegionInfo region,
+      ServerName serverName)
+  throws KeeperException, KeeperException.NodeExistsException {
+    final String encodedName = region.getEncodedName();
+    LOG.debug(zkw.prefix("Creating unassigned node for " + encodedName +
+      " in a CLOSING state"));
+    // CLOSING transition with an empty payload becomes the znode content.
+    final RegionTransition closing = RegionTransition.createRegionTransition(
+      EventType.M_ZK_REGION_CLOSING, region.getRegionName(), serverName,
+      HConstants.EMPTY_BYTE_ARRAY);
+    final String znode = getNodeName(zkw, encodedName);
+    return ZKUtil.createAndWatch(zkw, znode, closing.toByteArray());
+  }
+
+  // RegionServer methods
+
+  /**
+   * Transitions an existing unassigned node for the specified region which is
+   * currently in the CLOSING state to be in the CLOSED state.
+   *
+   * <p>Does not transition nodes from other states.  If for some reason the
+   * node could not be transitioned, the method returns -1.  If the transition
+   * is successful, the version of the node after transition is returned.
+   *
+   * <p>This method can fail and return -1 for three different reasons:
+   * <ul><li>Unassigned node for this region does not exist</li>
+   * <li>Unassigned node for this region is not in CLOSING state</li>
+   * <li>After verifying CLOSING state, update fails because of wrong version
+   * (someone else already transitioned the node)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when initiating a
+   * close of a region after receiving a CLOSE RPC from the Master.
+   *
+   * @param zkw zk reference
+   * @param region region to be transitioned to closed
+   * @param serverName server transition happens on
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNodeClosed(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName, int expectedVersion)
+  throws KeeperException {
+    // CLOSING -> CLOSED, guarded by the caller-supplied znode version.
+    final EventType from = EventType.M_ZK_REGION_CLOSING;
+    final EventType to = EventType.RS_ZK_REGION_CLOSED;
+    return transitionNode(zkw, region, serverName, from, to, expectedVersion);
+  }
+
+  /**
+   * Transitions an existing unassigned node for the specified region which is
+   * currently in the OFFLINE state to be in the OPENING state.
+   *
+   * <p>Does not transition nodes from other states.  If for some reason the
+   * node could not be transitioned, the method returns -1.  If the transition
+   * is successful, the version of the node written as OPENING is returned.
+   *
+   * <p>This method can fail and return -1 for three different reasons:
+   * <ul><li>Unassigned node for this region does not exist</li>
+   * <li>Unassigned node for this region is not in OFFLINE state</li>
+   * <li>After verifying OFFLINE state, update fails because of wrong version
+   * (someone else already transitioned the node)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when initiating an
+   * open of a region after receiving an OPEN RPC from the Master.
+   *
+   * @param zkw zk reference
+   * @param region region to be transitioned to opening
+   * @param serverName server transition happens on
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNodeOpening(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName)
+  throws KeeperException {
+    // The default starting point for an open is the master's OFFLINE state.
+    final EventType beginState = EventType.M_ZK_REGION_OFFLINE;
+    return transitionNodeOpening(zkw, region, serverName, beginState);
+  }
+
+  /**
+   * Transitions an existing unassigned node for the specified region from the
+   * given begin state to the OPENING state.  No expected version is supplied
+   * (-1 is passed to {@code transitionNode}).
+   *
+   * @param zkw zk reference
+   * @param region region to be transitioned to opening
+   * @param serverName server transition happens on
+   * @param beginState state the node must currently be in
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNodeOpening(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName, final EventType beginState)
+  throws KeeperException {
+    final EventType opening = EventType.RS_ZK_REGION_OPENING;
+    final int noExpectedVersion = -1;
+    return transitionNode(zkw, region, serverName, beginState, opening, noExpectedVersion);
+  }
+
+  /**
+   * Retransitions an existing unassigned node for the specified region which is
+   * currently in the OPENING state to be in the OPENING state.
+   *
+   * <p>Does not transition nodes from other states.  If for some reason the
+   * node could not be transitioned, the method returns -1.  If the transition
+   * is successful, the version of the node rewritten as OPENING is returned.
+   *
+   * <p>This method can fail and return -1 for three different reasons:
+   * <ul><li>Unassigned node for this region does not exist</li>
+   * <li>Unassigned node for this region is not in OPENING state</li>
+   * <li>After verifying OPENING state, update fails because of wrong version
+   * (someone else already transitioned the node)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when initiating an
+   * open of a region after receiving an OPEN RPC from the Master.
+   *
+   * @param zkw zk reference
+   * @param region region to be transitioned to opening
+   * @param serverName server transition happens on
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int retransitionNodeOpening(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName, int expectedVersion)
+  throws KeeperException {
+    // Begin and end state are both OPENING: the node is rewritten in place.
+    final EventType opening = EventType.RS_ZK_REGION_OPENING;
+    return transitionNode(zkw, region, serverName, opening, opening, expectedVersion);
+  }
+
+  /**
+   * Transitions an existing unassigned node for the specified region which is
+   * currently in the OPENING state to be in the OPENED state.
+   *
+   * <p>Does not transition nodes from other states.  If for some reason the
+   * node could not be transitioned, the method returns -1.  If the transition
+   * is successful, the version of the node after transition is returned.
+   *
+   * <p>This method can fail and return -1 for three different reasons:
+   * <ul><li>Unassigned node for this region does not exist</li>
+   * <li>Unassigned node for this region is not in OPENING state</li>
+   * <li>After verifying OPENING state, update fails because of wrong version
+   * (this should never actually happen since an RS only does this transition
+   * following a transition to OPENING.  if two RS are conflicting, one would
+   * fail the original transition to OPENING and not this transition)</li>
+   * </ul>
+   *
+   * <p>Does not set any watches.
+   *
+   * <p>This method should only be used by a RegionServer when completing the
+   * open of a region.
+   *
+   * @param zkw zk reference
+   * @param region region to be transitioned to opened
+   * @param serverName server transition happens on
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNodeOpened(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName, int expectedVersion)
+  throws KeeperException {
+    // OPENING -> OPENED, guarded by the caller-supplied znode version.
+    final EventType from = EventType.RS_ZK_REGION_OPENING;
+    final EventType to = EventType.RS_ZK_REGION_OPENED;
+    return transitionNode(zkw, region, serverName, from, to, expectedVersion);
+  }
+
+  /**
+   *
+   * @param zkw zk reference
+   * @param region region to be closed
+   * @param expectedVersion expected version of the znode
+   * @return true if the znode exists, has the right version and the right state. False otherwise.
+   * @throws KeeperException
+   */
+  public static boolean checkClosingState(ZooKeeperWatcher zkw, HRegionInfo region,
+                                          int expectedVersion) throws KeeperException {
+    // This is the full znode path, and it is what the warnings below print.
+    final String znodePath = getNodeName(zkw, region.getEncodedName());
+    zkw.sync(znodePath);
+
+    // Fetch the current content together with its Stat (for the version).
+    final Stat znodeStat = new Stat();
+    final byte[] content = ZKUtil.getDataNoWatch(zkw, znodePath, znodeStat);
+    final String warnPrefix = "Attempt to check the closing node for " + znodePath;
+
+    if (content == null) {
+      LOG.warn(zkw.prefix(warnPrefix + ". The node does not exist"));
+      return false;
+    }
+
+    // A version of -1 means the caller does not care about the version.
+    final int actualVersion = znodeStat.getVersion();
+    if (expectedVersion != -1 && actualVersion != expectedVersion) {
+      LOG.warn(zkw.prefix(warnPrefix +
+          ". The node existed but was version " + actualVersion +
+          " not the expected version " + expectedVersion));
+      return false;
+    }
+
+    final EventType currentState = getRegionTransition(content).getEventType();
+    if (EventType.M_ZK_REGION_CLOSING.equals(currentState)) {
+      return true;
+    }
+    LOG.warn(zkw.prefix(warnPrefix +
+        ". The node existed but was in an unexpected state: " + currentState));
+    return false;
+  }
+
+  /**
+   * Method that actually performs unassigned node transitions.
+   *
+   * <p>Attempts to transition the unassigned node for the specified region
+   * from the expected state to the state in the specified transition data.
+   *
+   * <p>Method first reads existing data and verifies it is in the expected
+   * state.  If the node does not exist or the node is not in the expected
+   * state, the method returns -1.  If the transition is successful, the
+   * version number of the node following the transition is returned.
+   *
+   * <p>If the read state is what is expected, it attempts to write the new
+   * state and data into the node.  When doing this, it includes the expected
+   * version (determined when the existing state was verified) to ensure that
+   * only one transition is successful.  If there is a version mismatch, the
+   * method returns -1.
+   *
+   * <p>If the write is successful, no watch is set and the method returns the
+   * version of the node after the transition.
+   *
+   * @param zkw zk reference
+   * @param region region whose unassigned node is to be transitioned
+   * @param serverName server transition happens on
+   * @param endState state to transition node to if all checks pass
+   * @param beginState state the node must currently be in to do transition
+   * @param expectedVersion expected version of data before modification, or -1
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
+      ServerName serverName, EventType beginState, EventType endState,
+      int expectedVersion)
+  throws KeeperException {
+    // This overload carries no payload with the transition.
+    final byte [] noPayload = null;
+    return transitionNode(zkw, region, serverName, beginState, endState, expectedVersion,
+      noPayload);
+  }
+
+  /**
+   * Performs an unassigned node transition, optionally attaching a payload.
+   *
+   * <p>Reads the node, checks its version and current state, then writes the
+   * new state conditionally on the version that was read.  Returns -1 if the
+   * node is missing, has an unexpected version or state, or changed before the
+   * write.
+   *
+   * @param zkw zk reference
+   * @param region region whose unassigned node is to be transitioned
+   * @param serverName server transition happens on
+   * @param beginState state the node must currently be in to do transition
+   * @param endState state to transition node to if all checks pass
+   * @param expectedVersion expected version of data before modification, or -1
+   * @param payload passed to {@code RegionTransition.createRegionTransition}
+   *        together with the end state; may be null
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
+      ServerName serverName, EventType beginState, EventType endState,
+      int expectedVersion, final byte [] payload)
+  throws KeeperException {
+    String encoded = region.getEncodedName();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(zkw.prefix("Attempting to transition node " +
+        HRegionInfo.prettyPrint(encoded) +
+        " from " + beginState.toString() + " to " + endState.toString()));
+    }
+
+    String node = getNodeName(zkw, encoded);
+    zkw.sync(node);
+
+    // Read existing data of the node
+    Stat stat = new Stat();
+    byte [] existingBytes = ZKUtil.getDataNoWatch(zkw, node, stat);
+    if (existingBytes == null) {
+      // Node no longer exists.  Return -1. It means unsuccessful transition.
+      return -1;
+    }
+    RegionTransition rt = getRegionTransition(existingBytes);
+
+    // Verify it is the expected version
+    if (expectedVersion != -1 && stat.getVersion() != expectedVersion) {
+      LOG.warn(zkw.prefix("Attempt to transition the " +
+        "unassigned node for " + encoded +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node existed but was version " + stat.getVersion() +
+        " not the expected version " + expectedVersion));
+      return -1;
+    } else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE)
+        && endState.equals(EventType.RS_ZK_REGION_OPENING)
+        && expectedVersion == -1 && stat.getVersion() != 0) {
+      // The check below ensures that double assignment does not happen.
+      // When the node is created for the first time then the expected version
+      // that is passed will be -1 and the version in znode will be 0.
+      // In all other cases the version in znode will be > 0.
+      // expectedVersion is always -1 in this branch, so report the version that
+      // was really required (0) rather than the -1 sentinel.
+      LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for "
+          + encoded + " from " + beginState + " to " + endState + " failed, "
+          + "the node existed but was version " + stat.getVersion()
+          + " not the expected version 0 of a newly created node"));
+      return -1;
+    }
+
+    // Verify it is in expected state
+    EventType et = rt.getEventType();
+    if (!et.equals(beginState)) {
+      String existingServer = (rt.getServerName() == null)
+        ? "<unknown>" : rt.getServerName().toString();
+      LOG.warn(zkw.prefix("Attempt to transition the unassigned node for " + encoded
+        + " from " + beginState + " to " + endState + " failed, the node existed but"
+        + " was in the state " + et + " set by the server " + existingServer));
+      return -1;
+    }
+
+    // Write new data, ensuring data has not changed since we last read it
+    try {
+      rt = RegionTransition.createRegionTransition(
+          endState, region.getRegionName(), serverName, payload);
+      if(!ZKUtil.setData(zkw, node, rt.toByteArray(), stat.getVersion())) {
+        LOG.warn(zkw.prefix("Attempt to transition the " +
+        "unassigned node for " + encoded +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node existed and was in the expected state but then when " +
+        "setting data we got a version mismatch"));
+        return -1;
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(zkw.prefix("Successfully transitioned node " + encoded +
+          " from " + beginState + " to " + endState));
+      }
+      // The conditional write succeeded, so the version is one past what was read.
+      return stat.getVersion() + 1;
+    } catch (KeeperException.NoNodeException nne) {
+      LOG.warn(zkw.prefix("Attempt to transition the " +
+        "unassigned node for " + encoded +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node existed and was in the expected state but then when " +
+        "setting data it no longer existed"));
+      return -1;
+    }
+  }
+
+  /**
+   * Deserializes raw znode content into a {@link RegionTransition}.
+   * @param bytes serialized region transition, as read from a znode
+   * @return the parsed region transition
+   * @throws KeeperException if the bytes cannot be deserialized; the
+   * underlying DeserializationException is converted by ZKUtil.convert
+   */
+  private static RegionTransition getRegionTransition(final byte [] bytes) throws KeeperException {
+    RegionTransition parsed;
+    try {
+      parsed = RegionTransition.parseFrom(bytes);
+    } catch (DeserializationException de) {
+      // Callers only declare KeeperException, so rewrap instead of changing
+      // the API.
+      throw ZKUtil.convert(de);
+    }
+    return parsed;
+  }
+
+  /**
+   * Reads the content of the unassigned node of a region, identified either
+   * by region name or by fully-qualified znode path.
+   *
+   * <p>The result is null when the region currently has no node.
+   *
+   * <p>A watch is set on the node if the node exists.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name; resolved
+   * through {@link #getPath(ZooKeeperWatcher, String)}
+   * @return znode content
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getData(ZooKeeperWatcher zkw,
+      String pathOrRegionName)
+  throws KeeperException {
+    return ZKUtil.getDataAndWatch(zkw, getPath(zkw, pathOrRegionName));
+  }
+
+  /**
+   * Gets the current data in the unassigned node for the specified region name
+   * or fully-qualified path.
+   *
+   * <p>Returns null if the region does not currently have a node.
+   *
+   * <p>Sets a watch on the node if the node exists.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @param stat object to populate the version.
+   * @return znode content
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataAndWatch(ZooKeeperWatcher zkw,
+      String pathOrRegionName, Stat stat)
+  throws KeeperException {
+    String node = getPath(zkw, pathOrRegionName);
+    return ZKUtil.getDataAndWatch(zkw, node, stat);
+  }
+
+  /**
+   * Gets the current data in the unassigned node for the specified region name
+   * or fully-qualified path.
+   *
+   * <p>Returns null if the region does not currently have a node.
+   *
+   * <p>Does not set a watch.
+   *
+   * @param zkw zk reference
+   * @param pathOrRegionName fully-specified path or region name
+   * @param stat object to store node info into on getData call
+   * @return znode content
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static byte [] getDataNoWatch(ZooKeeperWatcher zkw,
+      String pathOrRegionName, Stat stat)
+  throws KeeperException {
+    String node = getPath(zkw, pathOrRegionName);
+    return ZKUtil.getDataNoWatch(zkw, node, stat);
+  }
+
+  /**
+   * Resolves a region name or a znode path to a znode path.
+   * @param zkw zk reference, used to compute the node name for a region name
+   * @param pathOrRegionName either a path (anything starting with "/") or a
+   * region name
+   * @return the argument unchanged when it starts with "/", otherwise the
+   * node name computed for it by getNodeName
+   */
+  public static String getPath(final ZooKeeperWatcher zkw, final String pathOrRegionName) {
+    if (pathOrRegionName.startsWith("/")) {
+      // Already a path; use it as given.
+      return pathOrRegionName;
+    }
+    return getNodeName(zkw, pathOrRegionName);
+  }
+
+  /**
+   * Looks up the version of the znode that belongs to the given region.
+   * @param zkw zk reference
+   * @param region region whose encoded name identifies the znode
+   * @return the version of the znode, -1 if it doesn't exist
+   * @throws KeeperException if the existence check fails
+   */
+  public static int getVersion(ZooKeeperWatcher zkw, HRegionInfo region)
+    throws KeeperException {
+    final String encodedName = region.getEncodedName();
+    return ZKUtil.checkExists(zkw, getNodeName(zkw, encodedName));
+  }
+
+  /**
+   * Removes the assignment node of a region, whatever state it is in.
+   * <p>
+   * Stays silent even if the node does not exist at all.
+   * @param watcher zk reference
+   * @param regionInfo region whose encoded name identifies the node
+   * @throws KeeperException if the delete fails
+   */
+  public static void deleteNodeFailSilent(ZooKeeperWatcher watcher,
+      HRegionInfo regionInfo)
+  throws KeeperException {
+    final String encodedName = regionInfo.getEncodedName();
+    ZKUtil.deleteNodeFailSilent(watcher, getNodeName(watcher, encodedName));
+  }
+
+  /**
+   * Polls until the assignment znode has no children, i.e. there is no node
+   * in regions in transition.  Between polls the current children are logged
+   * at debug level and the thread sleeps for 100 ms.
+   * <p>
+   * Used in testing only.
+   * @param zkw zk reference
+   * @throws KeeperException if a zookeeper call fails
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public static void blockUntilNoRIT(ZooKeeperWatcher zkw)
+  throws KeeperException, InterruptedException {
+    while (true) {
+      if (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
+        return;
+      }
+      List<String> pending =
+        ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
+      // Iterating an empty list logs nothing, so only null needs a guard.
+      if (pending != null) {
+        for (String child : pending) {
+          LOG.debug("ZK RIT -> " + child);
+        }
+      }
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Blocks until there is at least one node in regions in transition.
+   * <p>
+   * Used in testing only.
+   * @param zkw zk reference
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public static void blockUntilRIT(ZooKeeperWatcher zkw)
+  throws KeeperException, InterruptedException {
+    while (!ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
+      List<String> znodes =
+        ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
+      if (znodes == null || znodes.isEmpty()) {
+        LOG.debug("No RIT in ZK");
+      }
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Renders znode content as text, presuming the bytes are a serialized
+   * unassigned data structure.
+   * @param znodeBytes serialized RegionTransition
+   * @return String of the deserialized znode bytes, or the empty string when
+   * the bytes cannot be deserialized
+   */
+  static String toString(final byte[] znodeBytes) {
+    // This method should not exist.  ZKUtil uses it to stringify a
+    // RegionTransition; it lives here so RegionTransition does not leak into
+    // ZKUtil.
+    String rendered;
+    try {
+      rendered = RegionTransition.parseFrom(znodeBytes).toString();
+    } catch (DeserializationException e) {
+      rendered = "";
+    }
+    return rendered;
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Publishes and synchronizes a unique identifier specific to a given HBase
+ * cluster.  The stored identifier is read from the file system by the active
+ * master on startup, and is subsequently available to all watchers (including
+ * clients).
+ *
+ * <p>An instance remembers the identifier once it has been read successfully
+ * from the cluster id znode, so later lookups do not go back to ZooKeeper.
+ */
+@InterfaceAudience.Private
+public class ZKClusterId {
+  private ZooKeeperWatcher watcher;
+  private Abortable abortable;
+  // Cached cluster id; stays null until a read from ZooKeeper yields a value.
+  private String id;
+
+  /**
+   * @param watcher zk reference used to read the cluster id znode
+   * @param abortable notified when reading the cluster id fails with a
+   * KeeperException
+   */
+  public ZKClusterId(ZooKeeperWatcher watcher, Abortable abortable) {
+    this.watcher = watcher;
+    this.abortable = abortable;
+  }
+
+  /**
+   * @return true if {@link #getId()} yields a non-null cluster id
+   */
+  public boolean hasId() {
+    String current = getId();
+    return current != null;
+  }
+
+  /**
+   * Returns the cluster id, reading it from ZooKeeper when it has not been
+   * cached yet.  A KeeperException during the read is passed to the
+   * abortable rather than thrown.
+   * @return the cluster id, or null if none could be read
+   */
+  public String getId() {
+    if (id != null) {
+      return id;
+    }
+    try {
+      id = readClusterIdZNode(watcher);
+    } catch (KeeperException ke) {
+      abortable.abort("Unexpected exception from ZooKeeper reading cluster ID",
+          ke);
+    }
+    return id;
+  }
+
+  /**
+   * Reads the cluster id stored in the cluster id znode.
+   * @param watcher zk reference
+   * @return the cluster id as a string, or null if the znode does not exist
+   * or holds no data
+   * @throws KeeperException on a zookeeper failure, or if the znode content
+   * cannot be deserialized
+   */
+  public static String readClusterIdZNode(ZooKeeperWatcher watcher)
+  throws KeeperException {
+    if (ZKUtil.checkExists(watcher, watcher.clusterIdZNode) == -1) {
+      return null;
+    }
+    byte [] raw = ZKUtil.getData(watcher, watcher.clusterIdZNode);
+    if (raw == null) {
+      return null;
+    }
+    try {
+      return ClusterId.parseFrom(raw).toString();
+    } catch (DeserializationException de) {
+      throw ZKUtil.convert(de);
+    }
+  }
+
+  /**
+   * Stores the given cluster id in the cluster id znode.
+   * @param watcher zk reference
+   * @param id cluster id to publish
+   * @throws KeeperException if the write fails
+   */
+  public static void setClusterId(ZooKeeperWatcher watcher, ClusterId id)
+      throws KeeperException {
+    byte [] serialized = id.toByteArray();
+    ZKUtil.createSetData(watcher, watcher.clusterIdZNode, serialized);
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Utility methods for reading, and building the ZooKeeper configuration.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ZKConfig {
+  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+
+  private static final String VARIABLE_START = "${";
+  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+  private static final String VARIABLE_END = "}";
+  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * <p>
+   * When {@link HConstants#HBASE_CONFIG_READ_ZOOKEEPER_CONFIG} is set to true
+   * and a {@link HConstants#ZOOKEEPER_CONFIG_NAME} resource is found on the
+   * classpath, that file is parsed and returned (deprecated path).  If the
+   * resource is missing or cannot be read, or the option is off, the
+   * properties are generated from the HBase XML configs instead:
+   * <ul>
+   * <li>every key starting with {@link HConstants#ZK_CFG_PROPERTY_PREFIX} is
+   * copied with the prefix stripped,</li>
+   * <li>the client port falls back to the default when unset,</li>
+   * <li>one <code>server.N</code> entry of the form
+   * <code>host:peerPort:leaderPort</code> is added per quorum host.</li>
+   * </ul>
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG,
+        false)) {
+      LOG.warn(
+          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
+          " file for ZK properties " +
+          "has been deprecated. Please instead place all ZK related HBase " +
+          "configuration under the hbase-site.xml, using prefixes " +
+          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
+          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+          "' to false");
+      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+      // it and grab its configuration properties.
+      ClassLoader cl = HQuorumPeer.class.getClassLoader();
+      final InputStream inputStream =
+        cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+      if (inputStream != null) {
+        try {
+          return parseZooCfg(conf, inputStream);
+        } catch (IOException e) {
+          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+                   ", loading from XML files", e);
+        } finally {
+          // This method opened the stream, so it must release it whether the
+          // parse succeeded or failed.
+          try {
+            inputStream.close();
+          } catch (IOException closeFailure) {
+            LOG.warn("Failed to close " + HConstants.ZOOKEEPER_CONFIG_NAME,
+                closeFailure);
+          }
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Skipped reading ZK properties file '" +
+            HConstants.ZOOKEEPER_CONFIG_NAME +
+            "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+            "' was not set to true");
+      }
+    }
+
+    // Otherwise, use the configuration options from HBase's XML files.
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    for (Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+        String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+        String value = entry.getValue();
+        // If the value has variables substitutions, need to do a get.
+        if (value.contains(VARIABLE_START)) {
+          value = conf.get(key);
+        }
+        zkProperties.put(zkKey, value);
+      }
+    }
+
+    // If clientPort is not set, assign the default.
+    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+      zkProperties.put(HConstants.CLIENT_PORT_STR,
+          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+                                                 HConstants.LOCALHOST);
+    for (int i = 0; i < serverHosts.length; ++i) {
+      String serverHost = serverHosts[i];
+      String address = serverHost + ":" + peerPort + ":" + leaderPort;
+      String key = "server." + i;
+      zkProperties.put(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /**
+   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+   * This method is used for testing so we can pass our own InputStream.
+   * <p>
+   * Every <code>${name}</code> occurrence in a value is replaced by the
+   * system property <code>name</code> or, if that is unset, by the HBase
+   * config entry of that name.  Text around and between the variables is
+   * kept as is.  The stream is read but not closed by this method.
+   * @param conf HBaseConfiguration to use for injecting variables.
+   * @param inputStream InputStream to read from.
+   * @return Properties parsed from config stream with variables substituted.
+   * @throws IOException if the stream cannot be read, a variable has no
+   * closing marker or no value, or a <code>server.</code> entry starts with
+   * localhost while the cluster is configured as distributed
+   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
+   * availability.
+   */
+  @Deprecated
+  public static Properties parseZooCfg(Configuration conf,
+      InputStream inputStream) throws IOException {
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+    } catch (IOException e) {
+      final String msg = "fail to read properties from "
+        + HConstants.ZOOKEEPER_CONFIG_NAME;
+      LOG.fatal(msg);
+      throw new IOException(msg, e);
+    }
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String value = entry.getValue().toString().trim();
+      String key = entry.getKey().toString().trim();
+      StringBuilder newValue = new StringBuilder();
+      int varStart = value.indexOf(VARIABLE_START);
+      // Index just past the last consumed variable; 0 until one is found.
+      int varEnd = 0;
+      while (varStart != -1) {
+        // Keep the literal text that sits before this variable (either the
+        // start of the value or the gap since the previous variable).
+        newValue.append(value.substring(varEnd, varStart));
+
+        int closing = value.indexOf(VARIABLE_END, varStart);
+        if (closing == -1) {
+          String msg = "variable at " + varStart + " has no end marker";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+        String variable = value.substring(varStart + VARIABLE_START_LENGTH, closing);
+
+        // System properties take precedence over the HBase configuration.
+        String substituteValue = System.getProperty(variable);
+        if (substituteValue == null) {
+          substituteValue = conf.get(variable);
+        }
+        if (substituteValue == null) {
+          String msg = "variable " + variable + " not set in system property "
+                     + "or hbase configs";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+
+        newValue.append(substituteValue);
+
+        varEnd = closing + VARIABLE_END_LENGTH;
+        varStart = value.indexOf(VARIABLE_START, varEnd);
+      }
+      // Special case for 'hbase.cluster.distributed' property being 'true'
+      if (key.startsWith("server.")) {
+        boolean mode = conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
+        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
+          String msg = "The server in zoo.cfg cannot be set to localhost " +
+              "in a fully-distributed setup because it won't be reachable. " +
+              "See \"Getting Started\" for more information.";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+      }
+      // Whatever follows the last variable (or the whole value if there was
+      // none) is copied verbatim.
+      newValue.append(value.substring(varEnd));
+      properties.setProperty(key, newValue.toString());
+    }
+    return properties;
+  }
+
+  /**
+   * Return the ZK Quorum servers string given zk properties returned by
+   * makeZKProps.
+   * <p>
+   * The host of every <code>server.X</code> entry (the part of the value
+   * before the first ':', or the whole value when it has no ':') is paired
+   * with the <code>clientPort</code> entry.  Hosts appear in the iteration
+   * order of <code>properties.entrySet()</code>.
+   * @param properties zk properties to read the servers and client port from
+   * @return Quorum servers String of the form
+   * <code>host1:clientPort,host2:clientPort</code>, or null when there is no
+   * server entry, none of the hosts resolves, or no clientPort is set
+   */
+  public static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        // A value without any port suffix is taken to be the bare host rather
+        // than failing on substring(0, -1).
+        int firstColon = value.indexOf(':');
+        String host = (firstColon == -1) ? value : value.substring(0, firstColon);
+        servers.add(host);
+        try {
+          //noinspection ResultOfMethodCallIgnored
+          InetAddress.getByName(host);
+          anyValid = true;
+        } catch (UnknownHostException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      }
+    }
+
+    // Checked first: with no servers at all anyValid is necessarily false, so
+    // this more specific message would otherwise never be reported.
+    if (servers.isEmpty()) {
+      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
+          "HBase must have a ZooKeeper cluster configured for its " +
+          "operation. Ensure that you've configured '" +
+          HConstants.ZOOKEEPER_QUORUM + "' properly.");
+      return null;
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration.
+   * The configuration is first turned into ZooKeeper properties with
+   * {@link #makeZKProps(Configuration)}.
+   * @param conf Configuration to derive the ZooKeeper properties from
+   * @return Quorum servers
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    final Properties zkProps = makeZKProps(conf);
+    return getZKQuorumServersString(zkProps);
+  }
+}