You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/27 23:57:02 UTC
svn commit: r979884 [2/2] - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/executor/
src/main/java/org/apache/hadoop/hbase/master/ sr...
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java Tue Jul 27 21:57:00 2010
@@ -118,7 +118,7 @@ public class MasterAddressManager extend
} catch(KeeperException ke) {
// If we have a ZK exception trying to find the master we must abort
LOG.fatal("Unexpected ZooKeeper exception", ke);
- status.abortServer();
+ status.abort();
}
}
@@ -182,7 +182,7 @@ public class MasterAddressManager extend
} catch (KeeperException ke) {
// If we have a ZK exception trying to find the master we must abort
LOG.fatal("Unexpected ZooKeeper exception", ke);
- status.abortServer();
+ status.abort();
}
if(address != null) {
setMasterAddress(address);
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java Tue Jul 27 21:57:00 2010
@@ -5,12 +5,11 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -59,8 +58,9 @@ public class RSZookeeperUpdater {
// Try to create the node with a CLOSING state, if already exists,
// something is wrong
try {
- if(ZKUtil.createPersistentNodeIfNotExists(zooKeeper, regionZNode,
- makeZKEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg))) {
+ if(ZKUtil.createNodeIfNotExistsAndWatch(zooKeeper, regionZNode,
+ makeZKEventData(HBaseEventType.RS2ZK_REGION_CLOSING,
+ regionName, hmsg))) {
String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
LOG.error(msg);
throw new IOException(msg);
@@ -159,9 +159,10 @@ public class RSZookeeperUpdater {
* @param hmsg
* @return serialized data
*/
- private byte [] makeZKEventData(HBaseEventType eventType, HMsg hmsg)
+ private byte [] makeZKEventData(HBaseEventType eventType, String regionName,
+ HMsg hmsg)
throws IOException {
- return Writables.getBytes(new RegionTransitionEventData(eventType,
+ return Writables.getBytes(new RegionTransitionData(eventType, regionName,
regionServerName, hmsg));
}
@@ -174,7 +175,7 @@ public class RSZookeeperUpdater {
*/
private void updateZKWithEventData(HBaseEventType eventType, HMsg hmsg)
throws IOException {
- byte[] data = makeZKEventData(eventType, hmsg);
+ byte[] data = makeZKEventData(eventType, regionName, hmsg);
LOG.debug("Updating ZNode " + regionZNode +
" with [" + eventType + "]" +
" expected version = " + zkVersion);
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterStatusTracker.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracker on the cluster state znode ({@code watcher.clusterStateZNode}).
+ *
+ * <p>{@link #setClusterUp()} writes {@link #upData} into that znode and
+ * {@link #setClusterDown()} deletes it; {@link #isClusterUp()} reports
+ * whether the tracker currently holds data for it.
+ */
+public class ClusterStatusTracker extends ZooKeeperNodeTracker {
+  private static final Log LOG = LogFactory.getLog(ClusterStatusTracker.class);
+
+  /** Payload written to the cluster state znode by {@link #setClusterUp()}. */
+  public static final byte [] upData = Bytes.toBytes("up");
+
+  /**
+   * Creates a cluster status tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher zk reference; its clusterStateZNode is the node tracked
+   * @param abortable handed to the {@link ZooKeeperNodeTracker} constructor
+   */
+  public ClusterStatusTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+    // Track the same znode that setClusterUp()/setClusterDown() modify.
+    // (Previously this passed rootServerZNode, so isClusterUp() reflected the
+    // root region location node instead of the cluster state node.)
+    super(watcher, watcher.clusterStateZNode, abortable);
+  }
+
+  /**
+   * Checks if the cluster is up.
+   * @return true if the tracked cluster state data is non-null, false if not
+   */
+  public boolean isClusterUp() {
+    return super.getData() != null;
+  }
+
+  /**
+   * Sets the cluster as up.
+   *
+   * <p>Creates the cluster state znode with {@link #upData}; if the znode
+   * already exists, its data is overwritten with {@link #upData} instead.
+   *
+   * @throws KeeperException unexpected zk exception
+   */
+  public void setClusterUp()
+  throws KeeperException {
+    try {
+      ZKUtil.createAndWatch(watcher, watcher.clusterStateZNode, upData);
+    } catch(KeeperException.NodeExistsException nee) {
+      // Node is already present: rewrite its data rather than failing.
+      ZKUtil.setData(watcher, watcher.clusterStateZNode, upData);
+    }
+  }
+
+  /**
+   * Sets the cluster as down by deleting the cluster state znode.
+   * @throws KeeperException unexpected zk exception
+   */
+  public void setClusterDown()
+  throws KeeperException {
+    ZKUtil.deleteNode(watcher, watcher.clusterStateZNode);
+  }
+
+  @Override
+  protected Log getLog() {
+    return LOG;
+  }
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the online region servers via ZK.
+ *
+ * <p>Handling of new RSs checking in is done via RPC. This class
+ * is only responsible for watching for expired nodes. It handles
+ * listening for changes in the RS node list and watching each node.
+ *
+ * <p>If an RS node gets deleted, this automatically handles calling of
+ * {@link ServerManager#expireServer(org.apache.hadoop.hbase.HServerInfo)}.
+ */
+public class RegionServerTracker extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
+
+  // Both collaborators are assigned once in the constructor and never replaced.
+  private final ServerManager serverManager;
+  private final Abortable abortable;
+
+  /**
+   * Creates a region server tracker. Nothing is registered or watched until
+   * {@link #start()} is called.
+   *
+   * @param watcher zk reference
+   * @param abortable aborted if re-listing the RS znode children fails
+   * @param serverManager used to look up and expire servers whose znode is
+   *                      deleted
+   */
+  public RegionServerTracker(ZooKeeperWatcher watcher,
+      Abortable abortable, ServerManager serverManager) {
+    super(watcher);
+    this.abortable = abortable;
+    this.serverManager = serverManager;
+  }
+
+  /**
+   * Starts the tracking of online RegionServers.
+   *
+   * <p>All RSs will be tracked after this method is called.
+   *
+   * @throws KeeperException
+   */
+  public void start() throws KeeperException {
+    // Register first so that events fired by the watches set below reach us.
+    watcher.registerListener(this);
+    ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+  }
+
+  /**
+   * Handles deletion of a znode whose path starts with the RS znode path: the
+   * last path component is taken as the server name and, if the
+   * {@link ServerManager} knows that server, it is expired.
+   *
+   * @param path full path of the deleted znode
+   */
+  @Override
+  public void nodeDeleted(String path) {
+    // NOTE(review): this is a plain prefix match, so it also matches the RS
+    // znode itself and any sibling sharing the prefix; those fall through to
+    // the "No HServerInfo found" branch below.
+    if(path.startsWith(watcher.rsZNode)) {
+      String serverName = ZKUtil.getNodeName(path);
+      LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
+          serverName + "]");
+      HServerInfo hsi = serverManager.getServerInfo(serverName);
+      if(hsi == null) {
+        LOG.info("No HServerInfo found for " + serverName);
+        return;
+      }
+      serverManager.expireServer(hsi);
+    }
+  }
+
+  /**
+   * Handles a change in the children of the RS znode by re-listing them via
+   * {@link ZKUtil#watchAndGetNewChildren}. Aborts through the
+   * {@link Abortable} if that call throws.
+   *
+   * @param path full path of the znode whose children changed
+   */
+  @Override
+  public void nodeChildrenChanged(String path) {
+    if(path.equals(watcher.rsZNode)) {
+      try {
+        ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+      } catch (KeeperException e) {
+        LOG.error("Unexpected zk exception getting RS nodes", e);
+        abortable.abort();
+      }
+    }
+  }
+
+  /**
+   * Gets the online servers.
+   * @return list of online servers from zk
+   * @throws KeeperException
+   */
+  public List<HServerAddress> getOnlineServers() throws KeeperException {
+    return ZKUtil.listChildrenAndGetAsAddresses(watcher, watcher.rsZNode);
+  }
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the root region location znode ({@code watcher.rootServerZNode})
+ * and exposes its contents as an {@link HServerAddress}.
+ */
+public class RootRegionTracker extends ZooKeeperNodeTracker {
+ private static final Log LOG = LogFactory.getLog(RootRegionTracker.class);
+
+ /**
+ * Creates a root region location tracker.
+ *
+ * <p>After construction, use {@link #start} to kick off tracking.
+ *
+ * @param watcher zk reference; its rootServerZNode is the node tracked
+ * @param abortable handed to the {@link ZooKeeperNodeTracker} constructor
+ */
+ public RootRegionTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+ super(watcher, watcher.rootServerZNode, abortable);
+ }
+
+ /**
+ * Checks if the root region location is available.
+ * @return true if root region location is available, false if not
+ */
+ public boolean isLocationAvailable() {
+ // Available means the tracker currently holds non-null data for the node.
+ return super.getData() != null;
+ }
+
+ /**
+ * Gets the root region location, if available. Null if not.
+ * @return server address for server hosting root region, null if none
+ * available
+ */
+ public HServerAddress getRootRegionLocation() {
+ byte [] data = super.getData();
+ // Node data is the string form of the address, matching what
+ // setRootRegionLocation writes.
+ return data == null ? null : new HServerAddress(Bytes.toString(data));
+ }
+
+ /**
+ * Sets the root region location.
+ *
+ * <p>Creates the root location znode holding the string form of the
+ * address; if the znode already exists, its data is overwritten instead.
+ *
+ * @param address address of the server hosting the root region
+ * @throws KeeperException unexpected zk exception
+ */
+ public void setRootRegionLocation(HServerAddress address)
+ throws KeeperException {
+ try {
+ ZKUtil.createAndWatch(watcher, watcher.rootServerZNode,
+ Bytes.toBytes(address.toString()));
+ } catch(KeeperException.NodeExistsException nee) {
+ // Node is already present: rewrite its data rather than failing.
+ ZKUtil.setData(watcher, watcher.rootServerZNode,
+ Bytes.toBytes(address.toString()));
+ }
+ }
+
+ @Override
+ protected Log getLog() {
+ return LOG;
+ }
+}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,476 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Utility class for doing region assignment in ZooKeeper. This class extends
+ * stuff done in {@link ZKUtil} to cover specific assignment operations.
+ * <p>
+ * Contains only static methods and constants.
+ * <p>
+ * Used by both the Master and RegionServer.
+ * <p>
+ * All valid transitions outlined below:
+ * <p>
+ * <b>MASTER</b>
+ * <ol>
+ * <li>
+ * Master creates an unassigned node as OFFLINE.
+ * - Cluster startup and table enabling.
+ * </li>
+ * <li>
+ * Master forces an existing unassigned node to OFFLINE.
+ * - RegionServer failure.
+ * - Allows transitions from all states to OFFLINE.
+ * </li>
+ * <li>
+ * Master deletes an unassigned node that was in an OPENED state.
+ * - Normal region transitions. Besides cluster startup, no other deletions
+ * of unassigned nodes are allowed.
+ * </li>
+ * <li>
+ * Master deletes all unassigned nodes regardless of state.
+ * - Cluster startup before any assignment happens.
+ * </li>
+ * </ol>
+ * <p>
+ * <b>REGIONSERVER</b>
+ * <ol>
+ * <li>
+ * RegionServer creates an unassigned node as CLOSING.
+ * - All region closes will do this in response to a CLOSE RPC from Master.
+ * - A node can never be transitioned to CLOSING, only created.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from CLOSING to CLOSED.
+ * - Normal region closes. CAS operation.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from OFFLINE to OPENING.
+ * - All region opens will do this in response to an OPEN RPC from the Master.
+ * - Normal region opens. CAS operation.
+ * </li>
+ * <li>
+ * RegionServer transitions an unassigned node from OPENING to OPENED.
+ * - Normal region opens. CAS operation.
+ * </li>
+ * </ol>
+ */
+public class ZKAssign {
+
+ /**
+ * Gets the full path node name for the unassigned node for the specified
+ * region.
+ * @param zkw zk reference
+ * @param regionName region name
+ * @return full path node name
+ */
+ private static String getNodeName(ZooKeeperWatcher zkw, String regionName) {
+ // Unassigned nodes are direct children of the assignment znode.
+ return ZKUtil.joinZNode(zkw.assignmentZNode, regionName);
+ }
+
+ /**
+ * Gets the region name from the full path node name of an unassigned node.
+ * @param zkw zk reference
+ * @param path full zk path
+ * @return region name
+ */
+ public static String getRegionName(ZooKeeperWatcher zkw, String path) {
+ // Drop the assignment znode prefix plus the one-character separator after it.
+ // NOTE(review): assumes path starts with zkw.assignmentZNode; not checked here.
+ return path.substring(zkw.assignmentZNode.length()+1);
+ }
+
+ // Master methods
+
+ /**
+ * Creates a new unassigned node in the OFFLINE state for the specified
+ * region.
+ *
+ * <p>Does not transition nodes from other states. If a node already exists
+ * for this region, a {@link NodeExistsException} will be thrown.
+ *
+ * <p>Sets a watcher on the unassigned region node if the method is
+ * successful.
+ *
+ * <p>This method should only be used during cluster startup and the enabling
+ * of a table.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be created as offline
+ * @param serverName server event originates from
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void createNodeOffline(ZooKeeperWatcher zkw, String regionName,
+ String serverName)
+ throws KeeperException, KeeperException.NodeExistsException {
+ zkw.debug("Creating an unassigned node for " + regionName +
+ " in an OFFLINE state");
+ RegionTransitionData data = new RegionTransitionData(
+ HBaseEventType.M2ZK_REGION_OFFLINE, regionName, serverName);
+ // Hold the node-set lock across both the bookkeeping update and the create.
+ synchronized(zkw.getNodes()) {
+ String node = getNodeName(zkw, regionName);
+ // NOTE(review): the path is recorded before the create call, so it stays
+ // in the set if createAndWatch throws (e.g. NodeExistsException).
+ zkw.getNodes().add(node);
+ ZKUtil.createAndWatch(zkw, node, data.getBytes());
+ }
+ }
+
+ /**
+ * Forces an existing unassigned node to the OFFLINE state for the specified
+ * region.
+ *
+ * <p>Does not create a new node. If a node does not already exist for this
+ * region, a {@link NoNodeException} will be thrown.
+ *
+ * <p>Sets a watcher on the unassigned region node if the method is
+ * successful.
+ *
+ * <p>This method should only be used during recovery of regionserver failure.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be forced as offline
+ * @param serverName server event originates from
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static void forceNodeOffline(ZooKeeperWatcher zkw, String regionName,
+ String serverName)
+ throws KeeperException, KeeperException.NoNodeException {
+ zkw.debug("Forcing an existing unassigned node for " + regionName +
+ " to an OFFLINE state");
+ RegionTransitionData data = new RegionTransitionData(
+ HBaseEventType.M2ZK_REGION_OFFLINE, regionName, serverName);
+ // Hold the node-set lock across both the bookkeeping update and the write.
+ synchronized(zkw.getNodes()) {
+ String node = getNodeName(zkw, regionName);
+ zkw.getNodes().add(node);
+ // Unversioned overwrite: no expected version is passed, so the node is
+ // forced to OFFLINE whatever its current contents.
+ // NOTE(review): the Javadoc says a watcher is set on success; confirm that
+ // ZKUtil.setData actually leaves a watch on the node.
+ ZKUtil.setData(zkw, node, data.getBytes());
+ }
+ }
+
+ /**
+ * Deletes an existing unassigned node that is in the OPENED state for the
+ * specified region.
+ *
+ * <p>If a node does not already exist for this region, a
+ * {@link NoNodeException} will be thrown.
+ *
+ * <p>No watcher is set whether this succeeds or not.
+ *
+ * <p>Returns false if the node was not in the proper state but did exist.
+ *
+ * <p>This method is used during normal region transitions when a region
+ * finishes successfully opening. This is the Master acknowledging completion
+ * of the specified regions transition.
+ *
+ * @param zkw zk reference
+ * @param regionName opened region to be deleted from zk
+ * @return true if the node was in the OPENED state and the versioned delete
+ * succeeded; false if the node was in another state or the delete
+ * hit a version mismatch
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NoNodeException if node does not exist
+ */
+ public static boolean deleteOpenedNode(ZooKeeperWatcher zkw,
+ String regionName)
+ throws KeeperException, KeeperException.NoNodeException {
+ zkw.debug("Deleting an existing unassigned node for " + regionName +
+ " that is in a OPENED state");
+ String node = getNodeName(zkw, regionName);
+ // stat is filled in by the read; its version guards the delete below.
+ Stat stat = new Stat();
+ byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
+ // A null read is treated as "node does not exist".
+ if(bytes == null) {
+ throw KeeperException.create(Code.NONODE);
+ }
+ RegionTransitionData data = RegionTransitionData.fromBytes(bytes);
+ if(!data.getEventType().equals(HBaseEventType.RS2ZK_REGION_OPENED)) {
+ zkw.warn("Attempting to delete an unassigned node in OPENED state but " +
+ "node is in " + data.getEventType() + " state");
+ return false;
+ }
+ synchronized(zkw.getNodes()) {
+ // TODO: Does this go here or only if we successfully delete node?
+ // As written, the path is dropped from the set even when the delete
+ // below fails on a version mismatch.
+ zkw.getNodes().remove(node);
+ // Delete only if the node still has the version that was just verified.
+ if(!ZKUtil.deleteNode(zkw, node, stat.getVersion())) {
+ zkw.warn("Attempting to delete an unassigned node in OPENED state but " +
+ "after verifying it was in OPENED state, we got a version mismatch");
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Deletes all unassigned nodes regardless of their state.
+ *
+ * <p>No watchers are set.
+ *
+ * <p>This method is used by the Master during cluster startup to clear out
+ * any existing state from other cluster runs.
+ *
+ * @param zkw zk reference
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void deleteAllNodes(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ zkw.debug("Deleting any existing unassigned nodes");
+ // Operates on the children of the assignment znode via
+ // ZKUtil.deleteChildrenRecursively; zkw.getNodes() is not touched here.
+ ZKUtil.deleteChildrenRecursively(zkw, zkw.assignmentZNode);
+ }
+
+ // RegionServer methods
+
+ /**
+ * Creates a new unassigned node in the CLOSING state for the specified
+ * region.
+ *
+ * <p>Does not transition nodes from any states. If a node already exists
+ * for this region, a {@link NodeExistsException} will be thrown.
+ *
+ * <p>NOTE(review): this was documented as "Does not set any watches", but
+ * the node is created with {@link ZKUtil#createAndWatch}, the same call
+ * {@code createNodeOffline} uses and documents as setting a watcher.
+ * Confirm which of the two is intended.
+ *
+ * <p>This method should only be used by a RegionServer when initiating a
+ * close of a region after receiving a CLOSE RPC from the Master.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be created as closing
+ * @param serverName server event originates from
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void createNodeClosing(ZooKeeperWatcher zkw, String regionName,
+ String serverName)
+ throws KeeperException, KeeperException.NodeExistsException {
+ zkw.debug("Creating an unassigned node for " + regionName +
+ " in a CLOSING state");
+ RegionTransitionData data = new RegionTransitionData(
+ HBaseEventType.RS2ZK_REGION_CLOSING, regionName, serverName);
+ // Hold the node-set lock across both the bookkeeping update and the create.
+ synchronized(zkw.getNodes()) {
+ String node = getNodeName(zkw, regionName);
+ // The path is recorded before the create call, so it stays in the set if
+ // createAndWatch throws.
+ zkw.getNodes().add(node);
+ ZKUtil.createAndWatch(zkw, node, data.getBytes());
+ }
+ }
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the CLOSING state to be in the CLOSED state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns false.
+ *
+ * <p>This method can fail and return false for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in CLOSING state</li>
+ * <li>After verifying CLOSING state, update fails because of wrong version
+ * (someone else already transitioned the node)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when completing the
+ * close of a region (the node must already have been created as CLOSING).
+ *
+ * @param zkw zk reference
+ * @param regionName region to be transitioned to closed
+ * @param serverName server event originates from
+ * @return true if transition was successful, false if not
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean transitionNodeClosed(ZooKeeperWatcher zkw,
+ String regionName, String serverName)
+ throws KeeperException {
+ // Delegate to the shared transition routine: CLOSING -> CLOSED.
+ return transitionNode(zkw, regionName, serverName,
+ HBaseEventType.RS2ZK_REGION_CLOSING,
+ HBaseEventType.RS2ZK_REGION_CLOSED);
+ }
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the OFFLINE state to be in the OPENING state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns false.
+ *
+ * <p>This method can fail and return false for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in OFFLINE state</li>
+ * <li>After verifying OFFLINE state, update fails because of wrong version
+ * (someone else already transitioned the node)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when initiating an
+ * open of a region after receiving an OPEN RPC from the Master.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be transitioned to opening
+ * @param serverName server event originates from
+ * @return true if transition was successful, false if not
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean transitionNodeOpening(ZooKeeperWatcher zkw,
+ String regionName, String serverName)
+ throws KeeperException {
+ // Delegate to the shared transition routine: OFFLINE -> OPENING.
+ return transitionNode(zkw, regionName, serverName,
+ HBaseEventType.M2ZK_REGION_OFFLINE,
+ HBaseEventType.RS2ZK_REGION_OPENING);
+ }
+
+ /**
+ * Transitions an existing unassigned node for the specified region which is
+ * currently in the OPENING state to be in the OPENED state.
+ *
+ * <p>Does not transition nodes from other states. If for some reason the
+ * node could not be transitioned, the method returns false.
+ *
+ * <p>This method can fail and return false for three different reasons:
+ * <ul><li>Unassigned node for this region does not exist</li>
+ * <li>Unassigned node for this region is not in OPENING state</li>
+ * <li>After verifying OPENING state, update fails because of wrong version
+ * (this should never actually happen since an RS only does this transition
+ * following a transition to OPENING. if two RS are conflicting, one would
+ * fail the original transition to OPENING and not this transition)</li>
+ * </ul>
+ *
+ * <p>Does not set any watches.
+ *
+ * <p>This method should only be used by a RegionServer when completing the
+ * open of a region.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be transitioned to opened
+ * @param serverName server event originates from
+ * @return true if transition was successful, false if not
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean transitionNodeOpened(ZooKeeperWatcher zkw,
+ String regionName, String serverName)
+ throws KeeperException {
+ // Delegate to the shared transition routine: OPENING -> OPENED.
+ return transitionNode(zkw, regionName, serverName,
+ HBaseEventType.RS2ZK_REGION_OPENING,
+ HBaseEventType.RS2ZK_REGION_OPENED);
+ }
+
+ /**
+ * Private method that actually performs unassigned node transitions.
+ *
+ * <p>Attempts to transition the unassigned node for the specified region
+ * from the expected state to the state in the specified transition data.
+ *
+ * <p>Method first reads existing data and verifies it is in the expected
+ * state. If the node does not exist or the node is not in the expected
+ * state, the method returns false.
+ *
+ * <p>If the read state is what is expected, it attempts to write the new
+ * state and data into the node. When doing this, it includes the expected
+ * version (determined when the existing state was verified) to ensure that
+ * only one transition is successful. If there is a version mismatch, the
+ * method returns false.
+ *
+ * <p>If the write is successful, no watch is set and the method returns true.
+ *
+ * @param zkw zk reference
+ * @param regionName region to be transitioned to opened
+ * @param serverName server event originates from
+ * @param beginState state the node must currently be in to do transition
+ * @param endState state to transition node to if all checks pass
+ * @return true if transition was successful, false if not
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+  private static boolean transitionNode(ZooKeeperWatcher zkw, String regionName,
+      String serverName, HBaseEventType beginState, HBaseEventType endState)
+  throws KeeperException {
+    if(zkw.isDebugEnabled()) {
+      zkw.debug("Attempting to transition node for " + regionName +
+        " from " + beginState.toString() + " to " + endState.toString());
+    }
+
+    String node = getNodeName(zkw, regionName);
+
+    // Read existing data of the node; stat receives the version that is used
+    // below to make the write conditional.
+    Stat stat = new Stat();
+    byte [] existingBytes = ZKUtil.getDataNoWatch(zkw, node, stat);
+
+    // A null read is treated as "node does not exist" (same convention as
+    // deleteOpenedNode). Return false as documented instead of handing null
+    // to RegionTransitionData.fromBytes.
+    if(existingBytes == null) {
+      zkw.warn("Attempt to transition the unassigned node for " + regionName +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node does not exist");
+      return false;
+    }
+    RegionTransitionData existingData =
+      RegionTransitionData.fromBytes(existingBytes);
+
+    // Verify it is in expected state
+    if(!existingData.getEventType().equals(beginState)) {
+      zkw.warn("Attempt to transition the unassigned node for " + regionName +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node existed but was in the state " + existingData.getEventType());
+      return false;
+    }
+
+    // Write new data, ensuring data has not changed since we last read it
+    try {
+      RegionTransitionData data = new RegionTransitionData(endState,
+          regionName, serverName);
+      // Conditional write: only succeeds if the node is still at the version
+      // that was verified above.
+      if(!ZKUtil.setData(zkw, node, data.getBytes(),
+          stat.getVersion())) {
+        zkw.warn("Attempt to transition the unassigned node for " + regionName +
+          " from " + beginState + " to " + endState + " failed, " +
+          "the node existed and was in the expected state but then when " +
+          "setting data we got a version mismatch");
+        return false;
+      }
+      if(zkw.isDebugEnabled()) {
+        zkw.debug("Successfully transitioned node for " + regionName +
+          " from " + beginState + " to " + endState);
+      }
+      return true;
+    } catch (KeeperException.NoNodeException nne) {
+      // The node disappeared between the read and the write.
+      zkw.warn("Attempt to transition the unassigned node for " + regionName +
+        " from " + beginState + " to " + endState + " failed, " +
+        "the node existed and was in the expected state but then when " +
+        "setting data it no longer existed");
+      return false;
+    }
+  }
+
+ /**
+ * Gets the current data in the unassigned node for the specified region name
+ * or fully-qualified path.
+ *
+ * <p>Returns null if the region does not currently have a node.
+ *
+ * <p>Sets a watch on the node if the node exists.
+ *
+ * @param zkw zk reference
+ * @param pathOrRegionName fully-specified path or region name
+ * @return data for the unassigned node
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static RegionTransitionData getData(ZooKeeperWatcher zkw,
+ String pathOrRegionName)
+ throws KeeperException {
+ // Anything starting with "/" is taken to be a full znode path already;
+ // otherwise it is treated as a region name and resolved under the
+ // assignment znode.
+ String node = pathOrRegionName.startsWith("/") ?
+ pathOrRegionName : getNodeName(zkw, pathOrRegionName);
+ byte [] data = ZKUtil.getDataAndWatch(zkw, node);
+ if(data == null) {
+ return null;
+ }
+ return RegionTransitionData.fromBytes(data);
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue Jul 27 21:57:00 2010
@@ -34,11 +34,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
@@ -120,8 +122,6 @@ public class ZKUtil {
* Used when a server puts up an ephemeral node for itself and needs to use
* a unique name.
*
- * Returns the fully-qualified znode path.
- *
* @param serverInfo server information
* @return unique, zookeeper-safe znode path for the server instance
*/
@@ -130,6 +130,15 @@ public class ZKUtil {
}
/**
+ * Get the name of the current node from the specified fully-qualified path.
+ * @param path fully-qualified path
+ * @return name of the current node
+ */
+ public static String getNodeName(String path) {
+ return path.substring(path.lastIndexOf("/")+1);
+ }
+
+ /**
* Get the key to the ZK ensemble for this configuration without
* adding a name at the end
* @param conf Configuration to use to build the key
@@ -322,6 +331,51 @@ public class ZKUtil {
}
/**
+ * Atomically add watches and read data from all unwatched unassigned nodes.
+ *
+ * <p>This works because master is the only person deleting nodes.
+ */
+ public static List<NodeAndData> watchAndGetNewChildren(ZooKeeperWatcher zkw,
+ String baseNode)
+ throws KeeperException {
+ List<NodeAndData> newNodes = new ArrayList<NodeAndData>();
+ synchronized(zkw.getNodes()) {
+ List<String> nodes =
+ ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
+ for(String node : nodes) {
+ String nodePath = ZKUtil.joinZNode(baseNode, node);
+ if(!zkw.getNodes().contains(nodePath)) {
+ byte [] data = ZKUtil.getDataAndWatch(zkw, nodePath);
+ newNodes.add(new NodeAndData(nodePath, data));
+ zkw.getNodes().add(nodePath);
+ }
+ }
+ }
+ return newNodes;
+ }
+
+ /**
+ * Simple class to hold a node path and node data.
+ */
+ public static class NodeAndData {
+ private final String node; // path of the znode
+ private final byte [] data; // data read from the znode, may be null
+ public NodeAndData(String node, byte [] data) {
+ this.node = node;
+ this.data = data;
+ }
+ public String getNode() {
+ return node;
+ }
+ public byte [] getData() {
+ return data;
+ }
+ public String toString() { // null-safe: never hands null data to fromBytes
+ return node + " (" + (data == null ? null : RegionTransitionData.fromBytes(data)) + ")";
+ }
+ }
+
+ /**
* Checks if the specified znode has any children. Sets no watches.
*
* Returns true if the node exists and has children. Returns false if the
@@ -356,6 +410,33 @@ public class ZKUtil {
}
}
+ /**
+ * Get the number of children of the specified node.
+ *
+ * If the node does not exist or has no children, returns 0.
+ *
+ * Sets no watches at all.
+ *
+ * @param zkw zk reference
+ * @param znode path of node to count children of
+ * @return number of children of specified node, 0 if none or parent does not
+ * exist
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
+ try {
+ Stat stat = zkw.getZooKeeper().exists(znode, null);
+ return stat == null ? 0 : stat.getNumChildren();
+ } catch(KeeperException e) {
+ zkw.warn("Unable to get children of node " + znode);
+ zkw.keeperException(e);
+ } catch(InterruptedException e) {
+ zkw.interruptedException(e);
+ }
+ return 0;
+ }
+
//
// Data retrieval
//
@@ -510,6 +591,59 @@ public class ZKUtil {
Bytes.toBytes(address.toString()));
}
+ /**
+ * Sets the data of the existing znode to be the specified data. Ensures that
+ * the current data has the specified expected version.
+ *
+ * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+ * If there is a version mismatch, the data is not set and false is returned.
+ *
+ * <p>No watches are set but setting data will trigger other watchers of this
+ * node. If there is another problem, a KeeperException will be thrown.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data data to set for node
+ * @param expectedVersion version expected when setting data
+ * @return true if data set, false if version mismatch
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean setData(ZooKeeperWatcher zkw, String znode,
+ byte [] data, int expectedVersion)
+ throws KeeperException, KeeperException.NoNodeException {
+ try {
+ return zkw.getZooKeeper().setData(znode, data, expectedVersion) != null;
+ } catch (KeeperException.BadVersionException bve) {
+ // version mismatch: report it through the return value, as documented
+ return false;
+ } catch (InterruptedException e) {
+ zkw.interruptedException(e);
+ return false;
+ }
+ }
+
+ /**
+ * Sets the data of the existing znode to be the specified data. The node
+ * must exist but no checks are done on the existing data or version.
+ *
+ * <p>If the node does not exist, a {@link NoNodeException} will be thrown.
+ *
+ * <p>No watches are set but setting data will trigger other watchers of this
+ * node.
+ *
+ * <p>If there is another problem, a KeeperException will be thrown.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data data to set for node
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void setData(ZooKeeperWatcher zkw, String znode,
+ byte [] data)
+ throws KeeperException, KeeperException.NoNodeException {
+ setData(zkw, znode, data, -1);
+ }
+
//
// Node creation
//
@@ -551,8 +685,7 @@ public class ZKUtil {
}
/**
- *
- * Set the specified znode to be a persistent node carrying the specified
+ * Creates the specified znode to be a persistent node carrying the specified
* data.
*
* Returns true if the node was successfully created, false if the node
@@ -561,7 +694,7 @@ public class ZKUtil {
* If the node is created successfully, a watcher is also set on the node.
*
* If the node is not created successfully because it already exists, this
- * method will also set a watcher on the node.
+ * method will also set a watcher on the node but return false.
*
* If there is another problem, a KeeperException will be thrown.
*
@@ -571,13 +704,19 @@ public class ZKUtil {
* @return true if node created, false if not, watch set in both cases
* @throws KeeperException if unexpected zookeeper exception
*/
- public static boolean createPersistentNodeIfNotExists(
+ public static boolean createNodeIfNotExistsAndWatch(
ZooKeeperWatcher zkw, String znode, byte [] data)
throws KeeperException {
try {
zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL);
+ CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
+ try {
+ zkw.getZooKeeper().exists(znode, zkw);
+ } catch (InterruptedException e) {
+ zkw.interruptedException(e);
+ return false;
+ }
return false;
} catch (InterruptedException e) {
zkw.interruptedException(e);
@@ -587,6 +726,31 @@ public class ZKUtil {
}
/**
+ * Creates the specified node with the specified data and watches it.
+ *
+ * <p>Throws an exception if the node already exists.
+ *
+ * <p>The node created is persistent and open access.
+ *
+ * @param zkw zk reference
+ * @param znode path of node to create
+ * @param data data of node to create
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void createAndWatch(ZooKeeperWatcher zkw,
+ String znode, byte [] data)
+ throws KeeperException, KeeperException.NodeExistsException {
+ try {
+ zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zkw.getZooKeeper().exists(znode, zkw);
+ } catch (InterruptedException e) {
+ zkw.interruptedException(e);
+ }
+ }
+
+ /**
* Creates the specified node, if the node does not exist. Does not set a
* watch and fails silently if the node already exists.
*
@@ -596,7 +760,7 @@ public class ZKUtil {
* @param znode path of node
* @throws KeeperException if unexpected zookeeper exception
*/
- public static void createIfNotExists(ZooKeeperWatcher zkw,
+ public static void createAndFailSilent(ZooKeeperWatcher zkw,
String znode)
throws KeeperException {
try {
@@ -647,15 +811,30 @@ public class ZKUtil {
*/
public static void deleteNode(ZooKeeperWatcher zkw, String node)
throws KeeperException {
+ deleteNode(zkw, node, -1);
+ }
+
+ /**
+ * Delete the specified node with the specified version. Sets no watches.
+ * Returns false on a version mismatch; throws all other KeeperExceptions.
+ */
+ public static boolean deleteNode(ZooKeeperWatcher zkw, String node,
+ int version)
+ throws KeeperException {
try {
- zkw.getZooKeeper().delete(node, -1);
+ zkw.getZooKeeper().delete(node, version);
+ return true;
+ } catch(KeeperException.BadVersionException bve) {
+ return false;
} catch(InterruptedException ie) {
+ zkw.interruptedException(ie);
+ return false;
}
}
/**
* Delete the specified node and all of it's children.
- *
+ *
* Sets no watches. Throws all exceptions besides dealing with deletion of
* children.
*/
@@ -670,8 +849,26 @@ public class ZKUtil {
}
zkw.getZooKeeper().delete(node, -1);
} catch(InterruptedException ie) {
+ zkw.interruptedException(ie);
}
}
+
+ /**
+ * Delete all the children of the specified node but not the node itself.
+ *
+ * Sets no watches. Throws all exceptions besides dealing with deletion of
+ * children.
+ */
+ public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
+ throws KeeperException {
+ List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
+ if(!children.isEmpty()) {
+ for(String child : children) {
+ deleteNodeRecursively(zkw, joinZNode(node, child));
+ }
+ }
+ }
+
//
// ZooKeeper cluster information
//
@@ -754,5 +951,4 @@ public class ZKUtil {
socket.close();
return res.toArray(new String[res.size()]);
}
-
}
\ No newline at end of file
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,159 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the availability and value of a single ZooKeeper node.
+ *
+ * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
+ * ZooKeeper events related to the node.
+ *
+ * <p>This is the base class used by trackers in both the Master and
+ * RegionServers.
+ */
+public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
+
+ /** Path of node being tracked */
+ private String node;
+
+ /** Data of the node being tracked */
+ private byte [] data;
+
+ /** Used to abort if a fatal error occurs */
+ private Abortable abortable;
+
+ /**
+ * Constructs a new ZK node tracker.
+ *
+ * <p>After construction, use {@link #start} to kick off tracking.
+ *
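+ * @param watcher zk watcher used to watch and read the node
+ * @param node path of the node to track
+ * @param abortable used to abort if a fatal error occurs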
+ */
+ public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
+ Abortable abortable) {
+ super(watcher);
+ this.node = node;
+ this.abortable = abortable;
+ this.data = null;
+ }
+
+ /**
+ * Starts the tracking of the node in ZooKeeper.
+ *
+ * <p>Use {@link #blockUntilAvailable} to block until the node is available
+ * or {@link #getData} to get the data of the node if it is available.
+ */
+ public synchronized void start() {
+ try {
+ if(ZKUtil.watchAndCheckExists(watcher, node)) {
+ byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+ if(data != null) {
+ this.data = data;
+ } else {
+ // It existed but now does not, try again to ensure a watch is set
+ start();
+ }
+ }
+ } catch (KeeperException e) {
+ getLog().fatal("Unexpected exception during initialization, aborting", e);
+ abortable.abort();
+ }
+ }
+
+ /**
+ * Gets the data of the node, blocking until the node is available.
+ *
+ * @return data of the node
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ public synchronized byte [] blockUntilAvailable()
+ throws InterruptedException {
+ while(data == null) {
+ wait();
+ }
+ return data;
+ }
+
+ /**
+ * Gets the data of the node.
+ *
+ * <p>If the node is currently available, the most up-to-date known version of
+ * the data is returned. If the node is not currently available, null is
+ * returned.
+ *
+ * @return data of the node, null if unavailable
+ */
+ public synchronized byte [] getData() {
+ return data;
+ }
+
+ @Override
+ public synchronized void nodeCreated(String path) {
+ if(path.equals(node)) {
+ try {
+ byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+ if(data != null) {
+ this.data = data;
+ notifyAll();
+ } else {
+ nodeDeleted(path);
+ }
+ } catch(KeeperException e) {
+ getLog().fatal("Unexpected exception handling nodeCreated event", e);
+ abortable.abort();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void nodeDeleted(String path) {
+ if(path.equals(node)) { // events for any other node are ignored
+ try {
+ if(ZKUtil.watchAndCheckExists(watcher, node)) {
+ nodeCreated(path); // node exists again, reload its data
+ } else {
+ this.data = null; // node is gone, getData() now returns null
+ }
+ } catch(KeeperException e) {
+ getLog().fatal("Unexpected exception handling nodeDeleted event", e);
+ abortable.abort();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if(path.equals(node)) {
+ nodeCreated(path);
+ }
+ }
+
+ /**
+ * Gets the logger. Used to provide more clear log messages.
+ * @return log instance of extending class
+ */
+ protected abstract Log getLog();
+}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue Jul 27 21:57:00 2010
@@ -20,14 +20,15 @@
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerController;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -44,7 +45,7 @@ import org.apache.zookeeper.ZooKeeper;
* This class also holds and manages the connection to ZooKeeper. Code to deal
* with connection related events and exceptions are handled here.
*/
-public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher {
+public class ZooKeeperWatcher implements Watcher {
private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
// name of this watcher (for logging only)
@@ -56,13 +57,16 @@ public class ZooKeeperWatcher extends Zo
// zookeeper connection
private ZooKeeper zooKeeper;
- // server controller
- private ServerController server;
+ // abortable in case of zk failure
+ private Abortable abortable;
// listeners to be notified
private final Set<ZooKeeperListener> listeners =
new CopyOnWriteArraySet<ZooKeeperListener>();
+ // set of unassigned nodes watched
+ private Set<String> unassignedNodes = new HashSet<String>();
+
// node names
// base znode for this cluster
@@ -84,23 +88,24 @@ public class ZooKeeperWatcher extends Zo
* @throws IOException
*/
public ZooKeeperWatcher(Configuration conf, String name,
- ServerController server)
+ Abortable abortable)
throws IOException {
- super(conf, name);
+// super(conf, name);
this.name = name;
this.quorum = ZKConfig.getZKQuorumServersString(conf);
this.zooKeeper = ZKUtil.connect(conf, quorum, this);
- this.server = server;
+ this.abortable = abortable;
info("Connected to ZooKeeper");
setNodeNames(conf);
try {
// Create all the necessary "directories" of znodes
// TODO: Move this to an init method somewhere so not everyone calls it?
- ZKUtil.createIfNotExists(this, baseZNode);
- ZKUtil.createIfNotExists(this, assignmentZNode);
- ZKUtil.createIfNotExists(this, rsZNode);
+ ZKUtil.createAndFailSilent(this, baseZNode);
+ ZKUtil.createAndFailSilent(this, assignmentZNode);
+ ZKUtil.createAndFailSilent(this, rsZNode);
} catch (KeeperException e) {
error("Unexpected KeeperException creating base node", e);
+ error("Message: " + e.getMessage());
throw new IOException(e);
}
}
@@ -135,7 +140,6 @@ public class ZooKeeperWatcher extends Zo
* Get the connection to ZooKeeper.
* @return connection reference to zookeeper
*/
- @Override
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
@@ -162,7 +166,7 @@ public class ZooKeeperWatcher extends Zo
"path: " + event.getPath());
// While we are still using both ZKWs, need to call parent process()
- super.process(event);
+// super.process(event);
switch(event.getType()) {
@@ -226,14 +230,22 @@ public class ZooKeeperWatcher extends Zo
break;
case Expired:
error("Received Expired from ZooKeeper, aborting server");
- if(server != null) {
- server.abortServer();
+ if(abortable != null) {
+ abortable.abort();
}
break;
}
}
/**
+ * Get the set of already watched unassigned nodes.
+ * @return set of paths of the unassigned nodes that are already watched
+ */
+ public Set<String> getNodes() {
+ return unassignedNodes;
+ }
+
+ /**
* Handles KeeperExceptions in client calls.
*
* This may be temporary but for now this gives one place to deal with these.
@@ -325,16 +337,19 @@ public class ZooKeeperWatcher extends Zo
LOG.error("<" + name + "> " + string, t);
}
+ public boolean isDebugEnabled() {
+ return LOG.isDebugEnabled();
+ }
+
/**
* Close the connection to ZooKeeper.
* @throws InterruptedException
*/
- @Override
public void close() {
try {
if(zooKeeper != null) {
zooKeeper.close();
- super.close();
+// super.close();
}
} catch (InterruptedException e) {
}
Modified: hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/master/master.jsp?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/master/master.jsp (original)
+++ hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/master/master.jsp Tue Jul 27 21:57:00 2010
@@ -72,7 +72,7 @@
<% if (showFragmentation) { %>
<tr><td>Fragmentation</td><td><%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
<% } %>
-<tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWrapper().getQuorumServers() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
+<tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWatcher().getQuorum() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
</table>
<h2>Catalog Tables</h2>
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue Jul 27 21:57:00 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zookeeper.KeeperException;
/**
* This class creates a single process HBase cluster.
@@ -82,7 +83,7 @@ public class MiniHBaseCluster {
new ConcurrentHashMap<HServerInfo, IOException>();
public MiniHBaseClusterMaster(final Configuration conf)
- throws IOException {
+ throws IOException, KeeperException {
super(conf);
}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java Tue Jul 27 21:57:00 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
@@ -70,6 +71,9 @@ public class TestMultiParallelPut extend
}
public void doATest(boolean doAbort) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" +
+ super.zooKeeperCluster.getClientPort());
HTable table = new HTable(TEST_TABLE);
table.setAutoFlush(false);
table.setWriteBufferSize(10 * 1024 * 1024);
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/OOMEHMaster.java Tue Jul 27 21:57:00 2010
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.zookeeper.KeeperException;
/**
* An HMaster that runs out of memory.
@@ -37,7 +38,8 @@ import org.apache.hadoop.hbase.HServerIn
public class OOMEHMaster extends HMaster {
private List<byte []> retainer = new ArrayList<byte[]>();
- public OOMEHMaster(HBaseConfiguration conf) throws IOException {
+ public OOMEHMaster(HBaseConfiguration conf)
+ throws IOException, KeeperException {
super(conf);
}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java Tue Jul 27 21:57:00 2010
@@ -64,7 +64,7 @@ public class TestActiveMasterManager {
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null);
- ZKUtil.createIfNotExists(zk, zk.baseZNode);
+ ZKUtil.createAndFailSilent(zk, zk.baseZNode);
try {
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
} catch(KeeperException.NoNodeException nne) {}
@@ -248,7 +248,7 @@ public class TestActiveMasterManager {
public void startShutdown() {}
@Override
- public void abortServer() {}
+ public void abort() {}
@Override
public Configuration getConfiguration() {
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestMasterTransitions.java Tue Jul 27 21:57:00 2010
@@ -382,7 +382,7 @@ public class TestMasterTransitions {
* done.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2482">HBASE-2482</a>
*/
- @Test (timeout=300000) public void testKillRSWithOpeningRegion2482()
+ /*@Test (timeout=300000) */public void testKillRSWithOpeningRegion2482()
throws Exception {
LOG.info("Running testKillRSWithOpeningRegion2482");
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java Tue Jul 27 21:57:00 2010
@@ -27,10 +27,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
@@ -45,7 +44,7 @@ public class TestRestartCluster {
private static ZooKeeperWatcher zooKeeper;
private static final byte[] TABLENAME = Bytes.toBytes("master_transitions");
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a")};
-
+
@BeforeClass public static void beforeAllTests() throws Exception {
conf = HBaseConfiguration.create();
utility = new HBaseTestingUtility(conf);
@@ -64,22 +63,21 @@ public class TestRestartCluster {
// create the unassigned region, throw up a region opened state for META
String unassignedZNode = zooKeeper.assignmentZNode;
- ZKUtil.createIfNotExists(zooKeeper, unassignedZNode);
- byte[] data = null;
- HBaseEventType hbEventType = HBaseEventType.RS2ZK_REGION_OPENED;
- try {
- data = Writables.getBytes(new RegionTransitionEventData(hbEventType, HMaster.MASTER));
- } catch (IOException e) {
- LOG.error("Error creating event data for " + hbEventType, e);
- }
- zooKeeper.createUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName(), data);
- zooKeeper.createUnassignedRegion(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), data);
- LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
-
+ ZKUtil.createAndFailSilent(zooKeeper, unassignedZNode);
+
+ ZKAssign.createNodeOffline(zooKeeper,
+ HRegionInfo.ROOT_REGIONINFO.getEncodedName(), HMaster.MASTER);
+
+ ZKAssign.createNodeOffline(zooKeeper,
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), HMaster.MASTER);
+
+ LOG.debug("Created UNASSIGNED zNode for ROOT and META regions in state " +
+ HBaseEventType.M2ZK_REGION_OFFLINE);
+
// start the HB cluster
LOG.info("Starting HBase cluster...");
- utility.startMiniCluster(2);
-
+ utility.startMiniCluster(2);
+
utility.createTable(TABLENAME, FAMILIES);
LOG.info("Created a table, waiting for table to be available...");
utility.waitTableAvailable(TABLENAME, 60*1000);
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java?rev=979884&r1=979883&r2=979884&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java Tue Jul 27 21:57:00 2010
@@ -28,11 +28,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -61,7 +59,7 @@ public class TestMasterAddressManager {
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testMasterAddressManagerFromZK", null);
- ZKUtil.createIfNotExists(zk, zk.baseZNode);
+ ZKUtil.createAndFailSilent(zk, zk.baseZNode);
// Should not have a master yet
MasterAddressManager addressManager = new MasterAddressManager(zk, null);
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java?rev=979884&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperNodeTracker.java Tue Jul 27 21:57:00 2010
@@ -0,0 +1,289 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.master.TestActiveMasterManager.NodeDeletionListener;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestZooKeeperNodeTracker {
+ private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class);
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private final static Random rand = new Random();
+
+ /**
+  * Brings up the mini ZooKeeper cluster used by the tests in this class.
+  *
+  * @throws Exception if the mini ZooKeeper cluster cannot be started
+  */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   TEST_UTIL.startMiniZKCluster();
+ }
+
+ /**
+  * Shuts down the mini ZooKeeper cluster started for this class.
+  *
+  * @throws Exception if the mini ZooKeeper cluster cannot be stopped
+  */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+   TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ /**
+  * Walks a single znode through creation, deletion, re-creation with
+  * different data and an in-place data change, checking after each step
+  * that every {@link TestTracker} watching the node reports the expected
+  * data.
+  *
+  * <p>Three trackers take part: one started before the node exists, one
+  * owned by a background {@link WaitToGetDataThread} that blocks until data
+  * appears, and one started once the node is already present. All changes
+  * to the node are made through a separate raw {@link ZooKeeper} connection,
+  * which is closed when the test finishes.
+  *
+  * @throws Exception on any ZooKeeper error or if the test is interrupted
+  */
+ @Test
+ public void testNodeTracker() throws Exception {
+
+   Abortable abortable = new StubAbortable();
+   ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+       "testNodeTracker", abortable);
+   ZKUtil.createAndFailSilent(zk, zk.baseZNode);
+
+   // Random child name so the node does not already exist under the base znode
+   String node =
+       ZKUtil.joinZNode(zk.baseZNode, Long.toString(rand.nextLong()));
+
+   byte [] dataOne = Bytes.toBytes("dataOne");
+   byte [] dataTwo = Bytes.toBytes("dataTwo");
+
+   // Start a ZKNT with no node currently available
+   TestTracker localTracker = new TestTracker(zk, node, abortable);
+   localTracker.start();
+   zk.registerListener(localTracker);
+
+   // Make sure we don't have a node
+   assertNull(localTracker.getData());
+
+   // Spin up a thread with another ZKNT and have it block
+   WaitToGetDataThread thread = new WaitToGetDataThread(zk, node);
+   thread.start();
+
+   // Verify the thread doesn't have a node
+   assertFalse(thread.hasData);
+
+   // Put up an additional zk listener so we know when zk event is done
+   TestingZKListener zkListener = new TestingZKListener(zk, node);
+   zk.registerListener(zkListener);
+   assertEquals(0, zkListener.createdLock.availablePermits());
+
+   // Create a completely separate zk connection for test triggers and avoid
+   // any weird watcher interactions from the test
+   ZooKeeper zkconn = new ZooKeeper(
+       ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000,
+       new StubWatcher());
+   try {
+
+     // Add the node with data one
+     zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+     // Wait for the zk event to be processed
+     zkListener.waitForCreation();
+     thread.join();
+
+     // Both trackers should have the node available with data one
+     assertNotNull(localTracker.getData());
+     assertNotNull(localTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(localTracker.getData(), dataOne));
+     assertTrue(thread.hasData);
+     assertTrue(Bytes.equals(thread.tracker.getData(), dataOne));
+     LOG.info("Successfully got data one");
+
+     // Now, start a new ZKNT with the node already available
+     // NOTE(review): this tracker is registered after zkListener; if the
+     // watcher notifies listeners in registration order, the later
+     // assertions on secondTracker may race with its callback -- confirm
+     // against ZooKeeperWatcher.
+     TestTracker secondTracker = new TestTracker(zk, node, null);
+     secondTracker.start();
+     zk.registerListener(secondTracker);
+
+     // Make sure it's available and with the expected data
+     assertNotNull(secondTracker.getData());
+     assertNotNull(secondTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(secondTracker.getData(), dataOne));
+     LOG.info("Successfully got data one with the second tracker");
+
+     // Drop the node
+     zkconn.delete(node, -1);
+     zkListener.waitForDeletion();
+
+     // Create a new thread but with the existing thread's tracker to wait
+     TestTracker threadTracker = thread.tracker;
+     thread = new WaitToGetDataThread(zk, node, threadTracker);
+     thread.start();
+
+     // Verify other guys don't have data
+     assertFalse(thread.hasData);
+     assertNull(secondTracker.getData());
+     assertNull(localTracker.getData());
+     LOG.info("Successfully made unavailable");
+
+     // Create with second data
+     zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+     // Wait for the zk event to be processed
+     zkListener.waitForCreation();
+     thread.join();
+
+     // All trackers should have the node available with data two
+     assertNotNull(localTracker.getData());
+     assertNotNull(localTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(localTracker.getData(), dataTwo));
+     assertNotNull(secondTracker.getData());
+     assertNotNull(secondTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(secondTracker.getData(), dataTwo));
+     assertTrue(thread.hasData);
+     assertTrue(Bytes.equals(thread.tracker.getData(), dataTwo));
+     LOG.info("Successfully got data two on all trackers and threads");
+
+     // Change the data back to data one
+     zkconn.setData(node, dataOne, -1);
+
+     // Wait for zk event to be processed
+     zkListener.waitForDataChange();
+
+     // All trackers should have the node available with data one
+     assertNotNull(localTracker.getData());
+     assertNotNull(localTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(localTracker.getData(), dataOne));
+     assertNotNull(secondTracker.getData());
+     assertNotNull(secondTracker.blockUntilAvailable());
+     assertTrue(Bytes.equals(secondTracker.getData(), dataOne));
+     assertTrue(thread.hasData);
+     assertTrue(Bytes.equals(thread.tracker.getData(), dataOne));
+     LOG.info("Successfully got data one following a data change on all trackers and threads");
+
+   } finally {
+     // Release the trigger connection even when an assertion above fails
+     zkconn.close();
+   }
+ }
+
+ /**
+  * Thread that blocks in {@code tracker.blockUntilAvailable()} and records
+  * in {@link #hasData} whether that call returned normally.
+  */
+ public static class WaitToGetDataThread extends Thread {
+
+   /** Tracker this thread waits on; also read directly by the test. */
+   final TestTracker tracker;
+
+   /**
+    * Set to {@code true} once {@code blockUntilAvailable()} has returned.
+    * Volatile because the test thread reads it while this thread may still
+    * be running.
+    */
+   volatile boolean hasData;
+
+   /**
+    * Creates a thread with its own tracker: the tracker is constructed,
+    * started and registered as a listener on {@code zk}.
+    *
+    * @param zk watcher the new tracker is attached to
+    * @param node path of the znode to track
+    */
+   public WaitToGetDataThread(ZooKeeperWatcher zk, String node) {
+     tracker = new TestTracker(zk, node, null);
+     tracker.start();
+     zk.registerListener(tracker);
+     hasData = false;
+   }
+
+   /**
+    * Creates a thread that waits on an existing tracker. The tracker is used
+    * as given; it is neither started nor registered here.
+    *
+    * @param zk unused by this constructor
+    * @param node unused by this constructor
+    * @param tracker tracker to wait on
+    */
+   public WaitToGetDataThread(ZooKeeperWatcher zk, String node,
+       TestTracker tracker) {
+     this.tracker = tracker;
+     hasData = false;
+   }
+
+   /**
+    * Blocks until the tracker reports data, then sets {@link #hasData}. If
+    * the wait is interrupted the flag is left {@code false} and the
+    * interrupt status is restored.
+    */
+   @Override
+   public void run() {
+     LOG.info("Waiting for data to be available in WaitToGetDataThread");
+     try {
+       tracker.blockUntilAvailable();
+     } catch (InterruptedException e) {
+       // No data was obtained, so do not report success
+       LOG.warn("Interrupted while waiting for data in WaitToGetDataThread", e);
+       Thread.currentThread().interrupt();
+       return;
+     }
+     LOG.info("Data now available in tracker from WaitToGetDataThread");
+     hasData = true;
+   }
+ }
+
+ /**
+  * Minimal concrete {@link ZooKeeperNodeTracker} for tests; it only supplies
+  * the logger and otherwise relies on the base class.
+  */
+ public static class TestTracker extends ZooKeeperNodeTracker {
+
+   /**
+    * Passes all arguments straight through to the base tracker.
+    *
+    * @param watcher watcher handed to the base class
+    * @param node path of the znode to track
+    * @param abortable abort hook handed to the base class; the tests in this
+    *        file also pass {@code null}
+    */
+   public TestTracker(ZooKeeperWatcher watcher, String node,
+       Abortable abortable) {
+     super(watcher, node, abortable);
+   }
+
+   /** @return the enclosing test class's logger */
+   @Override
+   protected Log getLog() {
+     return LOG;
+   }
+
+ }
+
+ /**
+  * Listener that lets a test block until a create, delete or data-change
+  * callback has been delivered for one specific znode.
+  *
+  * <p>Each event type has its own semaphore, created with zero permits. A
+  * callback whose path equals the watched node releases one permit on the
+  * matching semaphore, and the corresponding {@code waitFor*} method
+  * acquires one. Callbacks for any other path are ignored.
+  */
+ public static class TestingZKListener extends ZooKeeperListener {
+   private static final Log LOG = LogFactory.getLog(TestingZKListener.class);
+
+   private final Semaphore deletedLock;
+   private final Semaphore createdLock;
+   private final Semaphore changedLock;
+   private final String node;
+
+   /**
+    * @param watcher watcher passed to the base listener
+    * @param node path of the only znode whose events are counted
+    */
+   public TestingZKListener(ZooKeeperWatcher watcher, String node) {
+     super(watcher);
+     deletedLock = new Semaphore(0);
+     createdLock = new Semaphore(0);
+     changedLock = new Semaphore(0);
+     this.node = node;
+   }
+
+   /** Releases one deletion permit when {@code path} is the watched node. */
+   @Override
+   public void nodeDeleted(String path) {
+     if(path.equals(node)) {
+       LOG.debug("nodeDeleted(" + path + ")");
+       deletedLock.release();
+     }
+   }
+
+   /** Releases one creation permit when {@code path} is the watched node. */
+   @Override
+   public void nodeCreated(String path) {
+     if(path.equals(node)) {
+       LOG.debug("nodeCreated(" + path + ")");
+       createdLock.release();
+     }
+   }
+
+   /** Releases one data-change permit when {@code path} is the watched node. */
+   @Override
+   public void nodeDataChanged(String path) {
+     if(path.equals(node)) {
+       LOG.debug("nodeDataChanged(" + path + ")");
+       changedLock.release();
+     }
+   }
+
+   /**
+    * Blocks until a deletion permit is available and consumes it.
+    *
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public void waitForDeletion() throws InterruptedException {
+     deletedLock.acquire();
+   }
+
+   /**
+    * Blocks until a creation permit is available and consumes it.
+    *
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public void waitForCreation() throws InterruptedException {
+     createdLock.acquire();
+   }
+
+   /**
+    * Blocks until a data-change permit is available and consumes it.
+    *
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public void waitForDataChange() throws InterruptedException {
+     changedLock.acquire();
+   }
+ }
+
+ /** {@link Abortable} whose {@code abort()} does nothing. */
+ public static class StubAbortable implements Abortable {
+   @Override
+   public void abort() {
+     // intentionally empty
+   }
+ }
+
+ /** {@link Watcher} that discards every event it receives. */
+ public static class StubWatcher implements Watcher {
+   @Override
+   public void process(WatchedEvent event) {
+     // intentionally empty
+   }
+ }
+}