You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/14 05:41:12 UTC
svn commit: r963935 - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/zookeeper/
src/test/java/org/apache/hadoop/hbase/ma...
Author: jgray
Date: Wed Jul 14 03:41:11 2010
New Revision: 963935
URL: http://svn.apache.org/viewvc?rev=963935&view=rev
Log:
HBASE-2695 [MasterStartupCleanup-v4] Cleans up the master startup, adds new ZK tool ActiveMasterManager for master-side (part of master cleanup and refactor)
Added:
hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
Modified:
hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Wed Jul 14 03:41:11 2010
@@ -14,6 +14,9 @@ Branch 0.90.0 - Master Rewrite Branch
HBASE-2696 Re-enabled TestZooKeeper.testRegionServerSessionExpired
HBASE-2699 [LoadBalancer-v5] Reimplement load balancing to be a
background process and to not use heartbeats
+ HBASE-2695 [MasterStartupCleanup-v4] Cleans up the master startup, adds
+ new ZK tool ActiveMasterManager for master-side (part of
+ master cleanup and refactor)
NEW FEATURES
Added: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt?rev=963935&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt (added)
+++ hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt Wed Jul 14 03:41:11 2010
@@ -0,0 +1,38 @@
+List of things to do for this branch, including review comments that have
+not yet been implemented.
+
+
+Now:
+
+* synchronize all access to the boolean in ActiveMasterManager
+
+
+Think about:
+
+* renaming master file manager? MasterFS/MasterFileSystem
+
+
+Later:
+
+* ServerStatus/MasterStatus
+
+ - These need new names to be more descriptive (ServerControl?)
+ - They should have a very clear purpose that adds value beyond passing
+ HMaster directly
+ - Current idea is these things would just have accessors/setters to
+ the server status booleans and abort() methods (like closed, closing,
+ abortRequested)
+
+* HBaseEventHandler/HBaseEventType/HBaseExecutorService
+
+ - After the ZK changes, rename these to EventHandler/EventType
+ - Currently multiple types map to a single handler, we may want 1-to-1
+ - Need to do a full review of the semantics of these once bulk of
+ master rewrite is done
+
+* LoadBalancer
+
+ - Need to finish or back out code related to block locations
+ (if finished, need to use files rather than the directory, and use the right location)
+ - Put notes from reviewboard/jira into LB javadoc or hbase "book"
+
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=963935&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Wed Jul 14 03:41:11 2010
@@ -0,0 +1,154 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles everything on master-side related to master election.
+ *
+ * Listens and responds to ZooKeeper notifications on the master znode,
+ * both nodeCreated and nodeDeleted.
+ *
+ * Contains blocking methods which will hold up backup masters, waiting
+ * for the active master to fail.
+ *
+ * This class is instantiated in the HMaster constructor and the method
+ * {@link #blockUntilBecomingActiveMaster()} is called to wait until becoming
+ * the active master of the cluster.
+ */
+public class ActiveMasterManager extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class);
+
+ final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
+
+ private final HServerAddress address;
+ private final MasterStatus status;
+
+ ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address,
+ MasterStatus status) {
+ super(watcher);
+ this.address = address;
+ this.status = status;
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ if(path.equals(watcher.masterAddressZNode) && !status.isClosed()) {
+ handleMasterNodeChange();
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if(path.equals(watcher.masterAddressZNode) && !status.isClosed()) {
+ handleMasterNodeChange();
+ }
+ }
+
+ /**
+ * Handle a change in the master node. Doesn't matter whether this was called
+ * from a nodeCreated or nodeDeleted event because there are no guarantees
+ * that the current state of the master node matches the event at the time of
+ * our next ZK request.
+ *
+ * Uses the watchAndCheckExists method which watches the master address node
+ * regardless of whether it exists or not. If it does exist (there is an
+ * active master), it returns true. Otherwise it returns false.
+ *
+ * A watcher is set which guarantees that this method will get called again if
+ * there is another change in the master node.
+ */
+ private void handleMasterNodeChange() {
+ // Watch the node and check if it exists.
+ try {
+ synchronized(clusterHasActiveMaster) {
+ if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+ // A master node exists, there is an active master
+ LOG.debug("A master is now available");
+ clusterHasActiveMaster.set(true);
+ } else {
+ // Node is no longer there, cluster does not have an active master
+ LOG.debug("No master available. notifying waiting threads");
+ clusterHasActiveMaster.set(false);
+ // Notify any thread waiting to become the active master
+ clusterHasActiveMaster.notifyAll();
+ }
+ }
+ } catch (KeeperException ke) {
+ LOG.fatal("Received an unexpected KeeperException, aborting", ke);
+ status.abortServer();
+ }
+ }
+
+ /**
+ * Block until becoming the active master.
+ *
+ * Method blocks until there is not another active master and our attempt
+ * to become the new active master is successful.
+ *
+ * This also makes sure that we are watching the master znode so will be
+ * notified if another master dies.
+ */
+ void blockUntilBecomingActiveMaster() {
+ // Try to become the active master, watch if there is another master
+ try {
+ if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode,
+ address)) {
+ // We are the master, return
+ clusterHasActiveMaster.set(true);
+ return;
+ }
+ } catch (KeeperException ke) {
+ LOG.fatal("Received an unexpected KeeperException, aborting", ke);
+ status.abortServer();
+ return;
+ }
+ // There is another active master, this is not a cluster startup
+ // and we must wait until the active master dies
+ LOG.info("Another master is already the active master, waiting to become " +
+ "the next active master");
+ clusterHasActiveMaster.set(true);
+ status.setClusterStartup(false);
+ synchronized(clusterHasActiveMaster) {
+ while(clusterHasActiveMaster.get() && !status.isClosed()) {
+ try {
+ clusterHasActiveMaster.wait();
+ } catch (InterruptedException e) {
+ // We expect to be interrupted when a master dies, will fall out if so
+ LOG.debug("Interrupted waiting for master to die", e);
+ }
+ }
+ if(status.isClosed()) {
+ return;
+ }
+ // Try to become active master again now that there is no active master
+ blockUntilBecomingActiveMaster();
+ }
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java Wed Jul 14 03:41:11 2010
@@ -72,12 +72,23 @@ public class FileSystemManager {
// We're supposed to run on 0.20 and 0.21 anyways.
conf.set("fs.default.name", this.rootdir.toString());
conf.set("fs.defaultFS", this.rootdir.toString());
+ // setup the filesystem variable
this.fs = FileSystem.get(conf);
-
+ // set up the archived logs path
+ this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ }
+
+ /**
+ * <ol>
+ * <li>Check if the root region exists and is readable, if not create it</li>
+ * <li>Create a log archive directory for RS to put archived logs</li>
+ * </ol>
+ */
+ public void initialize() throws IOException {
+ // check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs);
// Make sure the region servers can archive their old logs
- this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
if(!this.fs.exists(this.oldLogDir)) {
this.fs.mkdirs(this.oldLogDir);
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jul 14 03:41:11 2010
@@ -29,19 +29,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -52,7 +46,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -77,10 +70,10 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.master.RegionServerOperationQueue.ProcessingResultCode;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@@ -127,122 +120,231 @@ public class HMaster extends Thread impl
// TODO: Is this separate flag necessary?
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+ // The configuration for the Master
private final Configuration conf;
+ // server for the web ui
private InfoServer infoServer;
- private final int numRetries;
-
- // Metrics is set when we call run.
+ // Reporting to track master metrics.
private final MasterMetrics metrics;
// Our zk client. TODO: rename variable once we settle on naming
private ZooKeeperWatcher zooKeeperWrapper;
- // Watcher for master address and for cluster shutdown.
- private final ZKMasterAddressWatcher zkMasterAddressWatcher;
+ // Manager and zk listener for master election
+ private ActiveMasterManager activeMasterManager;
// A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo.
private final Sleeper sleeper;
-
+ // RPC server for the HMaster
private final HBaseServer rpcServer;
+ // Address of the HMaster
private final HServerAddress address;
-
// file system manager for the master FS operations
private final FileSystemManager fileSystemManager;
private final ServerConnection connection;
+ // server manager to deal with region server info
private final ServerManager serverManager;
+ // region manager to deal with region specific stuff
private final RegionManager regionManager;
- private long lastFragmentationQuery = -1L;
- private Map<String, Integer> fragmentation = null;
- private final RegionServerOperationQueue regionServerOperationQueue;
-
// True if this is the master that started the cluster.
boolean isClusterStartup;
+ // TODO: the following should eventually be removed from here
+ private final RegionServerOperationQueue regionServerOperationQueue;
+ private long lastFragmentationQuery = -1L;
+ private Map<String, Integer> fragmentation = null;
+
/**
- * Constructor
- * @param conf configuration
- * @throws IOException
+ * Initializes the HMaster. The steps are as follows:
+ *
+ * <ol>
+ * <li>Initialize HMaster RPC and address
+ * <li>Connect to ZooKeeper and figure out if this is a fresh cluster start or
+ * a failed over master
+ * <li>Initialize master components - server manager, region manager, metrics,
+ * region server queue, file system manager, etc
+ * <li>Block until becoming active master
+ * </ol>
*/
public HMaster(Configuration conf) throws IOException {
+ // initialize some variables
this.conf = conf;
-
- // Get my address and create an rpc server instance. The rpc-server port
- // can be ephemeral...ensure we have the correct info
- HServerAddress a = new HServerAddress(getMyAddress(this.conf));
- this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(),
- a.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
- this.address = new HServerAddress(this.rpcServer.getListenerAddress());
- this.numRetries = conf.getInt("hbase.client.retries.number", 2);
- int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.sleeper = new Sleeper(threadWakeFrequency, this.closed);
- this.connection = ServerConnectionManager.getConnection(conf);
+ /*
+ * 1. Determine address and initialize RPC server (but do not start)
+ *
+ * Get the master address and create an RPC server instance. The RPC
+ * server ports can be ephemeral.
+ */
+ HServerAddress a = new HServerAddress(getMyAddress(this.conf));
+ int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
+ this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(), a.getPort(),
+ numHandlers, false, conf);
+ this.address = new HServerAddress(rpcServer.getListenerAddress());
+ // set the thread name now that the (possibly ephemeral) address is known;
+ // this.address is still null before the assignment above
+ setName(MASTER + "-" + this.address);
- // Figure out if this is a fresh cluster start. This is done by checking the
- // number of RS ephemeral nodes. RS ephemeral nodes are created only after
- // the primary master has written the address to ZK. So this has to be done
- // before we race to write our address to zookeeper.
- zooKeeperWrapper =
+ /*
+ * 2. Determine if this is a fresh cluster startup or failed over master
+ *
+ * This is done by checking for the existence of any ephemeral
+ * RegionServer nodes in ZooKeeper. These nodes are created by RSs on
+ * their initialization but only after they find the primary master. As
+ * long as this check is done before we write our address into ZK, this
+ * will work. Note that multiple masters could find this to be true on
+ * startup (none have become active master yet), which is why there is
+ * an additional check if this master does not become primary on its
+ * first attempt.
+ */
+ zooKeeperWrapper =
new ZooKeeperWatcher(conf, getHServerAddress().toString(), this);
isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0);
-
- // Create the filesystem manager, which in turn does the following:
- // - Creates the root hbase directory in the FS
- // - Checks the FS to make sure the root directory is readable
- // - Creates the archive directory for logs
+
+ /*
+ * 3. Initialize master components.
+ *
+ * This includes the filesystem manager, server manager, region manager,
+ * metrics, queues, sleeper, etc...
+ */
+ this.connection = ServerConnectionManager.getConnection(conf);
+ this.regionServerOperationQueue = new RegionServerOperationQueue(conf, closed);
+ this.metrics = new MasterMetrics(this.getName());
fileSystemManager = new FileSystemManager(conf, this);
-
- // Get our zookeeper wrapper and then try to write our address to zookeeper.
- // We'll succeed if we are only master or if we win the race when many
- // masters. Otherwise we park here inside in writeAddressToZooKeeper.
- // TODO: Bring up the UI to redirect to active Master.
- zooKeeperWrapper.registerListener(this);
- this.zkMasterAddressWatcher =
- new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
- zooKeeperWrapper.registerListener(zkMasterAddressWatcher);
- this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
- this.regionServerOperationQueue =
- new RegionServerOperationQueue(this.conf, this.closed);
+ serverManager = new ServerManager(this, metrics, regionServerOperationQueue);
+ regionManager = new RegionManager(this);
+ // create a sleeper to sleep for a configured wait frequency
+ int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.sleeper = new Sleeper(threadWakeFrequency, this.closed);
- // set the thread name
- setName(MASTER);
- // create the master metrics object
- this.metrics = new MasterMetrics(MASTER);
+ /*
+ * 4. Block on becoming the active master.
+ *
+ * We race with other masters to write our address into ZooKeeper. If we
+ * succeed, we are the primary/active master and finish initialization.
+ *
+ * If we do not succeed, there is another active master and we should
+ * now wait until it dies to try and become the next active master. If
+ * we do not succeed on our first attempt, this is no longer a cluster
+ * startup.
+ */
+ activeMasterManager = new ActiveMasterManager(zooKeeperWrapper, address,
+ this);
+ zooKeeperWrapper.registerListener(activeMasterManager);
+ // Wait here until we are the active master
+ activeMasterManager.blockUntilBecomingActiveMaster();
+ // NOTE(review): blockUntilBecomingActiveMaster() also returns when the
+ // server is closed or after a KeeperException abort, so being active is
+ // not guaranteed here. Also, 'closed' is only set to false in run(); if it
+ // starts out true, the isClosed() checks in ActiveMasterManager would let a
+ // backup master return without waiting -- confirm.
- serverManager = new ServerManager(this, metrics, regionServerOperationQueue);
+ // We are the active master now.
-
- // Start the unassigned watcher - which will create the unassigned region
- // in ZK. This is needed before RegionManager() constructor tries to assign
- // the root region.
- ZKUnassignedWatcher.start(this.conf, this);
- // start the "close region" executor service
- HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
- // start the "open region" executor service
- HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
+ LOG.info("Server has become the active/primary master. Address is " +
+ this.address.toString());
-
- // start the region manager
- regionManager = new RegionManager(this);
+ // run() is executed next
+ }
- // We're almost open for business
- this.closed.set(false);
- LOG.info("HMaster initialized on " + this.address.toString());
+ /**
+ * Main processing loop for the HMaster.
+ * 1. Handle both fresh cluster start as well as failed over initialization of
+ * the HMaster.
+ * 2. Start the necessary services
+ * 3. Reassign the root region
+ * 4. The master is no longer closed - set "closed" to false
+ */
+ @Override
+ public void run() {
+ try {
+ // If this is a fresh cluster start, make sure the root region exists.
+ if(isClusterStartup()) {
+ // Initialize the filesystem, which does the following:
+ // - Creates the root hbase directory in the FS
+ // - Checks the FS to make sure the root directory is readable
+ // - Creates the archive directory for logs
+ fileSystemManager.initialize();
+ // Do any log splitting necessary
+ // TODO: Should do this in background rather than block master startup
+ fileSystemManager.splitLogAfterStartup();
+ }
+ // TODO: fix the logic and naming for joinCluster()
+ joinCluster();
+ // start up all service threads.
+ startServiceThreads();
+ // assign the root region
+ regionManager.reassignRootRegion();
+ // set the master as opened
+ this.closed.set(false);
+ LOG.info("HMaster started on " + this.address.toString());
+
+ while (!this.closed.get()) {
+ // check if we should be shutting down
+ if (this.shutdownRequested.get()) {
+ // The region servers won't all exit until we stop scanning the
+ // meta regions
+ this.regionManager.stopScanners();
+ if (this.serverManager.numServers() == 0) {
+ startShutdown();
+ break;
+ }
+ else {
+ LOG.debug("Waiting on " +
+ this.serverManager.getServersToServerInfo().keySet().toString());
+ }
+ }
+
+ // process the operation, handle the result
+ ProcessingResultCode resultCode = regionServerOperationQueue.process();
+ // If FAILED op processing, bad. Will exit.
+ if(resultCode == ProcessingResultCode.FAILED) {
+ break;
+ }
+ // If bad filesystem, exit
+ else if(resultCode == ProcessingResultCode.REQUEUED_BUT_PROBLEM) {
+ if (!fileSystemManager.checkFileSystem()) {
+ break;
+ }
+ }
+ // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED
+ }
+ } catch (Throwable t) {
+ LOG.fatal("Unhandled exception. Starting shutdown.", t);
+ setClosed();
+ }
+
+ // Wait for all the remaining region servers to report in.
+ this.serverManager.letRegionServersShutdown();
+
+ // Clean up and close up shop
+ if (this.infoServer != null) {
+ LOG.info("Stopping infoServer");
+ try {
+ this.infoServer.stop();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ this.rpcServer.stop();
+ this.regionManager.stop();
+ this.zooKeeperWrapper.close();
+ HBaseExecutorService.shutdown();
+ LOG.info("HMaster main thread exiting");
}
-
+
/**
- * Returns true if this master process was responsible for starting the
- * cluster.
+ * Returns true if this master process was responsible for starting the
+ * cluster, false if not.
*/
public boolean isClusterStartup() {
return isClusterStartup;
}
-
- public void resetClusterStartup() {
- isClusterStartup = false;
+
+ /**
+ * Sets whether this master is treated as the one that started the cluster.
+ * {@link ActiveMasterManager#blockUntilBecomingActiveMaster()} sets this to
+ * false when its attempt to write the master znode fails because another
+ * master is already active, i.e. this is a fail-over rather than a fresh
+ * cluster start. {@link #run()} only initializes the filesystem and splits
+ * logs when this flag is true.
+ * @param isClusterStartup false if another master became active before us
+ */
+ public void setClusterStartup(boolean isClusterStartup) {
+ this.isClusterStartup = isClusterStartup;
}
-
+
public HServerAddress getHServerAddress() {
return address;
}
@@ -274,7 +376,7 @@ public class HMaster extends Thread impl
public InfoServer getInfoServer() {
return this.infoServer;
}
-
+
/**
* Return the file systen manager instance
*/
@@ -304,7 +406,7 @@ public class HMaster extends Thread impl
public void setClosed() {
this.closed.set(true);
}
-
+
public AtomicBoolean getClosed() {
return this.closed;
}
@@ -330,71 +432,11 @@ public class HMaster extends Thread impl
public Path getRootDir() {
return fileSystemManager.getRootDir();
}
-
+
public RegionServerOperationQueue getRegionServerOperationQueue() {
return this.regionServerOperationQueue;
}
- /** Main processing loop */
- @Override
- public void run() {
- joinCluster();
- startServiceThreads();
- /* Main processing loop */
- try {
- FINISHED: while (!this.closed.get()) {
- // check if we should be shutting down
- if (this.shutdownRequested.get()) {
- // The region servers won't all exit until we stop scanning the
- // meta regions
- this.regionManager.stopScanners();
- if (this.serverManager.numServers() == 0) {
- startShutdown();
- break;
- } else {
- LOG.debug("Waiting on " +
- this.serverManager.getServersToServerInfo().keySet().toString());
- }
- }
- switch (this.regionServerOperationQueue.process()) {
- case FAILED:
- // If FAILED op processing, bad. Exit.
- break FINISHED;
- case REQUEUED_BUT_PROBLEM:
- if (!fileSystemManager.checkFileSystem())
- // If bad filesystem, exit.
- break FINISHED;
- default:
- // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED
- break;
- }
- }
- } catch (Throwable t) {
- LOG.fatal("Unhandled exception. Starting shutdown.", t);
- setClosed();
- }
-
- // Wait for all the remaining region servers to report in.
- this.serverManager.letRegionServersShutdown();
-
- /*
- * Clean up and close up shop
- */
- if (this.infoServer != null) {
- LOG.info("Stopping infoServer");
- try {
- this.infoServer.stop();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- this.rpcServer.stop();
- this.regionManager.stop();
- this.zooKeeperWrapper.close();
- HBaseExecutorService.shutdown();
- LOG.info("HMaster main thread exiting");
- }
-
/*
* Joins cluster. Checks to see if this instance of HBase is fresh or the
* master was started following a failover. In the second case, it inspects
@@ -461,6 +503,15 @@ public class HMaster extends Thread impl
*/
private void startServiceThreads() {
try {
+ // Start the unassigned watcher - which will create the unassigned region
+ // in ZK. This is needed before RegionManager() constructor tries to assign
+ // the root region.
+ ZKUnassignedWatcher.start(this.conf, this);
+ // start the "close region" executor service
+ HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString());
+ // start the "open region" executor service
+ HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
+ // start the region manager
this.regionManager.start();
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
@@ -578,7 +629,8 @@ public class HMaster extends Thread impl
startKey = endKey;
}
}
- for (int tries = 0; tries < this.numRetries; tries++) {
+ int numRetries = conf.getInt("hbase.client.retries.number", 2);
+ for (int tries = 0; tries < numRetries; tries++) {
try {
// We can not create a table unless meta regions have already been
// assigned and scanned.
@@ -594,7 +646,7 @@ public class HMaster extends Thread impl
} catch (TableExistsException e) {
throw e;
} catch (IOException e) {
- if (tries == this.numRetries - 1) {
+ if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
}
this.sleeper.sleep();
@@ -688,11 +740,14 @@ public class HMaster extends Thread impl
new MetaScannerVisitor() {
@Override
public boolean processRow(Result data) throws IOException {
- if (data == null || data.size() <= 0)
+ if (data == null || data.size() <= 0) {
return true;
+ }
Pair<HRegionInfo, HServerAddress> pair =
metaRowToRegionPair(data);
- if (pair == null) return false;
+ if (pair == null) {
+ return false;
+ }
if (!Bytes.equals(pair.getFirst().getTableDesc().getName(),
tableName)) {
return false;
@@ -702,10 +757,10 @@ public class HMaster extends Thread impl
}
};
- MetaScanner.metaScan(conf, visitor, tableName);
+ MetaScanner.metaScan(conf, visitor, tableName);
return result;
}
-
+
private Pair<HRegionInfo, HServerAddress> metaRowToRegionPair(
Result data) throws IOException {
HRegionInfo info = Writables.getHRegionInfo(
@@ -719,7 +774,7 @@ public class HMaster extends Thread impl
} else {
//undeployed
return new Pair<HRegionInfo, HServerAddress>(info, null);
- }
+ }
}
/**
@@ -733,16 +788,19 @@ public class HMaster extends Thread impl
throws IOException {
final AtomicReference<Pair<HRegionInfo, HServerAddress>> result =
new AtomicReference<Pair<HRegionInfo, HServerAddress>>(null);
-
+
MetaScannerVisitor visitor =
new MetaScannerVisitor() {
@Override
public boolean processRow(Result data) throws IOException {
- if (data == null || data.size() <= 0)
+ if (data == null || data.size() <= 0) {
return true;
+ }
Pair<HRegionInfo, HServerAddress> pair =
metaRowToRegionPair(data);
- if (pair == null) return false;
+ if (pair == null) {
+ return false;
+ }
if (!Bytes.equals(pair.getFirst().getTableDesc().getName(),
tableName)) {
return false;
@@ -755,12 +813,12 @@ public class HMaster extends Thread impl
MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
return result.get();
}
-
+
Pair<HRegionInfo,HServerAddress> getTableRegionFromName(
final byte [] regionName)
throws IOException {
byte [] tableName = HRegionInfo.parseRegionName(regionName)[0];
-
+
Set<MetaRegion> regions = regionManager.getMetaRegionsForTable(tableName);
for (MetaRegion m: regions) {
byte [] metaRegionName = m.getRegionName();
@@ -770,7 +828,9 @@ public class HMaster extends Thread impl
HConstants.REGIONINFO_QUALIFIER);
get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
Result data = srvr.get(metaRegionName, get);
- if(data == null || data.size() <= 0) continue;
+ if(data == null || data.size() <= 0) {
+ continue;
+ }
return metaRowToRegionPair(data);
}
return null;
@@ -808,8 +868,9 @@ public class HMaster extends Thread impl
switch (op) {
case TABLE_SET_HTD:
if (args == null || args.length < 1 ||
- !(args[0] instanceof HTableDescriptor))
+ !(args[0] instanceof HTableDescriptor)) {
throw new IOException("SET_HTD request requires an HTableDescriptor");
+ }
HTableDescriptor htd = (HTableDescriptor) args[0];
LOG.info("modifyTable(SET_HTD): " + htd);
new ModifyTableMeta(this, tableName, htd).process();
@@ -820,9 +881,10 @@ public class HMaster extends Thread impl
case TABLE_MAJOR_COMPACT:
case TABLE_FLUSH:
if (args != null && args.length > 0) {
- if (!(args[0] instanceof ImmutableBytesWritable))
+ if (!(args[0] instanceof ImmutableBytesWritable)) {
throw new IOException(
"request argument must be ImmutableBytesWritable");
+ }
Pair<HRegionInfo,HServerAddress> pair = null;
if(tableName == null) {
byte [] regionName = ((ImmutableBytesWritable)args[0]).get();
@@ -837,7 +899,9 @@ public class HMaster extends Thread impl
}
} else {
for (Pair<HRegionInfo,HServerAddress> pair: getTableRegions(tableName)) {
- if (pair.getSecond() == null) continue; // undeployed
+ if (pair.getSecond() == null) {
+ continue; // undeployed
+ }
this.regionManager.startAction(pair.getFirst().getRegionName(),
pair.getFirst(), pair.getSecond(), op);
}
@@ -869,7 +933,9 @@ public class HMaster extends Thread impl
// an open or whatever.
this.regionManager.clearFromInTransition(regionname);
// If hostnameAndPort is still null, then none, exit.
- if (hostnameAndPort == null) break;
+ if (hostnameAndPort == null) {
+ break;
+ }
long startCode =
Bytes.toLong(rr.getValue(HConstants.CATALOG_FAMILY,
HConstants.STARTCODE_QUALIFIER));
@@ -905,8 +971,8 @@ public class HMaster extends Thread impl
*/
@Override
public void process(WatchedEvent event) {
- LOG.debug("Event " + event.getType() +
- " with state " + event.getState() +
+ LOG.debug("Event " + event.getType() +
+ " with state " + event.getState() +
" with path " + event.getPath());
// Master should kill itself if its session expired or if its
// znode was deleted manually (usually for testing purposes)
@@ -925,15 +991,13 @@ public class HMaster extends Thread impl
zooKeeperWrapper =
new ZooKeeperWatcher(conf, HMaster.class.getName(), this);
zooKeeperWrapper.registerListener(this);
- this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper);
- if(!this.zkMasterAddressWatcher.
- writeAddressToZooKeeper(this.address,false)) {
- throw new Exception("Another Master is currently active");
- }
+ activeMasterManager = new ActiveMasterManager(zooKeeperWrapper,
+ this.address, this);
+ activeMasterManager.blockUntilBecomingActiveMaster();
- // we are a failed over master, reset the fact that we started the
+ // we are a failed over master, reset the fact that we started the
// cluster
- resetClusterStartup();
+ setClusterStartup(false);
// Verify the cluster to see if anything happened while we were away
joinCluster();
} catch (Exception e) {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java Wed Jul 14 03:41:11 2010
@@ -25,10 +25,10 @@ import org.apache.hadoop.hbase.ServerSta
import org.apache.hadoop.hbase.client.ServerConnection;
/**
- * These are the set of functions implemented by the HMaster and accessed by
+ * These are the set of functions implemented by the HMaster and accessed by
* the other packages in the master.
- *
- * TODO: this list has to be cleaned up, this is a re-factor only change that
+ *
+ * TODO: this list has to be cleaned up, this is a re-factor only change that
* preserves the functions in the interface.
*/
public interface MasterStatus extends ServerStatus {
@@ -42,26 +42,32 @@ public interface MasterStatus extends Se
* Return the region manager for region related info
*/
public RegionManager getRegionManager();
-
+
/**
* Return the file system manager for dealing with FS related stuff
*/
public FileSystemManager getFileSystemManager();
/**
- * Is this the master that is starting the cluster up? If true, yes.
+ * Is this the master that is starting the cluster up? If true, yes.
* Otherwise this is a failed over master.
*/
public boolean isClusterStartup();
/**
+ * Set whether this is a cluster starting up.
+ * @param isClusterStartup whether this is a cluster startup or failover
+ */
+ public void setClusterStartup(boolean isClusterStartup);
+
+ /**
* Return the server RPC connection
*/
public ServerConnection getServerConnection();
-
- // TODO: the semantics of the following methods should be defined. Once that
+
+ // TODO: the semantics of the following methods should be defined. Once that
// is clear, most of these should move to server status
-
+
// start shutting down the server
public void startShutdown();
// is a shutdown requested
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Wed Jul 14 03:41:11 2010
@@ -154,8 +154,6 @@ public class RegionManager {
HConstants.DEFAULT_ZOOKEEPER_RETRIES);
zooKeeperPause = conf.getInt(HConstants.ZOOKEEPER_PAUSE,
HConstants.DEFAULT_ZOOKEEPER_PAUSE);
-
- reassignRootRegion();
}
void start() {
@@ -174,7 +172,7 @@ public class RegionManager {
}
}
- void reassignRootRegion() {
+ public void reassignRootRegion() {
unsetRootRegion();
if (!masterStatus.getShutdownRequested().get()) {
synchronized (regionsInTransition) {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java Wed Jul 14 03:41:11 2010
@@ -1,129 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * ZooKeeper watcher for the master address. Also watches the cluster state
- * flag so will shutdown this master if cluster has been shutdown.
- * <p>Used by the Master. Waits on the master address ZNode delete event. When
- * multiple masters are brought up, they race to become master by writing their
- * address to ZooKeeper. Whoever wins becomes the master, and the rest wait for
- * that ephemeral node in ZooKeeper to evaporate (meaning the master went down),
- * at which point they try to write their own address to become the new master.
- */
-class ZKMasterAddressWatcher implements Watcher {
- private static final Log LOG = LogFactory.getLog(ZKMasterAddressWatcher.class);
-
- private ZooKeeperWrapper zookeeper;
- private final AtomicBoolean requestShutdown;
-
- /**
- * Create this watcher using passed ZooKeeperWrapper instance.
- * @param zk ZooKeeper
- * @param flag Flag to set to request shutdown.
- */
- ZKMasterAddressWatcher(final ZooKeeperWrapper zk, final AtomicBoolean flag) {
- this.requestShutdown = flag;
- this.zookeeper = zk;
- }
-
- /**
- * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
- */
- @Override
- public synchronized void process (WatchedEvent event) {
- EventType type = event.getType();
- LOG.debug(("Got event " + type + " with path " + event.getPath()));
- if (type.equals(EventType.NodeDeleted)) {
- if (event.getPath().equals(this.zookeeper.clusterStateZNode)) {
- LOG.info("Cluster shutdown while waiting, shutting down" +
- " this master.");
- this.requestShutdown.set(true);
- } else {
- LOG.debug("Master address ZNode deleted, notifying waiting masters");
- notifyAll();
- }
- } else if(type.equals(EventType.NodeCreated) &&
- event.getPath().equals(this.zookeeper.clusterStateZNode)) {
- LOG.debug("Resetting watch on cluster state node.");
- this.zookeeper.setClusterStateWatch();
- }
- }
-
- /**
- * Wait for master address to be available. This sets a watch in ZooKeeper and
- * blocks until the master address ZNode gets deleted.
- */
- public synchronized void waitForMasterAddressAvailability() {
- while (zookeeper.readMasterAddress(this) != null) {
- try {
- LOG.debug("Waiting for master address ZNode to be deleted " +
- "(Also watching cluster state node)");
- this.zookeeper.setClusterStateWatch();
- wait();
- } catch (InterruptedException e) {
- }
- }
- }
-
- /**
- * Write address to zookeeper. Parks here until we successfully write our
- * address (or until cluster shutdown).
- * @param address Address whose format is HServerAddress.toString
- */
- boolean writeAddressToZooKeeper(
- final HServerAddress address, boolean retry) {
- do {
- waitForMasterAddressAvailability();
- // Check if we need to shutdown instead of taking control
- if (this.requestShutdown.get()) {
- LOG.debug("Won't start Master because cluster is shuting down");
- return false;
- }
- if(this.zookeeper.writeMasterAddress(address)) {
- this.zookeeper.setClusterState(true);
- this.zookeeper.setClusterStateWatch();
- // Watch our own node
- this.zookeeper.readMasterAddress(this);
- return true;
- }
- } while(retry);
- return false;
- }
-
- /**
- * Reset the ZK in case a new connection is required
- * @param zookeeper new instance
- */
- public void setZookeeper(ZooKeeperWrapper zookeeper) {
- this.zookeeper = zookeeper;
- }
-}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jul 14 03:41:11 2010
@@ -327,7 +327,7 @@ public class HRegionServer implements HR
zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this);
// create the master address manager, register with zk, and start it
- masterAddressManager = new MasterAddressManager(zooKeeper);
+ masterAddressManager = new MasterAddressManager(zooKeeper, this);
zooKeeper.registerListener(masterAddressManager);
masterAddressManager.monitorMaster();
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java Wed Jul 14 03:41:11 2010
@@ -22,21 +22,23 @@ package org.apache.hadoop.hbase.regionse
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ServerStatus;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
/**
* Manages the location of the current active Master for this RegionServer.
- *
- * Listens for ZooKeeper events related to the master address. The node /master
- * will contain the address of the current master. This listener is interested
+ *
+ * Listens for ZooKeeper events related to the master address. The node /master
+ * will contain the address of the current master. This listener is interested
* in NodeDeleted and NodeCreated events on /master.
- *
+ *
* This class is thread-safe and takes care of re-setting all watchers to
* ensure it always knows the up-to-date master. To kick it off, instantiate
* the class and run the {@link #monitorMaster()} method.
- *
+ *
* You can get the current master via {@link #getMasterAddress()} or the
* blocking method {@link #waitMasterAddress()}.
*/
@@ -45,25 +47,29 @@ public class MasterAddressManager extend
// Address of the current primary master, null if no primary master
private HServerAddress masterAddress;
-
+
+ // Status and controller for the regionserver
+ private ServerStatus status;
+
/**
* Construct a master address listener with the specified zookeeper reference.
- *
+ *
* This constructor does not trigger any actions, you must call methods
* explicitly. Normally you will just want to execute {@link #monitorMaster()}
- * and you will ensure to
- *
+ * and you will ensure to
+ *
* @param watcher zk reference and watcher
*/
- public MasterAddressManager(ZooKeeperWatcher watcher) {
+ public MasterAddressManager(ZooKeeperWatcher watcher, ServerStatus status) {
super(watcher);
- masterAddress = null;
+ this.status = status;
+ this.masterAddress = null;
}
-
+
/**
* Get the address of the current master if one is available. Returns null
* if no current master.
- *
+ *
* Use {@link #waitMasterAddress} if you want to block until the master is
* available.
* @return server address of current active master, or null if none available
@@ -71,7 +77,7 @@ public class MasterAddressManager extend
public synchronized HServerAddress getMasterAddress() {
return masterAddress;
}
-
+
/**
* Check if there is a master available.
* @return true if there is a master set, false if not.
@@ -79,14 +85,14 @@ public class MasterAddressManager extend
public synchronized boolean hasMaster() {
return masterAddress != null;
}
-
+
/**
* Get the address of the current master. If no master is available, method
* will block until one is available, the thread is interrupted, or timeout
* has passed.
- *
+ *
* TODO: Make this work, currently unused, kept with existing retry semantics.
- *
+ *
* @return server address of current active master, null if timed out
* @throws InterruptedException if the thread is interrupted while waiting
*/
@@ -94,22 +100,28 @@ public class MasterAddressManager extend
throws InterruptedException {
return masterAddress;
}
-
+
/**
* Setup to watch for the primary master of the cluster.
- *
+ *
* If the master is already available in ZooKeeper, this method will ensure
* it gets set and that any further changes are also watched for.
- *
+ *
* If no master is available, this method ensures we become aware of it and
* will take care of setting it.
*/
public void monitorMaster() {
- if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
- handleNewMaster();
+ try {
+ if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+ handleNewMaster();
+ }
+ } catch(KeeperException ke) {
+ // If we have a ZK exception trying to find the master we must abort
+ LOG.fatal("Unexpected ZooKeeper exception", ke);
+ status.abortServer();
}
}
-
+
@Override
public void nodeCreated(String path) {
LOG.info("nodeCreated(" + path + ")");
@@ -118,7 +130,7 @@ public class MasterAddressManager extend
}
monitorMaster();
}
-
+
@Override
public void nodeDeleted(String path) {
if(path.equals(watcher.masterAddressZNode)) {
@@ -126,7 +138,7 @@ public class MasterAddressManager extend
}
monitorMaster();
}
-
+
/**
* Set the master address to the specified address. This operation is
* idempotent, a master will only be set if there is currently no master set.
@@ -137,7 +149,7 @@ public class MasterAddressManager extend
masterAddress = address;
}
}
-
+
/**
* Unsets the master address. Used when the master goes offline so none is
* available.
@@ -148,34 +160,40 @@ public class MasterAddressManager extend
masterAddress = null;
}
}
-
+
/**
* Handle a new master being set.
- *
+ *
* This method should be called to check if there is a new master. If there
* is already a master set, this method returns immediately. If none is set,
* this will attempt to grab the master location from ZooKeeper and will set
* it.
- *
- * This method uses an atomic operation to ensure a new master is only set
+ *
+ * This method uses an atomic operation to ensure a new master is only set
* once.
*/
private void handleNewMaster() {
if(hasMaster()) {
return;
}
- HServerAddress address =
- ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode);
+ HServerAddress address = null;
+ try {
+ address = ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode);
+ } catch (KeeperException ke) {
+ // If we have a ZK exception trying to find the master we must abort
+ LOG.fatal("Unexpected ZooKeeper exception", ke);
+ status.abortServer();
+ }
if(address != null) {
setMasterAddress(address);
}
}
-
+
/**
* Handle a master failure.
- *
+ *
* Triggered when a master node is deleted.
- *
+ *
* TODO: Other ways we figure master is "dead"? What do we do if set in ZK
* but we can't communicate with TCP?
*/
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Jul 14 03:41:11 2010
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
@@ -27,28 +28,34 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* Internal HBase utility class for ZooKeeper.
- *
- * Contains only static methods and constants.
+ *
+ * <p>Contains only static methods and constants.
+ *
+ * <p>Methods all throw {@link KeeperException} if there is an unexpected
+ * zookeeper exception, so callers of these methods must handle appropriately.
+ * If ZK is required for the operation, the server will need to be aborted.
*/
public class ZKUtil {
private static final Log LOG = LogFactory.getLog(ZKUtil.class);
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
private static final char ZNODE_PATH_SEPARATOR = '/';
-
+
/**
* Creates a new connection to ZooKeeper, pulling settings and quorum config
* from the specified configuration object using methods from {@link ZKConfig}.
- *
+ *
* Sets the connection status monitoring watcher to the specified watcher.
- *
+ *
* @param conf configuration to pull quorum and other settings from
* @param watcher watcher to monitor connection changes
* @return connection to zookeeper
@@ -66,12 +73,16 @@ public class ZKUtil {
return new ZooKeeper(quorum, timeout, watcher);
}
+ //
+ // Helper methods
+ //
+
/**
* Join the prefix znode name with the suffix znode name to generate a proper
* full znode name.
- *
+ *
* Assumes prefix does not end with slash and suffix does not begin with it.
- *
+ *
* @param prefix beginning of znode name
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
@@ -80,16 +91,22 @@ public class ZKUtil {
return prefix + ZNODE_PATH_SEPARATOR + suffix;
}
+ //
+ // Existence checks and watches
+ //
+
/**
* Watch the specified znode for delete/create/change events. The watcher is
* set whether or not the node exists. If the node already exists, the method
* returns true. If the node does not exist, the method returns false.
- *
+ *
* @param zkw zk reference
* @param znode path of node to watch
* @return true if znode exists, false if does not exist or error
+ * @throws KeeperException if unexpected zookeeper exception
*/
- public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) {
+ public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
try {
Stat s = zkw.getZooKeeper().exists(znode, zkw);
zkw.debug("Set watcher on existing znode (" + znode + ")");
@@ -104,21 +121,103 @@ public class ZKUtil {
return false;
}
}
-
+
+ //
+ // Znode listings
+ //
+
+ /**
+ * Lists the children znodes of the specified znode. Also sets a watch on
+ * the specified znode which will capture a NodeDeleted event on the specified
+ * znode as well as NodeChildrenChanged if any children of the specified znode
+ * are created or deleted.
+ *
+ * Returns null if the specified node does not exist. Otherwise returns a
+ * list of children of the specified node. If the node exists but it has no
+ * children, an empty list will be returned.
+ *
+ * @param zkw zk reference
+ * @param znode path of node to list and watch children of
+ * @returns list of children of the specified node, an empty list if the node
+ * exists but has no children, and null if the node does not exist
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static List<String> listChildrenAndWatchForNewChildren(
+ ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
+ try {
+ List<String> children = zkw.getZooKeeper().getChildren(znode, zkw);
+ return children;
+ } catch(KeeperException.NoNodeException ke) {
+ zkw.debug("Unable to list children of znode (" + znode + ") " +
+ "because node does not exist (not an error)");
+ return null;
+ } catch (KeeperException e) {
+ zkw.warn("Unable to list children of znode (" + znode + ")", e);
+ zkw.keeperException(e);
+ return null;
+ } catch (InterruptedException e) {
+ zkw.warn("Unable to list children of znode (" + znode + ")", e);
+ zkw.interruptedException(e);
+ return null;
+ }
+ }
+
+ /**
+ * Checks if the specified znode has any children. Sets no watches.
+ *
+ * Returns true if the node exists and has children. Returns false if the
+ * node does not exist or if the node does not have any children.
+ *
+ * Used during master initialization to determine if the master is a
+ * failed-over-to master or the first master during initial cluster startup.
+ * If the directory for regionserver ephemeral nodes is empty then this is
+ * a cluster startup, if not then it is not cluster startup.
+ *
+ * @param zkw zk reference
+ * @param znode path of node to check for children of
+ * @return true if node has children, false if not or node does not exist
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
+ try {
+ return !zkw.getZooKeeper().getChildren(znode, null).isEmpty();
+ } catch(KeeperException.NoNodeException ke) {
+ zkw.debug("Unable to list children of znode (" + znode + ") " +
+ "because node does not exist (not an error)");
+ return false;
+ } catch (KeeperException e) {
+ zkw.warn("Unable to list children of znode (" + znode + ")", e);
+ zkw.keeperException(e);
+ return false;
+ } catch (InterruptedException e) {
+ zkw.warn("Unable to list children of znode (" + znode + ")", e);
+ zkw.interruptedException(e);
+ return false;
+ }
+ }
+
+ //
+ // Data retrieval
+ //
+
/**
* Get the data at the specified znode and set a watch.
- *
+ *
* Returns the data and sets a watch if the node exists. Returns null and no
* watch is set if the node does not exist or there is an exception.
- *
+ *
* @param zkw zk reference
* @param znode path of node
* @return data of the specified znode, or null
+ * @throws KeeperException if unexpected zookeeper exception
*/
- public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) {
+ public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
+ throws KeeperException {
try {
byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
- zkw.debug("Retrieved " + data.length + " bytes of data from znode (" +
+ zkw.debug("Retrieved " + data.length + " bytes of data from znode (" +
znode + ") and set a watcher");
return data;
} catch (KeeperException.NoNodeException e) {
@@ -135,21 +234,23 @@ public class ZKUtil {
return null;
}
}
-
+
/**
* Get the data at the specified znode, deserialize it as an HServerAddress,
* and set a watch.
- *
+ *
* Returns the data as a server address and sets a watch if the node exists.
* Returns null and no watch is set if the node does not exist or there is an
* exception.
- *
+ *
* @param zkw zk reference
* @param znode path of node
* @return data of the specified node as a server address, or null
+ * @throws KeeperException if unexpected zookeeper exception
*/
- public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw,
- String znode) {
+ public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw,
+ String znode)
+ throws KeeperException {
byte [] data = getDataAndWatch(zkw, znode);
if(data == null) {
return null;
@@ -158,4 +259,86 @@ public class ZKUtil {
zkw.debug("Read server address from znode (" + znode + "): " + addrString);
return new HServerAddress(addrString);
}
+
+ /**
+ * Set the specified znode to be an ephemeral node carrying the specified
+ * server address. Used by masters for their ephemeral node and regionservers
+ * for their ephemeral node.
+ *
+ * If the node is created successfully, a watcher is also set on the node.
+ *
+ * If the node is not created successfully because it already exists, this
+ * method will also set a watcher on the node.
+ *
+ * If there is another problem, a KeeperException will be thrown.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param address server address
+ * @return true if address set, false if not, watch set in both cases
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean setAddressAndWatch(ZooKeeperWatcher zkw,
+ String znode, HServerAddress address)
+ throws KeeperException {
+ return createEphemeralNodeAndWatch(zkw, znode,
+ Bytes.toBytes(address.toString()));
+ }
+ /**
+ *
+ * Set the specified znode to be an ephemeral node carrying the specified
+ * data.
+ *
+ * If the node is created successfully, a watcher is also set on the node.
+ *
+ * If the node is not created successfully because it already exists, this
+ * method will also set a watcher on the node.
+ *
+ * If there is another problem, a KeeperException will be thrown.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @param data data of node
+ * @return true if node created, false if not, watch set in both cases
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw,
+ String znode, byte [] data)
+ throws KeeperException {
+ try {
+ zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
+ } catch (KeeperException.NodeExistsException nee) {
+ if(!watchAndCheckExists(zkw, znode)) {
+ // It did exist but now it doesn't, try again
+ return createEphemeralNodeAndWatch(zkw, znode, data);
+ }
+ return false;
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted", e);
+ }
+ return true;
+ }
+
+ /**
+ * Creates the specified node, if the node does not exist. Does not set a
+ * watch and fails silently if the node already exists.
+ *
+ * The node created is persistent and open access.
+ *
+ * @param zkw zk reference
+ * @param znode path of node
+ * @throws KeeperException if unexpected zookeeper exception
+ */
+ public static void createIfNotExists(ZooKeeperWatcher zkw,
+ String znode)
+ throws KeeperException {
+ try {
+ zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch(KeeperException.NodeExistsException nee) {
+ } catch(InterruptedException ie) {
+ zkw.interruptedException(ie);
+ }
+ }
}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Wed Jul 14 03:41:11 2010
@@ -88,6 +88,12 @@ public class ZooKeeperWatcher extends Zo
this.server = server;
info("Connected to ZooKeeper");
setNodeNames(conf);
+ try {
+ ZKUtil.createIfNotExists(this, baseZNode);
+ } catch (KeeperException e) {
+ error("Unexpected KeeperException creating base node", e);
+ throw new IOException(e);
+ }
}
/**
@@ -198,9 +204,10 @@ public class ZooKeeperWatcher extends Zo
// Abort the server if Disconnected or Expired
// TODO: Any reason to handle these two differently?
case Disconnected:
+ info("Received Disconnected from ZooKeeper, ignoring");
+ break;
case Expired:
- error("Received Disconnected/Expired [" + event.getState() + "] " +
- "from ZooKeeper, aborting server");
+ error("Received Expired from ZooKeeper, aborting server");
if(server != null) {
server.abortServer();
}
@@ -213,13 +220,15 @@ public class ZooKeeperWatcher extends Zo
*
* This may be temporary but for now this gives one place to deal with these.
*
- * TODO: Currently this method aborts the server.
+ * TODO: Currently this method rethrows the exception to let the caller handle it.
*
* @param ke
+ * @throws KeeperException
*/
- public void keeperException(KeeperException ke) {
- error("Received unexpected KeeperException, aborting server", ke);
- server.abortServer();
+ public void keeperException(KeeperException ke)
+ throws KeeperException {
+ error("Received unexpected KeeperException, re-throwing exception", ke);
+ throw ke;
}
/**
@@ -229,6 +238,7 @@ public class ZooKeeperWatcher extends Zo
*
* TODO: Currently, this method does nothing.
* Is this ever expected to happen? Do we abort or can we let it run?
+ * Maybe this should be logged as WARN? It shouldn't happen?
*
* @param ie
*/
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java?rev=963935&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java Wed Jul 14 03:41:11 2010
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests {@link ActiveMasterManager} against a mini ZooKeeper cluster, acting
+ * directly on the master address znode rather than through a running master.
+ */
+public class TestActiveMasterManager {
+ private static final Log LOG = LogFactory.getLog(TestActiveMasterManager.class);
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+ /**
+ * Unit test that uses ZooKeeper but does not use the master-side methods;
+ * instead it acts directly on ZK.
+ * @throws Exception
+ */
+ @Test
+ public void testActiveMasterManagerFromZK() throws Exception {
+
+ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+ "testActiveMasterManagerFromZK", null);
+ zk.createZNodeIfNotExists(zk.baseZNode);
+ try {
+ zk.deleteZNode(zk.masterAddressZNode);
+ } catch(KeeperException.NoNodeException nne) {}
+
+ // Create the master node with a dummy address
+ HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234);
+ HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234);
+
+ // Should not have a master yet
+ DummyMasterStatus ms1 = new DummyMasterStatus();
+ ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
+ firstMasterAddress, ms1);
+ zk.registerListener(activeMasterManager);
+ assertFalse(activeMasterManager.clusterHasActiveMaster.get());
+
+ // First test becoming the active master uninterrupted
+ activeMasterManager.blockUntilBecomingActiveMaster();
+ assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+ assertMaster(zk, firstMasterAddress);
+
+ // New manager will now try to become the active master in another thread
+ WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
+ zk.registerListener(t.manager);
+ t.start();
+ // Wait for this guy to figure out there is another active master
+ // Wait for 1 second at most
+ int sleeps = 0;
+ while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
+ Thread.sleep(10);
+ sleeps++;
+ }
+
+ // Both should see that there is an active master
+ assertTrue(activeMasterManager.clusterHasActiveMaster.get());
+ assertTrue(t.manager.clusterHasActiveMaster.get());
+ // But secondary one should not be the active master
+ assertFalse(t.isActiveMaster);
+
+ // Close the first server and delete its master node
+ ms1.setClosed();
+
+ // Use a listener to capture when the node is actually deleted
+ NodeDeletionListener listener = new NodeDeletionListener(zk, zk.masterAddressZNode);
+ zk.registerListener(listener);
+
+ LOG.info("Deleting master node");
+ zk.deleteZNode(zk.masterAddressZNode);
+
+ // Wait for the node to be deleted
+ LOG.info("Waiting for active master manager to be notified");
+ listener.waitForDeletion();
+ LOG.info("Master node deleted");
+
+ // Now we expect the secondary manager to have and be the active master
+ // Wait for 1 second at most
+ sleeps = 0;
+ while(!t.isActiveMaster && sleeps < 100) {
+ Thread.sleep(10);
+ sleeps++;
+ }
+ LOG.debug("Slept " + sleeps + " times");
+
+ assertTrue(t.manager.clusterHasActiveMaster.get());
+ assertTrue(t.isActiveMaster);
+ }
+
+ /**
+ * Assert there is an active master and that it has the specified address.
+ * @param zk
+ * @param expectedAddress address the master znode is expected to hold
+ * @throws KeeperException
+ */
+ private void assertMaster(ZooKeeperWatcher zk,
+ HServerAddress expectedAddress) throws KeeperException {
+ HServerAddress readAddress = ZKUtil.getDataAsAddress(zk, zk.masterAddressZNode);
+ assertNotNull(readAddress);
+ assertTrue(expectedAddress.equals(readAddress));
+ }
+
+ /**
+ * Thread that blocks in {@link ActiveMasterManager#blockUntilBecomingActiveMaster()}
+ * and sets {@link #isActiveMaster} once that call returns.
+ */
+ public static class WaitToBeMasterThread extends Thread {
+
+ final ActiveMasterManager manager;
+ // volatile: written by this thread in run(), polled by the test thread.
+ volatile boolean isActiveMaster;
+
+ public WaitToBeMasterThread(ZooKeeperWatcher zk,
+ HServerAddress address) {
+ this.manager = new ActiveMasterManager(zk, address,
+ new DummyMasterStatus());
+ isActiveMaster = false;
+ }
+
+ @Override
+ public void run() {
+ manager.blockUntilBecomingActiveMaster();
+ LOG.info("Second master has become the active master!");
+ isActiveMaster = true;
+ }
+ }
+
+ /**
+ * Listener that releases one permit when the watched node is deleted, so a
+ * test can block in {@link #waitForDeletion()} until that happens.
+ */
+ public static class NodeDeletionListener extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class);
+
+ private final Semaphore lock;
+ private final String node;
+
+ public NodeDeletionListener(ZooKeeperWatcher watcher, String node) {
+ super(watcher);
+ lock = new Semaphore(0);
+ this.node = node;
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if(path.equals(node)) {
+ LOG.debug("nodeDeleted(" + path + ")");
+ lock.release();
+ }
+ }
+
+ public void waitForDeletion() throws InterruptedException {
+ lock.acquire();
+ }
+ }
+
+ /**
+ * Minimal {@link MasterStatus} stub: only the closed flag is tracked; every
+ * other accessor returns null/false or does nothing.
+ */
+ public static class DummyMasterStatus implements MasterStatus {
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ @Override
+ public AtomicBoolean getClosed() {
+ return closed;
+ }
+
+ @Override
+ public FileSystemManager getFileSystemManager() {
+ return null;
+ }
+
+ @Override
+ public RegionManager getRegionManager() {
+ return null;
+ }
+
+ @Override
+ public ServerConnection getServerConnection() {
+ return null;
+ }
+
+ @Override
+ public ServerManager getServerManager() {
+ return null;
+ }
+
+ @Override
+ public AtomicBoolean getShutdownRequested() {
+ return null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public boolean isClusterStartup() {
+ return false;
+ }
+
+ @Override
+ public void setClosed() {
+ closed.set(true);
+ }
+
+ @Override
+ public void setClusterStartup(boolean isClusterStartup) {}
+
+ @Override
+ public void shutdown() {}
+
+ @Override
+ public void startShutdown() {}
+
+ @Override
+ public void abortServer() {}
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public HServerAddress getHServerAddress() {
+ return null;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return null;
+ }
+
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java?rev=963935&r1=963934&r2=963935&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java Wed Jul 14 03:41:11 2010
@@ -40,12 +40,12 @@ public class TestMasterAddressManager {
private static final Log LOG = LogFactory.getLog(TestMasterAddressManager.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
-
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
@@ -57,29 +57,29 @@ public class TestMasterAddressManager {
*/
@Test
public void testMasterAddressManagerFromZK() throws Exception {
-
- ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+
+ ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testMasterAddressManagerFromZK", null);
zk.createZNodeIfNotExists(zk.baseZNode);
-
+
// Should not have a master yet
- MasterAddressManager addressManager = new MasterAddressManager(zk);
+ MasterAddressManager addressManager = new MasterAddressManager(zk, null);
addressManager.monitorMaster();
assertFalse(addressManager.hasMaster());
zk.registerListener(addressManager);
-
+
// Use a listener to capture when the node is actually created
NodeCreationListener listener = new NodeCreationListener(zk, zk.masterAddressZNode);
zk.registerListener(listener);
-
+
// Create the master node with a dummy address
String host = "hostname";
int port = 1234;
HServerAddress dummyAddress = new HServerAddress(host, port);
LOG.info("Creating master node");
- zk.createZNodeIfNotExists(zk.masterAddressZNode,
+ zk.createZNodeIfNotExists(zk.masterAddressZNode,
Bytes.toBytes(dummyAddress.toString()), CreateMode.EPHEMERAL, false);
-
+
// Wait for the node to be created
LOG.info("Waiting for master address manager to be notified");
listener.waitForCreation();
@@ -87,21 +87,21 @@ public class TestMasterAddressManager {
assertTrue(addressManager.hasMaster());
HServerAddress pulledAddress = addressManager.getMasterAddress();
assertTrue(pulledAddress.equals(dummyAddress));
-
+
}
-
+
public static class NodeCreationListener extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(NodeCreationListener.class);
-
+
private Semaphore lock;
private String node;
-
+
public NodeCreationListener(ZooKeeperWatcher watcher, String node) {
super(watcher);
lock = new Semaphore(0);
this.node = node;
}
-
+
@Override
public void nodeCreated(String path) {
if(path.equals(node)) {
@@ -109,7 +109,7 @@ public class TestMasterAddressManager {
lock.release();
}
}
-
+
public void waitForCreation() throws InterruptedException {
lock.acquire();
}