You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2011/12/21 07:43:05 UTC
svn commit: r1221635 [2/2] - in /incubator/hcatalog/trunk: ./
storage-drivers/hbase/ storage-drivers/hbase/if/ storage-drivers/hbase/ivy/
storage-drivers/hbase/src/gen-java/ storage-drivers/hbase/src/gen-java/org/
storage-drivers/hbase/src/gen-java/org...
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+class ZKUtil {
+
+ private int DEFAULT_SESSION_TIMEOUT = 1000000;
+ private ZooKeeper zkSession;
+ private String baseDir;
+ private String connectString;
+ private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+ /**
+  * The kinds of change that updateData can apply to a stored transaction
+  * list: APPEND adds a revision that must not be present yet, REMOVE drops
+  * a revision that must be present, and KEEP_ALIVE replaces a revision that
+  * must be present with the supplied one.
+  */
+ static enum UpdateMode {
+ APPEND, REMOVE, KEEP_ALIVE
+ };
+
+ /**
+  * Creates a helper for the given ZooKeeper ensemble. Only the settings are
+  * recorded here; no session is opened by the constructor.
+  *
+  * @param connection The ZooKeeper connect string.
+  * @param baseDir The base directory handed to PathUtil when building znode paths.
+  */
+ ZKUtil(String connection, String baseDir) {
+     this.baseDir = baseDir;
+     this.connectString = connection;
+ }
+
+ /**
+  * Creates the znodes that belong to a table: the transaction data node of
+  * the table and, for every column family, the running-transaction node and
+  * the abort-information node. All nodes are persistent, carry no data and
+  * use the open ACL.
+  *
+  * @param table The name of the table.
+  * @param families The list of column families of the table.
+  * @throws IOException If any of the znodes cannot be created.
+  */
+ void setUpZnodesForTable(String table, List<String> families)
+         throws IOException {
+     ensurePathExists(PathUtil.getTxnDataPath(baseDir, table), null,
+             Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+     for (String family : families) {
+         ensurePathExists(
+                 PathUtil.getRunningTxnInfoPath(this.baseDir, table, family),
+                 null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         ensurePathExists(
+                 PathUtil.getAbortInformationPath(this.baseDir, table, family),
+                 null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+     }
+ }
+
+ /**
+  * Ensures that a given path exists in zookeeper, creating every missing
+  * component from the root downwards. Each component that has to be created
+  * is created with the same data, ACL and create mode.
+  *
+  * @param path The path of znode that is required to exist.
+  * @param data The data to be associated with each znode that is created.
+  * @param acl The ACLs required.
+  * @param flags The CreateMode for the znode.
+  * @throws IOException If a component can neither be found nor created.
+  */
+ void ensurePathExists(String path, byte[] data, List<ACL> acl,
+         CreateMode flags) throws IOException {
+     StringBuilder parentPath = new StringBuilder();
+     for (String subDir : path.split("/")) {
+         if (subDir.length() == 0) {
+             // Empty components come from the leading or doubled slashes.
+             continue;
+         }
+         parentPath.append('/').append(subDir);
+         String current = parentPath.toString();
+         try {
+             if (getSession().exists(current, false) == null) {
+                 try {
+                     getSession().create(current, data, acl, flags);
+                 } catch (KeeperException.NodeExistsException e) {
+                     // Another client created the node between exists() and
+                     // create(); the path exists, which is all that is required.
+                     LOG.debug("Path " + current + " was created concurrently.");
+                 }
+             }
+         } catch (InterruptedException e) {
+             // Keep the interrupt visible to the caller's thread.
+             Thread.currentThread().interrupt();
+             throw new IOException("Exception while creating path "
+                     + current, e);
+         } catch (Exception e) {
+             throw new IOException("Exception while creating path "
+                     + current, e);
+         }
+     }
+ }
+
+ /**
+ * This method returns a list of columns of a table which were used in any
+ * of the transactions.
+ *
+ * @param tableName The name of table.
+ * @return List<String> The list of column families in table.
+ * @throws IOException
+ */
+ List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
+ String path = PathUtil.getTxnDataPath(baseDir, tableName);
+ List<String> children = null;
+ List<String> columnFamlies = new ArrayList<String>();
+ try {
+ children = getSession().getChildren(path, false);
+ } catch (KeeperException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining columns of table.",e);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: ", e);
+ throw new IOException("Exception while obtaining columns of table.",e);
+ }
+
+ for (String child : children) {
+ if ((child.contains("idgen") == false)
+ && (child.contains("_locknode_") == false)) {
+ columnFamlies.add(child);
+ }
+ }
+ return columnFamlies;
+ }
+
+ /**
+  * Returns a time stamp for use by the transactions. The clock znode is
+  * rewritten with null data and the modification time reported for that
+  * write is returned.
+  *
+  * @return long The modification time of the clock znode after the write.
+  * @throws IOException If the clock znode cannot be created or written.
+  */
+ long getTimeStamp() throws IOException {
+     String clockPath = PathUtil.getClockPath(this.baseDir);
+     ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
+             CreateMode.PERSISTENT);
+     Stat stat;
+     try {
+         // The path was ensured just above and the result of an extra
+         // exists() call was never used, so only the write is issued.
+         stat = getSession().setData(clockPath, null, -1);
+     } catch (KeeperException e) {
+         LOG.warn("Caught: ", e);
+         throw new IOException("Exception while obtaining timestamp ", e);
+     } catch (InterruptedException e) {
+         LOG.warn("Caught: ", e);
+         // Keep the interrupt visible to the caller's thread.
+         Thread.currentThread().interrupt();
+         throw new IOException("Exception while obtaining timestamp ", e);
+     }
+     return stat.getMtime();
+ }
+
+ /**
+ * This method returns the next revision number to be used for any
+ * transaction purposes.
+ *
+ * @param tableName The name of the table.
+ * @return revision number The revision number last used by any transaction.
+ * @throws IOException
+ */
+ long nextId(String tableName) throws IOException {
+ String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+ ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String lockNode = PathUtil.getLockManagementNode(idNode);
+ ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+ long id = idf.obtainID();
+ return id;
+ }
+
+ /**
+ * The latest used revision id of the table.
+ *
+ * @param tableName The name of the table.
+ * @return the long The revision number to use by any transaction.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ long currentID(String tableName) throws IOException{
+ String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+ ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String lockNode = PathUtil.getLockManagementNode(idNode);
+ ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+ long id = idf.readID();
+ return id;
+ }
+
+ /**
+  * Retrieves the list of transaction information stored in a znode. The
+  * znode data is deserialized as a StoreFamilyRevisionList and each entry is
+  * converted to a FamilyRevision.
+  *
+  * @param path The znode path
+  * @return List of FamilyRevision The list of transactions in the given
+  *         path; empty when the znode holds no data.
+  * @throws IOException If the znode cannot be read or its data cannot be
+  *         deserialized.
+  */
+ List<FamilyRevision> getTransactionList(String path)
+         throws IOException {
+
+     List<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
+     byte[] data = getRawData(path, new Stat());
+     if (data == null || data.length == 0) {
+         // deserialize() returns without touching its target for empty data,
+         // so there is nothing to iterate over.
+         return wtxnList;
+     }
+     StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
+     deserialize(txnList, data);
+     Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
+     if (itr == null) {
+         // NOTE(review): presumably the generated accessor yields null when
+         // the list field was never set; guard instead of failing with a
+         // NullPointerException below.
+         return wtxnList;
+     }
+
+     while (itr.hasNext()) {
+         StoreFamilyRevision wtxn = itr.next();
+         wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
+                 .getTimestamp()));
+     }
+
+     return wtxnList;
+ }
+
+ /**
+  * Reads the raw bytes stored at a znode without setting a watch.
+  *
+  * @param path The znode path
+  * @param stat Zookeeper stat object handed to the getData call.
+  * @return byte array The data stored in the znode.
+  * @throws IOException Wraps any exception raised while reading the znode.
+  */
+ byte[] getRawData(String path, Stat stat) throws IOException {
+     try {
+         return getSession().getData(path, false, stat);
+     } catch (Exception e) {
+         throw new IOException(
+                 "Exception while obtaining raw data from zookeeper path "
+                         + path, e);
+     }
+ }
+
+ /**
+ * This method created the basic znodes in zookeeper for revision
+ * management.
+ *
+ * @throws IOException
+ */
+ void createRootZNodes() throws IOException {
+ String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
+ String clockNode = PathUtil.getClockPath(this.baseDir);
+ ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
+ /**
+  * Closes the zookeeper session, if one is open, and forgets it so that a
+  * later getSession() call opens a new one.
+  */
+ void closeZKConnection() {
+     // Work on a snapshot: the session watcher may reset the field to null
+     // between the null check and the close() call.
+     ZooKeeper session = zkSession;
+     if (session == null) {
+         return;
+     }
+     try {
+         session.close();
+     } catch (InterruptedException e) {
+         LOG.warn("Close failed: ", e);
+         // Keep the interrupt visible to the caller's thread.
+         Thread.currentThread().interrupt();
+     }
+     zkSession = null;
+     LOG.info("Disconnected to ZooKeeper");
+ }
+
+ /**
+  * Returns a zookeeper session. If there is no session yet, or the current
+  * one is closed, a new session is created with a ZKWatcher attached.
+  *
+  * @return ZooKeeper An instance of zookeeper client; never null.
+  * @throws IOException If a new session cannot be created.
+  */
+ ZooKeeper getSession() throws IOException {
+     synchronized (this) {
+         // Use a local snapshot: the watcher resets the field to null on
+         // session expiry, so re-reading the field for the return value
+         // could hand null to the caller.
+         ZooKeeper session = zkSession;
+         if (session == null || session.getState() == States.CLOSED) {
+             session = new ZooKeeper(this.connectString,
+                     this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
+             zkSession = session;
+         }
+         return session;
+     }
+ }
+
+ /**
+  * Updates the transaction list stored in a znode. The current list is read,
+  * the entry with the same revision as updateTx is taken out, the requested
+  * change is applied and the resulting list is written back.
+  *
+  * @param path The path to the transaction data.
+  * @param updateTx The FamilyRevision to be updated.
+  * @param mode The mode of the update: append, remove or keep alive.
+  * @throws IOException If updateTx is null, if the revision is missing for
+  *         REMOVE/KEEP_ALIVE or already present for APPEND, or if the znode
+  *         cannot be read or written.
+  */
+ void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
+         throws IOException {
+
+     if (updateTx == null) {
+         throw new IOException(
+                 "The transaction to be updated found to be null.");
+     }
+     List<FamilyRevision> currentData = getTransactionList(path);
+     List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+     boolean dataFound = false;
+     long updateVersion = updateTx.getRevision();
+     // Copy every other revision; remember whether the target was present.
+     for (FamilyRevision tranx : currentData) {
+         if (tranx.getRevision() != updateVersion) {
+             newData.add(tranx);
+         } else {
+             dataFound = true;
+         }
+     }
+     switch (mode) {
+     case REMOVE:
+         if (!dataFound) {
+             throw new IOException(
+                     "The transaction to be removed not found in the data.");
+         }
+         LOG.info("Removed trasaction : " + updateTx.toString());
+         break;
+     case KEEP_ALIVE:
+         if (!dataFound) {
+             throw new IOException(
+                     "The transaction to be kept alove not found in the data. It might have been expired.");
+         }
+         newData.add(updateTx);
+         LOG.info("keep alive of transaction : " + updateTx.toString());
+         break;
+     case APPEND:
+         if (dataFound) {
+             throw new IOException(
+                     "The data to be appended already exists.");
+         }
+         newData.add(updateTx);
+         LOG.info("Added transaction : " + updateTx.toString());
+         break;
+     }
+
+     // For serialization purposes.
+     List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+     for (FamilyRevision wtxn : newData) {
+         StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+                 wtxn.getExpireTimestamp());
+         newTxnList.add(newTxn);
+     }
+     StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+     byte[] newByteData = serialize(wtxnList);
+
+     Stat stat = null;
+     try {
+         // Go through getSession(): the session field is reset to null when
+         // the session expires, so using it directly can fail with a
+         // NullPointerException.
+         stat = getSession().setData(path, newByteData, -1);
+     } catch (KeeperException e) {
+         throw new IOException(
+                 "Exception while updating trasactional data. ", e);
+     } catch (InterruptedException e) {
+         throw new IOException(
+                 "Exception while updating trasactional data. ", e);
+     }
+
+     if (stat != null) {
+         LOG.info("Transaction list stored at " + path + ".");
+     }
+
+ }
+
+ /**
+  * Refresh transactions on a given transaction data path: entries whose
+  * expire timestamp is not later than the current zookeeper timestamp are
+  * dropped, and the list is written back only if something was dropped.
+  *
+  * @param path The path to the transaction data.
+  * @throws IOException Signals that an I/O exception has occurred.
+  */
+ void refreshTransactions(String path) throws IOException{
+     List<FamilyRevision> currentData = getTransactionList(path);
+     if (currentData.isEmpty()) {
+         // Nothing can expire, so skip the timestamp write as well.
+         return;
+     }
+
+     // Every getTimeStamp() call writes to the clock znode; take a single
+     // reading for the whole pass instead of one per transaction.
+     long now = getTimeStamp();
+     List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+     for (FamilyRevision tranx : currentData) {
+         if (tranx.getExpireTimestamp() > now) {
+             newData.add(tranx);
+         }
+     }
+
+     // newData is an in-order subset of currentData, so equal sizes mean
+     // that no transaction expired.
+     if (newData.size() == currentData.size()) {
+         return;
+     }
+
+     List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+     for (FamilyRevision wtxn : newData) {
+         StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+                 wtxn.getExpireTimestamp());
+         newTxnList.add(newTxn);
+     }
+     StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+     byte[] newByteData = serialize(wtxnList);
+
+     try {
+         // Go through getSession(): the session field is reset to null when
+         // the session expires.
+         getSession().setData(path, newByteData, -1);
+     } catch (KeeperException e) {
+         throw new IOException(
+                 "Exception while updating trasactional data. ", e);
+     } catch (InterruptedException e) {
+         throw new IOException(
+                 "Exception while updating trasactional data. ", e);
+     }
+
+ }
+
+ /**
+  * Serializes a TBase object with the Thrift binary protocol.
+  *
+  * @param obj An instance of TBase; may be null.
+  * @return byte array The serialized data, or an empty array when obj is null.
+  * @throws IOException Wraps any exception raised during serialization.
+  */
+ static byte[] serialize(TBase obj) throws IOException {
+     if (obj == null) {
+         return new byte[0];
+     }
+     try {
+         return new TSerializer(new TBinaryProtocol.Factory()).serialize(obj);
+     } catch (Exception e) {
+         throw new IOException("Serialization error: ", e);
+     }
+ }
+
+
+ /**
+  * Deserializes a byte array into the given TBase object using the Thrift
+  * binary protocol. Null or empty input leaves the object untouched.
+  *
+  * @param obj The TBase instance that receives the deserialized fields.
+  * @param data The serialized bytes to read.
+  * @throws IOException Wraps any exception raised during deserialization.
+  */
+ static void deserialize(TBase obj, byte[] data) throws IOException {
+     boolean nothingToRead = (data == null) || (data.length == 0);
+     if (!nothingToRead) {
+         try {
+             new TDeserializer(new TBinaryProtocol.Factory())
+                     .deserialize(obj, data);
+         } catch (Exception e) {
+             throw new IOException("Deserialization error: " + e.getMessage(), e);
+         }
+     }
+ }
+
+ /**
+  * Watcher attached to the sessions created by getSession(). It reacts only
+  * to the Expired state, in which case the cached session is cleared; every
+  * other event is ignored.
+  */
+ private class ZKWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ switch (event.getState()) {
+ case Expired:
+ LOG.info("The client session has expired. Try opening a new "
+ + "session and connecting again.");
+ // Drop the expired handle; getSession() opens a new session when
+ // it finds the field null.
+ zkSession = null;
+ break;
+ default:
+ // No action for any other state.
+
+ }
+ }
+ }
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+/**
+ * Callback interface with two methods: one invoked when a lock is
+ * acquired and one invoked when the lock is released.
+ * This interface has been used as-is from the zookeeper 3.3.4 recipes with
+ * minor changes in the package name.
+ */
+public interface LockListener {
+
+    /** Callback invoked when the lock is acquired. */
+    void lockAcquired();
+
+    /** Callback invoked when the lock is released. */
+    void lockReleased();
+}
\ No newline at end of file
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.hcatalog.hbase.snapshot.lock.ZooKeeperOperation;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for protocol implementations which provides a number of higher
+ * level helper methods for working with ZooKeeper along with retrying synchronous
+ * operations if the connection to ZooKeeper closes such as
+ * {@link #retryOperation(ZooKeeperOperation)}.
+ * This class is based on the zookeeper 3.4.0 recipes with changes in the
+ * retry delay, retry count values, interrupt handling and package name.
+ */
+class ProtocolSupport {
+    private static final Logger LOG = Logger.getLogger(ProtocolSupport.class);
+
+    protected final ZooKeeper zookeeper;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private long retryDelay = 500L;
+    private int retryCount = 3;
+    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    /**
+     * Creates the support object around an existing client.
+     * @param zookeeper the zookeeper client instance to operate on
+     */
+    public ProtocolSupport(ZooKeeper zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    /**
+     * Closes this strategy and releases any ZooKeeper resources; but keeps the
+     * ZooKeeper instance open. Only the first call runs {@link #doClose()}.
+     */
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            doClose();
+        }
+    }
+
+    /**
+     * return zookeeper client instance
+     * @return zookeeper client instance
+     */
+    public ZooKeeper getZookeeper() {
+        return zookeeper;
+    }
+
+    /**
+     * return the acl its using
+     * @return the acl.
+     */
+    public List<ACL> getAcl() {
+        return acl;
+    }
+
+    /**
+     * set the acl
+     * @param acl the acl to set to
+     */
+    public void setAcl(List<ACL> acl) {
+        this.acl = acl;
+    }
+
+    /**
+     * get the retry delay in milliseconds
+     * @return the retry delay
+     */
+    public long getRetryDelay() {
+        return retryDelay;
+    }
+
+    /**
+     * Sets the time waited between retry delays
+     * @param retryDelay the retry delay
+     */
+    public void setRetryDelay(long retryDelay) {
+        this.retryDelay = retryDelay;
+    }
+
+    /**
+     * Allow derived classes to perform
+     * some custom closing operations to release resources
+     */
+    protected void doClose() {
+    }
+
+
+    /**
+     * Perform the given operation, retrying if the connection fails. A
+     * session expiry is rethrown at once; a connection loss is retried up to
+     * the retry count, after which the first connection loss is thrown.
+     * @param operation the operation to execute
+     * @return object. it needs to be cast to the callee's expected
+     * return type.
+     * @throws KeeperException on session expiry or when every attempt lost
+     * the connection
+     * @throws InterruptedException if the operation is interrupted
+     */
+    protected Object retryOperation(ZooKeeperOperation operation)
+        throws KeeperException, InterruptedException {
+        KeeperException exception = null;
+        for (int i = 0; i < retryCount; i++) {
+            try {
+                return operation.execute();
+            } catch (KeeperException.SessionExpiredException e) {
+                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+                throw e;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (exception == null) {
+                    exception = e;
+                }
+                LOG.debug("Attempt " + i + " failed with connection loss so " +
+                        "attempting to reconnect: " + e, e);
+                retryDelay(i);
+            }
+        }
+        if (exception == null) {
+            // Only reachable when no attempt was made at all; "throw null"
+            // would surface as an unexplained NullPointerException.
+            throw new IllegalStateException(
+                    "No attempt was made, retry count is " + retryCount);
+        }
+        throw exception;
+    }
+
+    /**
+     * Ensures that the given path exists with no data, the current
+     * ACL and no flags
+     * @param path the znode path
+     */
+    protected void ensurePathExists(String path) {
+        ensureExists(path, null, acl, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * Ensures that the given path exists with the given data, ACL and flags.
+     * Failures are logged and not propagated.
+     * @param path the znode path
+     * @param data the data used if the znode has to be created
+     * @param acl the acl used if the znode has to be created
+     * @param flags the create mode used if the znode has to be created
+     */
+    protected void ensureExists(final String path, final byte[] data,
+            final List<ACL> acl, final CreateMode flags) {
+        try {
+            retryOperation(new ZooKeeperOperation() {
+                public boolean execute() throws KeeperException, InterruptedException {
+                    Stat stat = zookeeper.exists(path, false);
+                    if (stat != null) {
+                        return true;
+                    }
+                    try {
+                        zookeeper.create(path, data, acl, flags);
+                    } catch (KeeperException.NodeExistsException e) {
+                        // Created by someone else between exists() and
+                        // create(); the path exists, which is the goal.
+                        LOG.debug("Path already created: " + path);
+                    }
+                    return true;
+                }
+            });
+        } catch (KeeperException e) {
+            LOG.warn("Caught: " + e, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Caught: " + e, e);
+            // Keep the interrupt visible to the caller's thread.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Returns true if this protocol has been closed
+     * @return true if this protocol is closed
+     */
+    protected boolean isClosed() {
+        return closed.get();
+    }
+
+    /**
+     * Performs a retry delay if this is not the first attempt. The delay is
+     * the attempt count multiplied by the configured retry delay.
+     * @param attemptCount the number of the attempts performed so far
+     */
+    protected void retryDelay(int attemptCount) {
+        if (attemptCount > 0) {
+            try {
+                Thread.sleep(attemptCount * retryDelay);
+            } catch (InterruptedException e) {
+                LOG.debug("Failed to sleep: " + e, e);
+                // Keep the interrupt visible so the next blocking call can
+                // react to it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,297 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A <a href="package.html">protocol to implement an exclusive
+ * write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to
+ * start the process of grabbing the lock; you may get the lock then or it may be
+ * some time later. <p/> You can register a listener so that you are invoked
+ * when you get the lock; otherwise you can ask if you have the lock
+ * by calling {@link #isOwner()}
+ * This class has been used as-is from the zookeeper 3.4.0 recipes. The only change
+ * made is a TODO for sorting using suffixes and the package name.
+ */
+public class WriteLock extends ProtocolSupport {
+ private static final Logger LOG = Logger.getLogger(WriteLock.class);
+
+ private final String dir;
+ private String id;
+ private ZNodeName idName;
+ private String ownerId;
+ private String lastChildId;
+ private byte[] data = {0x12, 0x34};
+ private LockListener callback;
+ private LockZooKeeperOperation zop;
+
+ /**
+ * zookeeper contructor for writelock
+ * @param zookeeper zookeeper client instance
+ * @param dir the parent path you want to use for locking
+ * @param acls the acls that you want to use for all the paths,
+ * if null world read/write is used.
+ */
+ public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+ super(zookeeper);
+ this.dir = dir;
+ if (acl != null) {
+ setAcl(acl);
+ }
+ this.zop = new LockZooKeeperOperation();
+ }
+
+ /**
+  * Constructs a write lock with a listener: delegates to the three-argument
+  * constructor and then stores the callback.
+  * @param zookeeper the zookeeper client instance
+  * @param dir the parent path you want to use for locking
+  * @param acl the acls that you want to use for all the paths
+  * @param callback the call back instance
+  */
+ public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,
+ LockListener callback) {
+ this(zookeeper, dir, acl);
+ this.callback = callback;
+ }
+
+ /**
+  * Returns the listener currently registered on this lock.
+  * @return the lock listener, possibly null
+  */
+ public LockListener getLockListener() {
+     return callback;
+ }
+
+ /**
+  * Registers a different callback listener, replacing the current one.
+  * @param callback the call back instance
+  */
+ public void setLockListener(LockListener callback) {
+ this.callback = callback;
+ }
+
+ /**
+ * Removes the lock or associated znode if
+ * you no longer require the lock. this also
+ * removes your request in the queue for locking
+ * in case you do not already hold the lock.
+ * @throws RuntimeException throws a runtime exception
+ * if it cannot connect to zookeeper.
+ */
+ public synchronized void unlock() throws RuntimeException {
+
+ if (!isClosed() && id != null) {
+ // we don't need to retry this operation in the case of failure
+ // as ZK will remove ephemeral files and we don't wanna hang
+ // this process when closing if we cannot reconnect to ZK
+ try {
+
+ ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+ public boolean execute() throws KeeperException,
+ InterruptedException {
+ zookeeper.delete(id, -1);
+ return Boolean.TRUE;
+ }
+ };
+ zopdel.execute();
+ } catch (InterruptedException e) {
+ LOG.warn("Caught: " + e, e);
+ //set that we have been interrupted.
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // do nothing
+ } catch (KeeperException e) {
+ LOG.warn("Caught: " + e, e);
+ throw (RuntimeException) new RuntimeException(e.getMessage()).
+ initCause(e);
+ }
+ finally {
+ if (callback != null) {
+ callback.lockReleased();
+ }
+ id = null;
+ }
+ }
+ }
+
+ /**
+  * Watcher set on the znode just before ours in the lock queue. When it
+  * fires, lock() is invoked again; a failure of that call is only logged.
+  */
+ private class LockWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ // lets either become the leader or watch the new/updated node
+ LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+ event.getState() + " type " + event.getType());
+ try {
+ lock();
+ } catch (Exception e) {
+ // Nothing upstream can handle this on the watcher thread, so log it.
+ LOG.warn("Failed to acquire lock: " + e, e);
+ }
+ }
+ }
+
+ /**
+  * The zookeeper operation that carries out the locking protocol: it makes
+  * sure this client has a sequential ephemeral znode under the lock
+  * directory and then either takes the lock or watches the znode that sorts
+  * just before its own.
+  */
+ private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+     /**
+      * Finds the znode created earlier by this session, if any; otherwise
+      * creates a new sequential ephemeral znode. In both cases the field
+      * {@code id} ends up holding the full path of the znode.
+      *
+      * @param prefix the name prefix identifying znodes of this session
+      * @param zookeeper the zookeeper client
+      * @param dir the parent directory of the lock znodes
+      * @throws KeeperException
+      * @throws InterruptedException
+      */
+     private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
+             throws KeeperException, InterruptedException {
+         List<String> names = zookeeper.getChildren(dir, false);
+         for (String name : names) {
+             if (name.startsWith(prefix)) {
+                 // getChildren() yields bare child names, whereas create()
+                 // below, ownerId and delete() in unlock() all work with full
+                 // paths; store the full path so isOwner() can match and the
+                 // znode can be deleted.
+                 id = dir + "/" + name;
+                 if (LOG.isDebugEnabled()) {
+                     LOG.debug("Found id created last time: " + id);
+                 }
+                 break;
+             }
+         }
+         if (id == null) {
+             id = zookeeper.create(dir + "/" + prefix, data,
+                     getAcl(), EPHEMERAL_SEQUENTIAL);
+
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Created id: " + id);
+             }
+         }
+     }
+
+     /**
+      * The command that is run and retried for actually obtaining the lock.
+      *
+      * @return true if this client now owns the lock; false if it is
+      * waiting behind another znode (a watch has been set on it)
+      */
+     public boolean execute() throws KeeperException, InterruptedException {
+         do {
+             if (id == null) {
+                 long sessionId = zookeeper.getSessionId();
+                 String prefix = "x-" + sessionId + "-";
+                 // lets try look up the current ID if we failed
+                 // in the middle of creating the znode
+                 findPrefixInChildren(prefix, zookeeper, dir);
+                 idName = new ZNodeName(id);
+             }
+             if (id != null) {
+                 List<String> names = zookeeper.getChildren(dir, false);
+                 if (names.isEmpty()) {
+                     LOG.warn("No children in: " + dir + " when we've just " +
+                             "created one! Lets recreate it...");
+                     // lets force the recreation of the id
+                     id = null;
+                 } else {
+                     // lets sort them explicitly (though they do seem to come back in order ususally :)
+                     SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+                     for (String name : names) {
+                         //TODO: Just use the suffix to sort.
+                         sortedNames.add(new ZNodeName(dir + "/" + name));
+                     }
+                     // The first znode in sort order is the lock owner.
+                     ownerId = sortedNames.first().getName();
+                     SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+                     if (!lessThanMe.isEmpty()) {
+                         // Watch only the znode directly ahead of ours.
+                         ZNodeName lastChildName = lessThanMe.last();
+                         lastChildId = lastChildName.getName();
+                         if (LOG.isDebugEnabled()) {
+                             LOG.debug("watching less than me node: " + lastChildId);
+                         }
+                         Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
+                         if (stat != null) {
+                             return false;
+                         } else {
+                             LOG.warn("Could not find the" +
+                                     " stats for less than me: " + lastChildName.getName());
+                         }
+                     } else {
+                         if (isOwner()) {
+                             if (callback != null) {
+                                 callback.lockAcquired();
+                             }
+                             return true;
+                         }
+                     }
+                 }
+             }
+         }
+         while (id == null);
+         return false;
+     }
+ }
+
+ /**
+ * Attempts to acquire the exclusive write lock returning whether or not it was
+ * acquired. Note that the exclusive lock may be acquired some time later after
+ * this method has been invoked due to the current lock owner going away.
+ */
+ public synchronized boolean lock() throws KeeperException, InterruptedException {
+ if (isClosed()) {
+ return false;
+ }
+ ensurePathExists(dir);
+
+ return (Boolean) retryOperation(zop);
+ }
+
+ /**
+  * Returns the parent directory under which the lock znodes live.
+  * @return the parent dir used for locks.
+  */
+ public String getDir() {
+     return this.dir;
+ }
+
+ /**
+  * Returns true if this node is the owner of the lock (or the leader),
+  * that is when both ids are known and this lock's id equals the owner id.
+  */
+ public boolean isOwner() {
+     if (id == null || ownerId == null) {
+         return false;
+     }
+     return id.equals(ownerId);
+ }
+
+ /**
+  * Returns the znode id recorded for this lock.
+  * @return the id for this lock, or null when none is recorded
+  */
+ public String getId() {
+     return id;
+ }
+}
+
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order.
+ * <p>
+ * The name is split at its last {@code '-'}: the text before it is the prefix
+ * and the text after it is parsed as the sequence number. Names without a
+ * dash, or whose suffix is not an integer, keep a sequence of {@code -1}.
+ * <p>
+ * This class has been used as-is from the zookeeper 3.4.0 recipes with a
+ * change in package name.
+ */
+public class ZNodeName implements Comparable<ZNodeName> {
+    private final String name;
+    private String prefix;
+    // -1 means "no sequence number could be parsed from the name".
+    private int sequence = -1;
+    private static final Logger LOG = Logger.getLogger(ZNodeName.class);
+
+    /**
+     * Parses the given znode name into prefix and sequence number.
+     *
+     * @param name the znode name; must not be null
+     * @throws NullPointerException if {@code name} is null
+     */
+    public ZNodeName(String name) {
+        if (name == null) {
+            throw new NullPointerException("id cannot be null");
+        }
+        this.name = name;
+        this.prefix = name;
+        int idx = name.lastIndexOf('-');
+        if (idx >= 0) {
+            this.prefix = name.substring(0, idx);
+            try {
+                // idx < name.length(), so substring(idx + 1) is always in range;
+                // only the number parsing itself can fail.
+                this.sequence = Integer.parseInt(name.substring(idx + 1));
+            } catch (NumberFormatException e) {
+                // We misdetected a sequence suffix, so the sequence stays -1.
+                // Log the offending name (not the dash index) so the entry is useful.
+                LOG.info("Number format exception for " + name, e);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Two instances are equal when they wrap the same full znode name.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ZNodeName other = (ZNodeName) o;
+        return name.equals(other.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode() + 37;
+    }
+
+    /**
+     * Orders by prefix first, then by sequence number. Within one prefix, a
+     * name without a parsable sequence sorts after names that have one; if
+     * neither has a sequence the full names are compared.
+     */
+    public int compareTo(ZNodeName that) {
+        int answer = this.prefix.compareTo(that.prefix);
+        if (answer == 0) {
+            int s1 = this.sequence;
+            int s2 = that.sequence;
+            if (s1 == -1 && s2 == -1) {
+                return this.name.compareTo(that.name);
+            }
+            // The subtraction only runs when both sequences were parsed; they come
+            // from text after the last '-', so they are non-negative and cannot overflow.
+            answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2;
+        }
+        return answer;
+    }
+
+    /**
+     * Returns the name of the znode
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the sequence number, or -1 when the name carries none.
+     * (Despite the method name, this is the sequence, not the znode name.)
+     */
+    public int getZNodeName() {
+        return sequence;
+    }
+
+    /**
+     * Returns the text prefix before the sequence number
+     */
+    public String getPrefix() {
+        return prefix;
+    }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.apache.hcatalog.hbase.snapshot.lock.ProtocolSupport} class.
+ * This class has been taken from zookeeper 3.4.0 with a change in the
+ * package name.
+ */
+public interface ZooKeeperOperation {
+
+ /**
+ * Performs the operation - which may be invoked multiple times if the connection
+ * to ZooKeeper closes during this operation
+ *
+ * @return the boolean outcome of the operation; its exact meaning is defined
+ * by the implementation
+ * @throws KeeperException if the implementation's ZooKeeper calls fail
+ * @throws InterruptedException if the implementation is interrupted
+ */
+ public boolean execute() throws KeeperException, InterruptedException;
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Test helper that repeatedly requests ids for one table from {@link ZKUtil}
+ * for a fixed number of seconds and records each id against the wall-clock
+ * time (milliseconds) at which it was received.
+ */
+public class IDGenClient extends Thread {
+
+    String connectionStr;
+    String base_dir;
+    ZKUtil zkutil;
+    Random sleepTime = new Random();
+    // Run duration in seconds (compared against elapsed seconds in run()).
+    int runtime;
+    // Receive timestamp in ms -> generated id. Two ids recorded within the same
+    // millisecond share a key, and the later one overwrites the earlier.
+    HashMap<Long, Long> idMap;
+    String tableName;
+
+    /**
+     * @param connectionStr connection string handed to {@link ZKUtil}
+     * @param base_dir      base directory handed to {@link ZKUtil}
+     * @param time          how long {@link #run()} keeps generating ids, in seconds
+     * @param tableName     table for which ids are requested
+     */
+    IDGenClient(String connectionStr, String base_dir, int time, String tableName) {
+        super();
+        this.connectionStr = connectionStr;
+        this.base_dir = base_dir;
+        this.zkutil = new ZKUtil(connectionStr, base_dir);
+        this.runtime = time;
+        idMap = new HashMap<Long, Long>();
+        this.tableName = tableName;
+    }
+
+    /*
+     * Requests ids until the configured number of seconds has elapsed, pausing
+     * 0 or 100 ms between requests. Stops early if the thread is interrupted.
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        long startTime = System.currentTimeMillis();
+        int timeElapsed = 0;
+        while (timeElapsed <= runtime) {
+            try {
+                long id = zkutil.nextId(tableName);
+                idMap.put(System.currentTimeMillis(), id);
+
+                int sTime = sleepTime.nextInt(2);
+                Thread.sleep(sTime * 100);
+            } catch (InterruptedException e) {
+                // Restore the interrupt status for the caller and stop generating
+                // ids instead of swallowing the interruption and looping on.
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                // Best effort: report the failure and keep trying until time is up.
+                e.printStackTrace();
+            }
+
+            timeElapsed = (int) Math.ceil((System.currentTimeMillis() - startTime) / (double) 1000);
+        }
+
+    }
+
+    /**
+     * @return the live map of receive timestamp (ms) to generated id
+     */
+    Map<Long, Long> getIdMap() {
+        return idMap;
+    }
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests id generation through {@link ZKUtil#nextId(String)}.
+ */
+public class TestIDGenerator extends SkeletonHBaseTest{
+
+    /**
+     * Builds the ZooKeeper connect string from the HBase configuration as a
+     * comma-separated list of {@code host:port} entries, one per quorum member.
+     */
+    private String zkConnectString() {
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        StringBuilder sb = new StringBuilder();
+        for (String server : servers.split(",")) {
+            if (sb.length() > 0) {
+                // Entries must be comma separated; without this a multi-host
+                // quorum collapses into "h1:2181h2:2181".
+                sb.append(',');
+            }
+            sb.append(server);
+            sb.append(':');
+            sb.append(port);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Successive calls to nextId on one client must return consecutive ids.
+     */
+    @Test
+    public void testIDGeneration() throws Exception {
+
+        ZKUtil zkutil = new ZKUtil(zkConnectString(), "/rm_base");
+
+        String tableName = "myTable";
+        long initId = zkutil.nextId(tableName);
+        for (int i = 0; i < 10; i++) {
+            long id = zkutil.nextId(tableName);
+            Assert.assertEquals(initId + (i + 1), id);
+        }
+    }
+
+    /**
+     * Ids handed out to several clients, ordered by the time they were
+     * received, must form the sequence 1, 2, 3, ...
+     */
+    @Test
+    public void testMultipleClients() throws InterruptedException{
+
+        String connectString = zkConnectString();
+
+        ArrayList<IDGenClient> clients = new ArrayList<IDGenClient>();
+
+        for (int i = 0; i < 5; i++) {
+            IDGenClient idClient = new IDGenClient(connectString, "/rm_base", 10, "testTable");
+            clients.add(idClient);
+        }
+
+        // NOTE(review): run() executes each client synchronously on this thread,
+        // so the clients run one after another rather than concurrently, and the
+        // join() calls below return immediately. Switching to start() would also
+        // require a collision-free merge of the per-client maps, which are keyed
+        // by millisecond timestamps.
+        for (IDGenClient idClient : clients) {
+            idClient.run();
+        }
+
+        for (IDGenClient idClient : clients) {
+            idClient.join();
+        }
+
+        HashMap<Long, Long> idMap = new HashMap<Long, Long>();
+        for (IDGenClient idClient : clients) {
+            idMap.putAll(idClient.getIdMap());
+        }
+
+        ArrayList<Long> keys = new ArrayList<Long>(idMap.keySet());
+        Collections.sort(keys);
+        int startId = 1;
+        for (Long key : keys) {
+            Long id = idMap.get(key);
+            System.out.println("Key: " + key + " Value "+ id);
+            assertTrue(id == startId);
+            startId++;
+
+        }
+    }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ZKBasedRevisionManager} and the znode layout that
+ * {@link ZKUtil} creates under the "/rm_base" directory.
+ */
+public class TestRevisionManager extends SkeletonHBaseTest{
+
+    /**
+     * Builds the ZooKeeper connect string from the HBase configuration as a
+     * comma-separated list of {@code host:port} entries, one per quorum member.
+     */
+    private String zkConnectString() {
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        StringBuilder sb = new StringBuilder();
+        for (String server : servers.split(",")) {
+            if (sb.length() > 0) {
+                // Entries must be comma separated; without this a multi-host
+                // quorum collapses into "h1:2181h2:2181".
+                sb.append(',');
+            }
+            sb.append(server);
+            sb.append(':');
+            sb.append(port);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * The root znodes and the per-table / per-column-family znodes must exist
+     * after the corresponding ZKUtil set-up calls.
+     */
+    @Test
+    public void testBasicZNodeCreation() throws IOException, KeeperException, InterruptedException{
+
+        ZKUtil zkutil = new ZKUtil(zkConnectString(), "/rm_base");
+        String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf001", "cf002", "cf003");
+
+        zkutil.createRootZNodes();
+        ZooKeeper zk = zkutil.getSession();
+        Stat tempTwo = zk.exists("/rm_base" + PathUtil.DATA_DIR, false);
+        assertTrue(tempTwo != null);
+        Stat tempThree = zk.exists("/rm_base" + PathUtil.CLOCK_NODE, false);
+        assertTrue(tempThree != null);
+
+        zkutil.setUpZnodesForTable(tableName, columnFamilies);
+        String transactionDataTablePath = "/rm_base" + PathUtil.DATA_DIR + "/" + tableName;
+        Stat result = zk.exists(transactionDataTablePath, false);
+        assertTrue(result != null);
+
+        for (String colFamiliy : columnFamilies) {
+            String cfPath = transactionDataTablePath + "/" + colFamiliy;
+            Stat resultTwo = zk.exists(cfPath, false);
+            assertTrue(resultTwo != null);
+        }
+
+    }
+
+    /**
+     * A started write transaction is listed as running for every column
+     * family and disappears from that list once committed.
+     */
+    @Test
+    public void testCommitTransaction() throws IOException{
+
+        String connectString = zkConnectString();
+
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, connectString);
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        ZKUtil zkutil = new ZKUtil(connectString, "/rm_base");
+
+        String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+        Transaction txn = manager.beginWriteTransaction(tableName,
+                columnFamilies);
+
+        List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+        assertTrue(cfs.size() == columnFamilies.size());
+        for (String cf : cfs) {
+            assertTrue(columnFamilies.contains(cf));
+        }
+
+        for (String colFamily : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+            byte[] data = zkutil.getRawData(path, null);
+            StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+            ZKUtil.deserialize(list, data);
+            assertEquals(1, list.getRevisionListSize());
+            StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+            assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp());
+            assertEquals(lightTxn.revision, txn.getRevisionNumber());
+
+        }
+        manager.commitWriteTransaction(txn);
+        for (String colFamiliy : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamiliy);
+            byte[] data = zkutil.getRawData(path, null);
+            StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+            ZKUtil.deserialize(list, data);
+            assertEquals(0, list.getRevisionListSize());
+
+        }
+
+        manager.close();
+    }
+
+    /**
+     * An aborted write transaction leaves the running list and shows up in
+     * the abort information of every column family.
+     */
+    @Test
+    public void testAbortTransaction() throws IOException{
+
+        String connectString = zkConnectString();
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, connectString);
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        ZKUtil zkutil = new ZKUtil(connectString, "/rm_base");
+
+        String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+        Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies);
+        List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+
+        assertTrue(cfs.size() == columnFamilies.size());
+        for (String cf : cfs) {
+            assertTrue(columnFamilies.contains(cf));
+        }
+
+        for (String colFamiliy : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, colFamiliy);
+            byte[] data = zkutil.getRawData(path, null);
+            StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+            ZKUtil.deserialize(list, data);
+            assertEquals(1, list.getRevisionListSize());
+            StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+            assertEquals(lightTxn.timestamp, txn.getTransactionExpireTimeStamp());
+            assertEquals(lightTxn.revision, txn.getRevisionNumber());
+
+        }
+        manager.abortWriteTransaction(txn);
+        for (String colFamiliy : columnFamilies) {
+            String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, colFamiliy);
+            byte[] data = zkutil.getRawData(path, null);
+            StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+            ZKUtil.deserialize(list, data);
+            assertEquals(0, list.getRevisionListSize());
+
+        }
+
+        for (String colFamiliy : columnFamilies) {
+            String path = PathUtil.getAbortInformationPath("/rm_base",tableName, colFamiliy);
+            byte[] data = zkutil.getRawData(path, null);
+            StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+            ZKUtil.deserialize(list, data);
+            assertEquals(1, list.getRevisionListSize());
+            StoreFamilyRevision abortedTxn = list.getRevisionList().get(0);
+            assertEquals(abortedTxn.getRevision(), txn.getRevisionNumber());
+        }
+        manager.close();
+    }
+
+    /**
+     * Commits a transaction after sleeping past the keep-alive value passed to
+     * beginWriteTransaction. If the commit throws, it must be an IOException
+     * with the "not found" message.
+     */
+    @Test
+    public void testKeepAliveTransaction() throws InterruptedException, IOException {
+
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, zkConnectString());
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        try {
+            String tableName = newTableName("testTable");
+            List<String> columnFamilies = Arrays.asList("cf1", "cf2");
+            Transaction txn = manager.beginWriteTransaction(tableName,
+                    columnFamilies, 40);
+            Thread.sleep(100);
+            // NOTE(review): a commit that succeeds is also accepted here; only
+            // the type and message of a thrown exception are checked.
+            try {
+                manager.commitWriteTransaction(txn);
+            } catch (Exception e) {
+                assertTrue(e instanceof IOException);
+                assertEquals("The transaction to be removed not found in the data.",
+                        e.getMessage());
+            }
+        } finally {
+            // Release the manager even when an assertion fails.
+            manager.close();
+        }
+
+    }
+
+    /**
+     * Checks the per-column-family revisions reported by snapshots taken
+     * while three overlapping write transactions are committed one by one.
+     */
+    @Test
+    public void testCreateSnapshot() throws IOException{
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, zkConnectString());
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        try {
+            String tableName = newTableName("testTable");
+            List<String> cfOne = Arrays.asList("cf1", "cf2");
+            List<String> cfTwo = Arrays.asList("cf2", "cf3");
+            Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne);
+            Transaction tsx2 = manager.beginWriteTransaction(tableName, cfTwo);
+            TableSnapshot snapshotOne = manager.createSnapshot(tableName);
+            assertEquals(0, snapshotOne.getRevision("cf1"));
+            assertEquals(0, snapshotOne.getRevision("cf2"));
+            assertEquals(1, snapshotOne.getRevision("cf3"));
+
+            List<String> cfThree = Arrays.asList("cf1", "cf3");
+            Transaction tsx3 = manager.beginWriteTransaction(tableName, cfThree);
+            manager.commitWriteTransaction(tsx1);
+            TableSnapshot snapshotTwo = manager.createSnapshot(tableName);
+            assertEquals(2, snapshotTwo.getRevision("cf1"));
+            assertEquals(1, snapshotTwo.getRevision("cf2"));
+            assertEquals(1, snapshotTwo.getRevision("cf3"));
+
+            manager.commitWriteTransaction(tsx2);
+            TableSnapshot snapshotThree = manager.createSnapshot(tableName);
+            assertEquals(2, snapshotThree.getRevision("cf1"));
+            assertEquals(3, snapshotThree.getRevision("cf2"));
+            assertEquals(2, snapshotThree.getRevision("cf3"));
+            manager.commitWriteTransaction(tsx3);
+            TableSnapshot snapshotFour = manager.createSnapshot(tableName);
+            assertEquals(3, snapshotFour.getRevision("cf1"));
+            assertEquals(3, snapshotFour.getRevision("cf2"));
+            assertEquals(3, snapshotFour.getRevision("cf3"));
+        } finally {
+            // Release the manager even when an assertion fails.
+            manager.close();
+        }
+
+    }
+
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link ZKUtil#serialize} / {@link ZKUtil#deserialize}
+ * on the thrift revision types.
+ */
+public class TestThriftSerialization {
+
+    /**
+     * A single StoreFamilyRevision must survive a serialize/deserialize round trip.
+     *
+     * @throws IOException if (de)serialization fails; propagated so the test
+     *         fails instead of passing silently
+     */
+    @Test
+    public void testLightWeightTransaction() throws IOException {
+        StoreFamilyRevision trxn = new StoreFamilyRevision(0, 1000);
+
+        byte[] data = ZKUtil.serialize(trxn);
+        StoreFamilyRevision newWtx = new StoreFamilyRevision();
+        ZKUtil.deserialize(newWtx, data);
+
+        assertTrue(newWtx.getRevision() == trxn.getRevision());
+        assertTrue(newWtx.getTimestamp() == trxn.getTimestamp());
+    }
+
+    /**
+     * A StoreFamilyRevisionList must keep its size, element order and element
+     * contents across a serialize/deserialize round trip.
+     *
+     * @throws IOException if (de)serialization fails; propagated so the test
+     *         fails instead of passing silently
+     */
+    @Test
+    public void testWriteTransactionList() throws IOException {
+        List<StoreFamilyRevision> txnList = new ArrayList<StoreFamilyRevision>();
+        long version;
+        long timestamp;
+        for (int i = 0; i < 10; i++) {
+            version = i;
+            timestamp = 1000 + i;
+            StoreFamilyRevision wtx = new StoreFamilyRevision(version, timestamp);
+            txnList.add(wtx);
+        }
+
+        StoreFamilyRevisionList wList = new StoreFamilyRevisionList(txnList);
+
+        byte[] data = ZKUtil.serialize(wList);
+        StoreFamilyRevisionList newList = new StoreFamilyRevisionList();
+        ZKUtil.deserialize(newList, data);
+        assertTrue(newList.getRevisionListSize() == wList.getRevisionListSize());
+
+        Iterator<StoreFamilyRevision> itr = newList.getRevisionListIterator();
+        int i = 0;
+        while (itr.hasNext()) {
+            StoreFamilyRevision txn = itr.next();
+            assertTrue(txn.getRevision() == i);
+            assertTrue(txn.getTimestamp() == (i + 1000));
+            i++;
+        }
+    }
+
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link WriteLock}.
+ * Several lock instances, each with its own ZooKeeper client, compete for the
+ * same lock directory; the test checks which of them reports ownership.
+ * This class is taken from the zookeeper 3.4.0 as-is to test the zookeeper lock
+ * Recipe with a change in the package name.
+ */
+public class WriteLockTest extends ClientBase {
+ // Not referenced anywhere else in this class.
+ protected int sessionTimeout = 10 * 1000;
+ // Lock directory shared by all contenders; derived from the test class name.
+ protected String dir = "/" + getClass().getName();
+ protected WriteLock[] nodes;
+ // Counted down by LockCallback.lockAcquired(); replaced with a fresh latch
+ // before the current leader is killed in runTest().
+ protected CountDownLatch latch = new CountDownLatch(1);
+ // Switches for the optional phases of runTest() and the tearDown() workaround.
+ private boolean restartServer = true;
+ private boolean workAroundClosingLastZNodeFails = true;
+ private boolean killLeader = true;
+
+ @Test
+ public void testRun() throws Exception {
+ runTest(3);
+ }
+
+ /**
+ * Listener that releases the shared latch as soon as any lock reports
+ * acquisition; release notifications are ignored.
+ */
+ class LockCallback implements LockListener {
+ public void lockAcquired() {
+ latch.countDown();
+ }
+
+ public void lockReleased() {
+
+ }
+
+ }
+ /**
+ * Creates {@code count} contenders, checks that the first one owns the lock,
+ * then optionally unlocks it to check hand-over to the second, and optionally
+ * stops and restarts the server before re-requesting the lock.
+ *
+ * @param count number of WriteLock instances (and ZooKeeper clients) to create
+ */
+ protected void runTest(int count) throws Exception {
+ nodes = new WriteLock[count];
+ for (int i = 0; i < count; i++) {
+ ZooKeeper keeper = createClient();
+ WriteLock leader = new WriteLock(keeper, dir, null);
+ leader.setLockListener(new LockCallback());
+ nodes[i] = leader;
+
+ leader.lock();
+ }
+
+ // lets wait for any previous leaders to die and one of our new
+ // nodes to become the new leader
+ // (the boolean result of await is ignored; a timeout surfaces through the
+ // ownership assertions below)
+ latch.await(30, TimeUnit.SECONDS);
+
+ WriteLock first = nodes[0];
+ dumpNodes(count);
+
+ // lets assert that the first election is the leader
+ Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner());
+
+ for (int i = 1; i < count; i++) {
+ WriteLock node = nodes[i];
+ Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+ }
+
+ if (count > 1) {
+ if (killLeader) {
+ System.out.println("Now killing the leader");
+ // now lets kill the leader
+ // (fresh latch first, so the next lockAcquired callback can be awaited)
+ latch = new CountDownLatch(1);
+ first.unlock();
+ latch.await(30, TimeUnit.SECONDS);
+ //Thread.sleep(10000);
+ WriteLock second = nodes[1];
+ dumpNodes(count);
+ // lets assert that the second node is now the leader
+ Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner());
+
+ for (int i = 2; i < count; i++) {
+ WriteLock node = nodes[i];
+ Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+ }
+ }
+
+
+ if (restartServer) {
+ // now lets stop the server
+ System.out.println("Now stopping the server");
+ stopServer();
+ Thread.sleep(10000);
+
+ // TODO lets assert that we are no longer the leader
+ dumpNodes(count);
+
+ System.out.println("Starting the server");
+ startServer();
+ Thread.sleep(10000);
+
+ // Re-request the lock on every node except the last one; nothing is
+ // asserted afterwards, the state is only dumped.
+ for (int i = 0; i < count - 1; i++) {
+ System.out.println("Calling acquire for node: " + i);
+ nodes[i].lock();
+ }
+ dumpNodes(count);
+ System.out.println("Now closing down...");
+ }
+ }
+ }
+
+ /**
+ * Prints id and ownership of the first {@code count} nodes to stdout.
+ */
+ protected void dumpNodes(int count) {
+ for (int i = 0; i < count; i++) {
+ WriteLock node = nodes[i];
+ System.out.println("node: " + i + " id: " +
+ node.getId() + " is leader: " + node.isOwner());
+ }
+ }
+
+ /**
+ * Closes every lock and its ZooKeeper handle, then delegates to the base
+ * class tearDown. While workAroundClosingLastZNodeFails is set, the handle
+ * of the last node is deliberately left open.
+ */
+ @After
+ public void tearDown() throws Exception {
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ WriteLock node = nodes[i];
+ if (node != null) {
+ System.out.println("Closing node: " + i);
+ node.close();
+ if (workAroundClosingLastZNodeFails && i == nodes.length - 1) {
+ System.out.println("Not closing zookeeper: " + i + " due to bug!");
+ } else {
+ System.out.println("Closing zookeeper: " + i);
+ node.getZookeeper().close();
+ System.out.println("Closed zookeeper: " + i);
+ }
+ }
+ }
+ }
+ System.out.println("Now lets stop the server");
+ super.tearDown();
+
+ }
+}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import junit.framework.TestCase;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+/**
+ * Tests the sort order of {@link ZNodeName} instances. This class is adapted
+ * from the ZooKeeper lock recipe test; the package name has been changed.
+ */
+public class ZNodeNameTest extends TestCase {
+
+    /**
+     * Names that share a prefix are expected to sort by their numeric suffix,
+     * so that "x-11" comes after "x-5".
+     */
+    @Test
+    public void testOrderWithSamePrefix() throws Exception {
+        String[] names = { "x-3", "x-5", "x-11", "x-1" };
+        String[] expected = { "x-1", "x-3", "x-5", "x-11" };
+        assertOrderedNodeNames(names, expected);
+    }
+
+    /**
+     * Names with different prefixes are expected to be grouped by prefix
+     * first and ordered by suffix within each group.
+     */
+    @Test
+    public void testOrderWithDifferentPrefixes() throws Exception {
+        String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" };
+        String[] expected = { "r-1", "r-2", "r-3", "w-1", "w-2" };
+        assertOrderedNodeNames(names, expected);
+    }
+
+    /**
+     * Wraps each entry of {@code names} in a {@link ZNodeName}, sorts them via
+     * a {@link TreeSet} and asserts that iterating the set yields the names in
+     * exactly the order given by {@code expected}.
+     *
+     * @param names    node names in arbitrary order
+     * @param expected the same node names in the order the set should yield
+     */
+    protected void assertOrderedNodeNames(String[] names, String[] expected) {
+        assertEquals("The two arrays should be the same size!", expected.length, names.length);
+        SortedSet<ZNodeName> nodeNames = new TreeSet<ZNodeName>();
+        for (String name : names) {
+            nodeNames.add(new ZNodeName(name));
+        }
+        // A TreeSet drops elements that compare as equal. Without this check
+        // the loop below would silently verify fewer names than expected.
+        assertEquals("No node name should be lost while sorting!", expected.length, nodeNames.size());
+
+        int index = 0;
+        for (ZNodeName nodeName : nodeNames) {
+            String name = nodeName.getName();
+            assertEquals("Node " + index, expected[index++], name);
+        }
+    }
+
+}