You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/08/05 09:35:02 UTC

svn commit: r982489 [2/7] - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/executor/ s...

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java Thu Aug  5 07:35:00 2010
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -43,88 +43,77 @@ import org.apache.commons.logging.LogFac
  * HBEventHandler class and create an event type that submits to this service.
  *
  */
-public class HBaseExecutorService
-{
+public class HBaseExecutorService {
   private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class);
   // default number of threads in the pool
   private int corePoolSize = 1;
-  // max number of threads - maximum concurrency
-  private int maximumPoolSize = 5;
   // how long to retain excess threads
   private long keepAliveTimeInMillis = 1000;
   // the thread pool executor that services the requests
   ThreadPoolExecutor threadPoolExecutor;
   // work queue to use - unbounded queue
-  BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+  BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
   // name for this executor service
   String name;
   // hold the all the executors created in a map addressable by their names
   static Map<String, HBaseExecutorService> executorServicesMap =
     Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>());
 
-  
   /**
-   * The following is a list of names for the various executor services in both 
+   * The following is a list of names for the various executor services in both
    * the master and the region server.
    */
   public enum HBaseExecutorServiceType {
-    NONE                       (-1),
-    MASTER_CLOSEREGION         (1),
-    MASTER_OPENREGION          (2);
-    
-    private final int value;
-    
-    HBaseExecutorServiceType(int intValue) {
-      this.value = intValue;
-    }
-    
-    public void startExecutorService(String serverName) {
-      // if this is NONE then there is no executor to start
-      if(value == NONE.value) {
-        throw new RuntimeException("Cannot start NONE executor type.");
-      }
+
+    // Master executor services
+    MASTER_CLOSE_REGION        (1),
+    MASTER_OPEN_REGION         (2),
+    MASTER_SERVER_OPERATIONS   (3),
+    MASTER_TABLE_OPERATIONS    (4),
+
+    // RegionServer executor services
+    RS_OPEN_REGION             (20),
+    RS_OPEN_ROOT               (21),
+    RS_OPEN_META               (22),
+    RS_CLOSE_REGION            (23),
+    RS_CLOSE_ROOT              (24),
+    RS_CLOSE_META              (25);
+
+    HBaseExecutorServiceType(int value) {}
+
+    public void startExecutorService(String serverName, int maxThreads) {
       String name = getExecutorName(serverName);
       if(HBaseExecutorService.isExecutorServiceRunning(name)) {
         LOG.debug("Executor service " + toString() + " already running on " + serverName);
         return;
       }
       LOG.debug("Starting executor service [" + name + "]");
-      HBaseExecutorService.startExecutorService(name);
+      HBaseExecutorService.startExecutorService(name, maxThreads);
     }
-    
+
     public HBaseExecutorService getExecutor(String serverName) {
-      // if this is NONE then there is no executor
-      if(value == NONE.value) {
-        return null;
-      }
       return HBaseExecutorService.getExecutorService(getExecutorName(serverName));
     }
-    
+
     public String getExecutorName(String serverName) {
-      // if this is NONE then there is no executor
-      if(value == NONE.value) {
-        return null;
-      }
       return (this.toString() + "-" + serverName);
     }
   }
 
-
-
   /**
    * Start an executor service with a given name. If there was a service already
    * started with the same name, this throws a RuntimeException.
    * @param name Name of the service to start.
    */
-  public static void startExecutorService(String name) {
+  public static void startExecutorService(String name, int maxThreads) {
     if(executorServicesMap.get(name) != null) {
       throw new RuntimeException("An executor service with the name " + name + " is already running!");
     }
-    HBaseExecutorService hbes = new HBaseExecutorService(name);
+    HBaseExecutorService hbes = new HBaseExecutorService(name, maxThreads);
     executorServicesMap.put(name, hbes);
     LOG.debug("Starting executor service: " + name);
   }
-  
+
   public static boolean isExecutorServiceRunning(String name) {
     return (executorServicesMap.containsKey(name));
   }
@@ -140,7 +129,7 @@ public class HBaseExecutorService
     }
     return executor;
   }
-  
+
   public static void shutdown() {
     for(Entry<String, HBaseExecutorService> entry : executorServicesMap.entrySet()) {
       entry.getValue().threadPoolExecutor.shutdown();
@@ -148,16 +137,11 @@ public class HBaseExecutorService
     executorServicesMap.clear();
   }
 
-  protected HBaseExecutorService(String name) {
+  protected HBaseExecutorService(String name, int maxThreads) {
     this.name = name;
     // create the thread pool executor
-    threadPoolExecutor = new ThreadPoolExecutor(
-                                corePoolSize,
-                                maximumPoolSize,
-                                keepAliveTimeInMillis,
-                                TimeUnit.MILLISECONDS,
-                                workQueue
-                                );
+    threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
+        keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
     // name the threads for this threadpool
     threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name));
   }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java Thu Aug  5 07:35:00 2010
@@ -23,8 +23,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 
@@ -36,19 +36,16 @@ public class RegionTransitionData implem
    * Type of transition event (offline, opening, opened, closing, closed).
    * Required.
    */
-  private HBaseEventType eventType;
+  private EventType eventType;
 
   /** Region being transitioned.  Required. */
-  private String regionName;
+  private byte [] regionName;
 
   /** Server event originated from.  Optional. */
   private String serverName;
 
   /** Time the event was created.  Required but automatically set. */
-  private long timeStamp;
-
-  /** Temporary.  Holds payload used doing transitions via heartbeats. */
-  private HMsg hmsg; // to be removed shortly once we stop using heartbeats
+  private long stamp;
 
   /**
    * Writable constructor.  Do not use directly.
@@ -68,12 +65,12 @@ public class RegionTransitionData implem
    * assignment.
    *
    * <p>Since only the master uses this constructor, the type should always be
-   * {@link HBaseEventType#M2ZK_REGION_OFFLINE}.
+   * {@link EventType#M2ZK_REGION_OFFLINE}.
    *
    * @param eventType type of event
    * @param regionName name of region
    */
-  public RegionTransitionData(HBaseEventType eventType, String regionName) {
+  public RegionTransitionData(EventType eventType, byte [] regionName) {
     this(eventType, regionName, null);
   }
 
@@ -83,37 +80,20 @@ public class RegionTransitionData implem
    *
    * <p>Used when the server name is known (a regionserver is setting it).
    *
-   * <p>Valid types for this constructor are {@link HBaseEventType#RS2ZK_REGION_CLOSING},
-   * {@link HBaseEventType#RS2ZK_REGION_CLOSED}, {@link HBaseEventType#RS2ZK_REGION_OPENING},
-   * and {@link HBaseEventType#RS2ZK_REGION_OPENED}.
+   * <p>Valid types for this constructor are {@link EventType#RS2ZK_REGION_CLOSING},
+   * {@link EventType#RS2ZK_REGION_CLOSED}, {@link EventType#RS2ZK_REGION_OPENING},
+   * and {@link EventType#RS2ZK_REGION_OPENED}.
    *
    * @param eventType type of event
    * @param regionName name of region
    * @param serverName name of server setting data
    */
-  public RegionTransitionData(HBaseEventType eventType, String regionName,
+  public RegionTransitionData(EventType eventType, byte [] regionName,
       String serverName) {
-    this(eventType, regionName, serverName, null);
-  }
-
-  /**
-   * Construct data for a fully-specified, old-format region transition event
-   * which uses HMsg/heartbeats.
-   *
-   * TODO: Remove this constructor once we stop using heartbeats.
-   *
-   * @param eventType
-   * @param regionName
-   * @param serverName
-   * @param hmsg
-   */
-  public RegionTransitionData(HBaseEventType eventType, String regionName,
-      String serverName, HMsg hmsg) {
     this.eventType = eventType;
-    this.timeStamp = System.currentTimeMillis();
+    this.stamp = System.currentTimeMillis();
     this.regionName = regionName;
     this.serverName = serverName;
-    this.hmsg = hmsg;
   }
 
   /**
@@ -121,25 +101,25 @@ public class RegionTransitionData implem
    *
    * <p>One of:
    * <ul>
-   * <li>{@link HBaseEventType#M2ZK_REGION_OFFLINE}
-   * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSING}
-   * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSED}
-   * <li>{@link HBaseEventType#RS2ZK_REGION_OPENING}
-   * <li>{@link HBaseEventType#RS2ZK_REGION_OPENED}
+   * <li>{@link EventType#M2ZK_REGION_OFFLINE}
+   * <li>{@link EventType#RS2ZK_REGION_CLOSING}
+   * <li>{@link EventType#RS2ZK_REGION_CLOSED}
+   * <li>{@link EventType#RS2ZK_REGION_OPENING}
+   * <li>{@link EventType#RS2ZK_REGION_OPENED}
    * </ul>
    * @return type of region transition event
    */
-  public HBaseEventType getEventType() {
+  public EventType getEventType() {
     return eventType;
   }
 
   /**
-   * Gets the encoded name of the region being transitioned.
+   * Gets the name of the region being transitioned.
    *
    * <p>Region name is required so this never returns null.
    * @return region name
    */
-  public String getRegionName() {
+  public byte [] getRegionName() {
     return regionName;
   }
 
@@ -156,54 +136,39 @@ public class RegionTransitionData implem
   /**
    * Gets the timestamp when this event was created.
    *
-   * @return time event was created
+   * @return timestamp of when the event was created
    */
-  public long getTimeStamp() {
-    return timeStamp;
-  }
-
-  /**
-   * Gets the {@link HMsg} payload of this region transition event.
-   * @return heartbeat payload
-   */
-  public HMsg getHmsg() {
-    return hmsg;
+  public long getStamp() {
+    return stamp;
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     // the event type byte
-    eventType = HBaseEventType.fromByte(in.readByte());
+    eventType = EventType.values()[in.readShort()];
     // the timestamp
-    timeStamp = in.readLong();
+    stamp = in.readLong();
     // the encoded name of the region being transitioned
-    regionName = in.readUTF();
+    regionName = Bytes.readByteArray(in);
     // remaining fields are optional so prefixed with boolean
     // the name of the regionserver sending the data
     if(in.readBoolean()) {
       serverName = in.readUTF();
-    }
-    // hmsg
-    if(in.readBoolean()) {
-      hmsg = new HMsg();
-      hmsg.readFields(in);
+    } else {
+      serverName = null;
     }
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeByte(eventType.getByteValue());
+    out.writeShort(eventType.ordinal());
     out.writeLong(System.currentTimeMillis());
-    out.writeUTF(regionName);
+    Bytes.writeByteArray(out, regionName);
     // remaining fields are optional so prefixed with boolean
     out.writeBoolean(serverName != null);
     if(serverName != null) {
       out.writeUTF(serverName);
     }
-    out.writeBoolean(hmsg != null);
-    if(hmsg != null) {
-      hmsg.write(out);
-    }
   }
 
   /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Thu Aug  5 07:35:00 2010
@@ -19,13 +19,11 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
 
 /**
  * Clients interact with the HMasterInterface to gain access to meta-level
@@ -110,11 +108,10 @@ public interface HMasterInterface extend
    * Modify a table's metadata
    *
    * @param tableName table to modify
-   * @param op the operation to do
-   * @param args arguments for operation
+   * @param htd new descriptor for table
    * @throws IOException e
    */
-  public void modifyTable(byte[] tableName, HConstants.Modify op, Writable[] args)
+  public void modifyTable(byte[] tableName, HTableDescriptor htd)
     throws IOException;
 
   /**

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Aug  5 07:35:00 2010
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -31,9 +34,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Clients interact with HRegionServers using a handle to the HRegionInterface.
  *
@@ -279,10 +279,64 @@ public interface HRegionInterface extend
    * @throws IOException e
    */
   public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-  
+
   /**
    * Bulk load an HFile into an open region
    */
   public void bulkLoadHFile(String hfilePath,
       byte[] regionName, byte[] familyName) throws IOException;
+
+  // Master methods
+
+  /**
+   * Opens the specified region.
+   * @param region region to open
+   */
+  public void openRegion(final HRegionInfo region);
+
+  /**
+   * Closes the specified region.
+   * @param region region to close
+   * @return true if closing region, false if not
+   */
+  public boolean closeRegion(final HRegionInfo region)
+  throws NotServingRegionException;
+
+  // Region administrative methods
+
+  /**
+   * Flushes the MemStore of the specified region.
+   * <p>
+   * This method is synchronous.
+   * @param regionInfo region to flush
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void flushRegion(HRegionInfo regionInfo)
+  throws NotServingRegionException, IOException;
+
+  /**
+   * Splits the specified region.
+   * <p>
+   * This method currently flushes the region and then forces a compaction which
+   * will then trigger a split.  The flush is done synchronously but the
+   * compaction is asynchronous.
+   * @param regionInfo region to split
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void splitRegion(HRegionInfo regionInfo)
+  throws NotServingRegionException, IOException;
+
+  /**
+   * Compacts the specified region.  Performs a major compaction if specified.
+   * <p>
+   * This method is asynchronous.
+   * @param regionInfo region to compact
+   * @param major true to force major compaction
+   * @throws NotServingRegionException
+   * @throws IOException
+   */
+  void compactRegion(HRegionInfo regionInfo, boolean major)
+  throws NotServingRegionException, IOException;
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Thu Aug  5 07:35:00 2010
@@ -48,10 +48,10 @@ public class ActiveMasterManager extends
   final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
 
   private final HServerAddress address;
-  private final MasterStatus status;
+  private final MasterController status;
 
   ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address,
-      MasterStatus status) {
+      MasterController status) {
     super(watcher);
     this.address = address;
     this.status = status;

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Aug  5 07:35:00 2010
@@ -19,41 +19,91 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
-import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
+import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
+import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
+import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
+import org.apache.hadoop.io.Writable;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * Manages region assignment.
- *
- * <p>Monitors ZooKeeper for events related to regions in transition.
- *
- * <p>Handles existing regions in transition during master failover.
+ * Manages and performs region assignment.
+ * <p>
+ * Monitors ZooKeeper for events related to regions in transition.
+ * <p>
+ * Handles existing regions in transition during master failover.
  */
 public class AssignmentManager extends ZooKeeperListener {
   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
 
-  private MasterStatus status;
+  protected MasterController master;
 
   private ServerManager serverManager;
 
-  private RegionManager regionManager;
+  private CatalogTracker catalogTracker;
 
-  private String serverName;
+  private TimeoutMonitor timeoutMonitor;
 
-//  TODO: Eventually RIT will move here?
-//  private final Map<String,RegionState> regionsInTransition =
-//    new TreeMap<String,RegionState>();
+  /** Regions currently in transition. */
+  private final Map<String,RegionState> regionsInTransition =
+    new TreeMap<String,RegionState>();
+
+  /** Plans for region movement. */
+  private final Map<String,RegionPlan> regionPlans =
+    new TreeMap<String,RegionPlan>();
+
+  /** Set of tables that have been disabled. */
+  private final Set<String> disabledTables =
+    Collections.synchronizedSet(new HashSet<String>());
+
+  /**
+   * Server to regions assignment map.
+   * Contains the set of regions currently assigned to a given server.
+   */
+  private final SortedMap<HServerInfo,Set<HRegionInfo>> servers =
+        new TreeMap<HServerInfo,Set<HRegionInfo>>();
+
+  /**
+   * Region to server assignment map.
+   * Contains the server a given region is currently assigned to.
+   * This object should be used for all synchronization around servers/regions.
+   */
+  private final SortedMap<HRegionInfo,HServerInfo> regions =
+    new TreeMap<HRegionInfo,HServerInfo>();
 
   /**
    * Constructs a new assignment manager.
@@ -63,85 +113,211 @@ public class AssignmentManager extends Z
    * @param watcher zookeeper watcher
    * @param status master status
    */
-  public AssignmentManager(ZooKeeperWatcher watcher, MasterStatus status,
-      ServerManager serverManager, RegionManager regionManager) {
+  public AssignmentManager(ZooKeeperWatcher watcher, MasterController master,
+      ServerManager serverManager, CatalogTracker catalogTracker) {
     super(watcher);
-    this.status = status;
+    this.master = master;
     this.serverManager = serverManager;
-    this.regionManager = regionManager;
-    serverName = status.getHServerAddress().toString();
+    this.catalogTracker = catalogTracker;
+    Configuration conf = master.getConfiguration();
+    this.timeoutMonitor = new TimeoutMonitor(
+        conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
+        master.getClosed(),
+        conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000));
+    Threads.setDaemonThreadRunning(timeoutMonitor,
+        master.getServerName() + ".timeoutMonitor");
   }
 
   /**
-   * Starts the assignment manager.
-   *
-   * <p>This includes registering itself with ZooKeeper and handling
-   * the initial state of whatever unassigned nodes already exist.
+   * Cluster startup.  Reset all unassigned nodes and assign all user regions.
+   * @throws IOException
    * @throws KeeperException
    */
-  public void start() throws KeeperException {
-    watcher.registerListener(this);
-    if(status.isClusterStartup()) {
-      processStartup();
-    } else {
-      processFailover();
-    }
-  }
-
-  public synchronized void processStartup() throws KeeperException {
+  void processStartup() throws IOException, KeeperException {
+    // Cleanup any existing ZK nodes and start watching
     ZKAssign.deleteAllNodes(watcher);
-    ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode);
+    ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+        watcher.assignmentZNode);
+    // Assign all existing user regions out
+    assignAllUserRegions();
   }
 
   /**
-   * Handle failover.
+   * Handle failover.  Restore state from META and ZK.  Handle any regions in
+   * transition.
    * @throws KeeperException
+   * @throws IOException
    */
-  public synchronized void processFailover() throws KeeperException {
+  void processFailover() throws KeeperException, IOException {
+    // Scan META to build list of existing regions, servers, and assignment
+    rebuildUserRegions();
+    // Pickup any disabled tables
+    rebuildDisabledTables();
+    // Check existing regions in transition
     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
         watcher.assignmentZNode);
     if(nodes.isEmpty()) {
-      LOG.info("No regions in transition in ZK, nothing to do for failover");
+      LOG.info("No regions in transition in ZK to process on failover");
       return;
     }
     LOG.info("Failed-over master needs to process " + nodes.size() +
         " regions in transition");
     for(String regionName : nodes) {
       RegionTransitionData data = ZKAssign.getData(watcher, regionName);
+      HRegionInfo regionInfo =
+        MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst();
+      String encodedName = regionInfo.getEncodedName();
       switch(data.getEventType()) {
-        case M2ZK_REGION_OFFLINE:
-          // TODO: Generate new assignment and send OPEN RPC
-          break;
         case RS2ZK_REGION_CLOSING:
-          // TODO: Only need to deal with timeouts.
+          // Just insert region into RIT
+          // If this never updates the timeout will trigger new assignment
+          regionsInTransition.put(encodedName,
+              new RegionState(regionInfo, RegionState.State.CLOSING,
+                  data.getStamp()));
           break;
+
         case RS2ZK_REGION_CLOSED:
-          // TODO: Generate new assignment and send OPEN RPC
+          // Region is closed, insert into RIT and handle it
+          regionsInTransition.put(encodedName,
+              new RegionState(regionInfo, RegionState.State.CLOSED,
+                  data.getStamp()));
+          new ClosedRegionHandler(master, this, data, regionInfo).execute();
           break;
+
         case RS2ZK_REGION_OPENING:
-          // TODO: Only need to deal with timeouts.
+          // Just insert region into RIT
+          // If this never updates the timeout will trigger new assignment
+          regionsInTransition.put(encodedName,
+              new RegionState(regionInfo, RegionState.State.OPENING,
+                  data.getStamp()));
           break;
+
         case RS2ZK_REGION_OPENED:
-          // TODO: Delete the node from ZK.  Region successfully opened but not
-          //       acknowledged.
+          // Region is opened, insert into RIT and handle it
+          regionsInTransition.put(encodedName,
+              new RegionState(regionInfo, RegionState.State.OPENING,
+                  data.getStamp()));
+          new OpenedRegionHandler(master, this, data, regionInfo,
+              serverManager.getServerInfo(data.getServerName())).execute();
           break;
       }
     }
   }
 
-  private synchronized void handleRegion(RegionTransitionData data) {
-    switch(data.getEventType()) {
-      case RS2ZK_REGION_CLOSED:
-        new MasterCloseRegionHandler(data.getEventType(), serverManager,
-            serverName, data.getRegionName(), data.getBytes())
-        .submit();
-        break;
-      case RS2ZK_REGION_OPENED:
-      case RS2ZK_REGION_OPENING:
-        new MasterOpenRegionHandler(data.getEventType(), serverManager,
-            serverName, data.getRegionName(), data.getBytes())
-        .submit();
-        break;
+  /**
+   * Gets the region info for the region with the specified encoded name.
+   * <p>
+   * Currently this does a full scan of the regions map looking for a region
+   * with the specified encoded name.
+   * <p>
+   * Returns null if none found.
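+   * @param encodedName encoded name of the region to look up
+   * @return matching region info, or null if no region has that encoded name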
+   * @deprecated should be able to remove this now?
+   */
+  @Deprecated
+  private HRegionInfo getRegionInfoFromEncoded(String encodedName) {
+    for(HRegionInfo regionInfo : regions.keySet()) {
+      if(regionInfo.getEncodedName().equals(encodedName)) {
+        return regionInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Handles various states an unassigned node can be in.
+   * <p>
+   * Method is called when a state change is suspected for an unassigned node.
+   * <p>
+   * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
+   * yet).
+   * @param data
+   */
+  private void handleRegion(RegionTransitionData data) {
+    synchronized(regionsInTransition) {
+      // Verify this is a known server
+      if(!serverManager.isServerOnline(data.getServerName())) {
+        LOG.warn("Attempted to handle region transition for server " +
+            data.getServerName() + " but server is not online");
+        return;
+      }
+      String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
+      LOG.debug("Attempting to handle region transition for server " +
+          data.getServerName() + " and region " + encodedName);
+      RegionState regionState = regionsInTransition.get(encodedName);
+      switch(data.getEventType()) {
+
+        case RS2ZK_REGION_CLOSING:
+          // Should see CLOSING after we have asked it to CLOSE or additional
+          // times after already being in state of CLOSING
+          if(regionState == null ||
+              (!regionState.isPendingClose() && !regionState.isClosing())) {
+            LOG.warn("Received CLOSING for region " + encodedName +
+                " from server " + data.getServerName() + " but region was in " +
+                " the state " + regionState + " and not " +
+                "in expected PENDING_CLOSE or CLOSING states");
+            return;
+          }
+          // Transition to CLOSING (or update stamp if already CLOSING)
+          regionState.update(RegionState.State.CLOSING, data.getStamp());
+          break;
+
+        case RS2ZK_REGION_CLOSED:
+          // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
+          if(regionState == null ||
+              (!regionState.isPendingClose() && !regionState.isClosing())) {
+            LOG.warn("Received CLOSED for region " + encodedName +
+                " from server " + data.getServerName() + " but region was in " +
+                " the state " + regionState + " and not " +
+                "in expected PENDING_CLOSE or CLOSING states");
+            return;
+          }
+          // Handle CLOSED by assigning elsewhere or stopping if a disable
+          new ClosedRegionHandler(master, this, data, regionState.getRegion())
+          .submit();
+          break;
+
+        case RS2ZK_REGION_OPENING:
+          // Should see OPENING after we have asked it to OPEN or additional
+          // times after already being in state of OPENING
+          if(regionState == null ||
+              (!regionState.isPendingOpen() && !regionState.isOpening())) {
+            LOG.warn("Received OPENING for region " + encodedName +
+                " from server " + data.getServerName() + " but region was in " +
+                " the state " + regionState + " and not " +
+                "in expected PENDING_OPEN or OPENING states");
+            return;
+          }
+          // Transition to OPENING (or update stamp if already OPENING)
+          regionState.update(RegionState.State.OPENING, data.getStamp());
+          break;
+
+        case RS2ZK_REGION_OPENED:
+          // Should see OPENED after OPENING but possible after PENDING_OPEN
+          if(regionState == null ||
+              (!regionState.isPendingOpen() && !regionState.isOpening())) {
+            LOG.warn("Received OPENED for region " + encodedName +
+                " from server " + data.getServerName() + " but region was in " +
+                " the state " + regionState + " and not " +
+                "in expected PENDING_OPEN or OPENING states");
+            return;
+          }
+          // If this is a catalog table, update catalog manager accordingly
+          // Moving root and meta editing over to RS who does the opening
+          LOG.debug("Processing OPENED for region " + regionState.getRegion() +
+              " which isMeta[" + regionState.getRegion().isMetaRegion() + "] " +
+              " isRoot[" + regionState.getRegion().isRootRegion() + "]");
+
+          // Used to have updating of root/meta locations here but it's
+          // automatic in CatalogTracker now
+
+          // Handle OPENED by removing from transition and deleting the ZK node
+          new OpenedRegionHandler(master, this, data, regionState.getRegion(),
+              serverManager.getServerInfo(data.getServerName()))
+          .submit();
+          break;
+      }
     }
   }
 
@@ -160,17 +336,19 @@ public class AssignmentManager extends Z
    * </ol>
    */
   @Override
-  public synchronized void nodeCreated(String path) {
+  // Reacts to a newly created znode.  Only paths that start with the
+  // assignment znode are handled; everything else is ignored.
+  public void nodeCreated(String path) {
     if(path.startsWith(watcher.assignmentZNode)) {
-      try {
-        RegionTransitionData data = ZKAssign.getData(watcher, path);
-        if(data == null) {
-          return;
+      // Reading the node and handling it both happen while holding the
+      // regionsInTransition lock
+      synchronized(regionsInTransition) {
+        try {
+          RegionTransitionData data = ZKAssign.getData(watcher, path);
+          if(data == null) {
+            // No transition data could be read, so there is nothing to handle
+            return;
+          }
+          handleRegion(data);
+        } catch (KeeperException e) {
+          // ZK failure: log it and ask the master to abort
+          LOG.error("Unexpected ZK exception reading unassigned node data", e);
+          master.abort();
         }
-        handleRegion(data);
-      } catch (KeeperException e) {
-        LOG.error("Unexpected ZK exception reading unassigned node data", e);
-        status.abort();
       }
     }
   }
@@ -188,17 +366,19 @@ public class AssignmentManager extends Z
    * </ol>
    */
   @Override
-  public synchronized void nodeDataChanged(String path) {
+  // Reacts to a data change on a znode.  Only paths that start with the
+  // assignment znode are handled; everything else is ignored.
+  public void nodeDataChanged(String path) {
     if(path.startsWith(watcher.assignmentZNode)) {
-      try {
-        RegionTransitionData data = ZKAssign.getData(watcher, path);
-        if(data == null) {
-          return;
+      // Reading the node and handling it both happen while holding the
+      // regionsInTransition lock
+      synchronized(regionsInTransition) {
+        try {
+          RegionTransitionData data = ZKAssign.getData(watcher, path);
+          if(data == null) {
+            // No transition data could be read, so there is nothing to handle
+            return;
+          }
+          handleRegion(data);
+        } catch (KeeperException e) {
+          // ZK failure: log it and ask the master to abort
+          LOG.error("Unexpected ZK exception reading unassigned node data", e);
+          master.abort();
         }
-        handleRegion(data);
-      } catch (KeeperException e) {
-        LOG.error("Unexpected ZK exception reading unassigned node data", e);
-        status.abort();
       }
     }
   }
@@ -217,19 +397,589 @@ public class AssignmentManager extends Z
    * </ol>
    */
   @Override
-  public synchronized void nodeChildrenChanged(String path) {
+  // Reacts to a change in the children of a znode.  Only an exact match on
+  // the assignment znode itself is handled.
+  public void nodeChildrenChanged(String path) {
     if(path.equals(watcher.assignmentZNode)) {
+      synchronized(regionsInTransition) {
+        try {
+          // Fetch the children reported as new by ZKUtil and run each one's
+          // data through handleRegion()
+          List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
+              watcher.assignmentZNode);
+          for(NodeAndData newNode : newNodes) {
+            LOG.debug("Handling new unassigned node: " + newNode);
+            handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
+          }
+        } catch(KeeperException e) {
+          // ZK failure: log it and ask the master to abort
+          LOG.error("Unexpected ZK exception reading unassigned children", e);
+          master.abort();
+        }
+      }
+    }
+  }
+
+  /**
+   * Marks the region as online.  Removes it from regions in transition and
+   * updates the in-memory assignment information.
+   * <p>
+   * Used when a region has been successfully opened on a region server.
+   * Threads waiting on the {@code regionsInTransition} or {@code regions}
+   * monitors are woken up so they can re-check their condition.
+   * @param regionInfo region that is now online
+   * @param serverInfo server the region is now assigned to
+   */
+  public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) {
+    synchronized(regionsInTransition) {
+      regionsInTransition.remove(regionInfo.getEncodedName());
+      regionsInTransition.notifyAll();
+    }
+    synchronized(regions) {
+      regions.put(regionInfo, serverInfo);
+      Set<HRegionInfo> regionSet = servers.get(serverInfo);
+      if(regionSet == null) {
+        // First region recorded for this server
+        regionSet = new TreeSet<HRegionInfo>();
+        servers.put(serverInfo, regionSet);
+      }
+      regionSet.add(regionInfo);
+      // waitForAssignment() blocks on this monitor until the region appears
+      // in the regions map; without this notification it would never wake up
+      regions.notifyAll();
+    }
+  }
+
+  /**
+   * Marks the region as offline.  Removes it from regions in transition and
+   * removes in-memory assignment information.
+   * <p>
+   * Used when a region has been closed and should remain closed.
+   * <p>
+   * Safe to call for a region that has no recorded server; in that case only
+   * the transition information is cleared.
+   * @param regionInfo region that is now offline
+   */
+  public void regionOffline(HRegionInfo regionInfo) {
+    synchronized(regionsInTransition) {
+      regionsInTransition.remove(regionInfo.getEncodedName());
+      regionsInTransition.notifyAll();
+    }
+    synchronized(regions) {
+      HServerInfo serverInfo = regions.remove(regionInfo);
+      if(serverInfo == null) {
+        // Region was unknown or recorded without a server; there is no
+        // per-server set to clean up
+        return;
+      }
+      Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+      if(serverRegions != null) {
+        serverRegions.remove(regionInfo);
+      }
+    }
+  }
+
+  /**
+   * Sets the region as offline by removing in-memory assignment information but
+   * retaining transition information.
+   * <p>
+   * Used when a region has been closed but should be reassigned.
+   * <p>
+   * Does nothing if the region has no recorded server.
+   * @param regionInfo region to mark as offline
+   */
+  public void setOffline(HRegionInfo regionInfo) {
+    synchronized(regions) {
+      HServerInfo serverInfo = regions.remove(regionInfo);
+      if(serverInfo == null) {
+        // Region was unknown or recorded without a server; there is no
+        // per-server set to clean up
+        return;
+      }
+      Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+      if(serverRegions != null) {
+        serverRegions.remove(regionInfo);
+      }
+    }
+  }
+
+  // Assignment methods
+
+  /**
+   * Assigns the specified region.
+   * <p>
+   * If a RegionPlan is available with a valid destination then it will be used
+   * to determine what server region is assigned to.  If no RegionPlan is
+   * available, region will be assigned to a random available server.
+   * <p>
+   * Updates the RegionState and sends the OPEN RPC.
+   * <p>
+   * This will only succeed if the region is in transition and in a CLOSED or
+   * OFFLINE state or not in transition (in-memory not zk).  If the in-memory
+   * checks pass, the zk node is forced to OFFLINE before assigning.
+   *
+   * @param region region to be assigned
+   */
+  public void assign(HRegionInfo region) {
+    LOG.debug("Starting assignment for region " + region);
+    // Grab the state of this region and synchronize on it
+    String regionName = region.getEncodedName();
+    RegionState state;
+    synchronized(regionsInTransition) {
+      state = regionsInTransition.get(regionName);
+      if(state == null) {
+        // Not in transition yet: start tracking the region as OFFLINE
+        state = new RegionState(region, RegionState.State.OFFLINE);
+        regionsInTransition.put(regionName, state);
+      }
+    }
+    // The rest runs under the per-region state lock, including the ZK update
+    // and the OPEN RPC
+    synchronized(state) {
+      if(!state.isClosed() && !state.isOffline()) {
+        // Any state other than CLOSED or OFFLINE is rejected
+        LOG.info("Attempting to assign region but it is in transition and in " +
+            "an unexpected state:" + state);
+        return;
+      } else {
+        // Also refreshes the state's timestamp
+        state.update(RegionState.State.OFFLINE);
+      }
+      try {
+        if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), region,
+            master.getServerName())) {
+          // NOTE(review): the RegionState stays in regionsInTransition on this
+          // path; presumably the timeout monitor retries it -- confirm
+          LOG.warn("Attempted to create/force node into OFFLINE state before " +
+              "completing assignment but failed to do so");
+          return;
+        }
+      } catch (KeeperException e) {
+        LOG.error("Unexpected ZK exception creating/setting node OFFLINE", e);
+        master.abort();
+        return;
+      }
+      // Pickup existing plan or make a new one
+      RegionPlan plan;
+      synchronized(regionPlans) {
+        plan = regionPlans.get(regionName);
+        if(plan == null) {
+          LOG.debug("No previous transition plan for " + regionName +
+              " so generating a random one from " + serverManager.numServers() +
+              " ( " + serverManager.getOnlineServers().size() + ") available servers");
+          plan = new RegionPlan(regionName, null,
+              LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
+          // Remember the generated plan so later lookups for this region
+          // find it
+          regionPlans.put(regionName, plan);
+        }
+      }
+      // Transition RegionState to PENDING_OPEN and send OPEN RPC
+      state.update(RegionState.State.PENDING_OPEN);
+      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
+    }
+  }
+
+  /**
+   * Unassigns the specified region.
+   * <p>
+   * Updates the RegionState and sends the CLOSE RPC.
+   * <p>
+   * If a RegionPlan is already set, it will remain.  If this is being used
+   * to disable a table, be sure to use {@link #disableTable(String)} to ensure
+   * regions are not onlined after being closed.
+   * <p>
+   * Does nothing if the region has no recorded server or is already in
+   * transition.
+   *
+   * @param region region to be unassigned
+   */
+  public void unassign(HRegionInfo region) {
+    LOG.debug("Starting unassignment of region " + region + " (offlining)");
+    // Look up the hosting server once, under the lock that guards writes to
+    // the regions map, so the check and the value used for the RPC agree
+    HServerInfo server;
+    synchronized(regions) {
+      server = regions.get(region);
+    }
+    if (server == null) {
+      LOG.debug("Attempted to unassign region " + region + " but it is not " +
+          "currently assigned anywhere");
+      return;
+    }
+    String regionName = region.getEncodedName();
+    // Grab the state of this region and synchronize on it
+    RegionState state;
+    synchronized(regionsInTransition) {
+      state = regionsInTransition.get(regionName);
+      if(state == null) {
+        state = new RegionState(region, RegionState.State.PENDING_CLOSE);
+        regionsInTransition.put(regionName, state);
+      } else {
+        LOG.debug("Attempting to unassign region " + region + " but it is " +
+            "already in transition (" + state.getState() + ")");
+        return;
+      }
+    }
+    // Send CLOSE RPC
+    try {
+      serverManager.sendRegionClose(server, state.getRegion());
+    } catch (NotServingRegionException e) {
+      // The PENDING_CLOSE entry is left in regionsInTransition on this path
+      LOG.warn("Attempted to close region " + region + " but got an NSRE", e);
+    }
+  }
+
+  /**
+   * Blocks the caller until the given region is present in the in-memory
+   * map of assigned regions.
+   * <p>
+   * Returns right away when the region is already there.  Otherwise the
+   * caller waits on the {@code regions} monitor and re-checks each time it
+   * is woken.
+   * @param regionInfo region to wait on assignment for
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void waitForAssignment(HRegionInfo regionInfo)
+  throws InterruptedException {
+    synchronized(regions) {
+      for(;;) {
+        if(regions.containsKey(regionInfo)) {
+          return;
+        }
+        regions.wait();
+      }
+    }
+  }
+
+  /**
+   * Assigns the ROOT region.
+   * <p>
+   * Callers are expected to invoke this only while ROOT is closed and no
+   * RegionServer is serving it.
+   * <p>
+   * Hands {@link HRegionInfo#ROOT_REGIONINFO} to
+   * {@link #assign(HRegionInfo)}.
+   */
+  public void assignRoot() {
+    final HRegionInfo root = HRegionInfo.ROOT_REGIONINFO;
+    assign(root);
+  }
+
+  /**
+   * Assigns the META region.
+   * <p>
+   * Callers are expected to invoke this only while META is closed and no
+   * RegionServer is serving it.
+   * <p>
+   * Hands {@link HRegionInfo#FIRST_META_REGIONINFO} to
+   * {@link #assign(HRegionInfo)}.
+   */
+  public void assignMeta() {
+    final HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
+    assign(meta);
+  }
+
+  /**
+   * Assigns all user regions, if any exist.  Used during cluster startup.
+   * <p>
+   * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown and the cluster
+   * should be shutdown.
+   * @throws IOException if the META scan fails or the calling thread is
+   * interrupted while waiting (the interrupt status is restored first)
+   */
+  public void assignAllUserRegions() throws IOException {
+    // First experiment at synchronous assignment
+    // Simpler because just wait for no regions in transition
+
+    // Scan META for all user regions
+    List<HRegionInfo> allRegions = MetaScanner.listAllRegions(
+        master.getConfiguration());
+    if(allRegions == null || allRegions.isEmpty()) {
+      return;
+    }
+
+    // Get all available servers (named so it does not shadow the servers
+    // field)
+    List<HServerInfo> onlineServers = serverManager.getOnlineServersList();
+
+    LOG.info("Assigning " + allRegions.size() + " across " +
+        onlineServers.size() + " servers");
+
+    // Generate a cluster startup region placement plan
+    Map<HServerInfo,List<HRegionInfo>> bulkPlan =
+      LoadBalancer.bulkAssignment(allRegions, onlineServers);
+
+    // For each server, create OFFLINE nodes and send OPEN RPCs
+    for(Map.Entry<HServerInfo,List<HRegionInfo>> entry : bulkPlan.entrySet()) {
+      HServerInfo server = entry.getKey();
+      List<HRegionInfo> serverRegions = entry.getValue();
+      LOG.debug("Assigning " + serverRegions.size() + " regions to " + server);
+      for(HRegionInfo region : serverRegions) {
+        LOG.debug("Assigning " + region + " to " + server);
+        String regionName = region.getEncodedName();
+        RegionPlan plan = new RegionPlan(regionName, null, server);
+        // assign() reads regionPlans under this same lock; the lock is
+        // released before calling assign() so no lock is held across it
+        synchronized(regionPlans) {
+          regionPlans.put(regionName, plan);
+        }
+        assign(region);
+      }
+    }
+
+    // Wait for no regions to be in transition
+    try {
+      waitUntilNoRegionsInTransition();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted waiting for regions to be assigned", e);
+      // Keep the interrupt visible to callers further up the stack
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+
+    LOG.info("\n\nAll user regions have been assigned");
+  }
+
+  /**
+   * Rebuilds the in-memory region and server maps from a full scan of META.
+   * <p>
+   * Every region found is recorded in {@code regions}.  A region with no
+   * location, or whose location does not resolve to a known server, is
+   * recorded with a null server and is not added to any per-server set.
+   * @throws IOException if the META scan fails
+   */
+  private void rebuildUserRegions() throws IOException {
+    // Scan before taking the lock so the lock is not held across the scan
+    Map<HRegionInfo,HServerAddress> allRegions =
+      MetaReader.fullScan(catalogTracker);
+    // Same lock the other mutators of regions/servers use
+    synchronized(regions) {
+      for(Map.Entry<HRegionInfo,HServerAddress> region :
+          allRegions.entrySet()) {
+        HRegionInfo regionInfo = region.getKey();
+        HServerAddress regionLocation = region.getValue();
+        HServerInfo serverInfo = null;
+        if(regionLocation != null) {
+          serverInfo = serverManager.getHServerInfo(regionLocation);
+        }
+        regions.put(regionInfo, serverInfo);
+        if(serverInfo == null) {
+          // Never use null as a key in the servers map
+          continue;
+        }
+        Set<HRegionInfo> regionSet = servers.get(serverInfo);
+        if(regionSet == null) {
+          regionSet = new TreeSet<HRegionInfo>();
+          servers.put(serverInfo, regionSet);
+        }
+        regionSet.add(regionInfo);
+      }
+    }
+  }
+
+  /**
+   * Blocks the caller until the in-memory map of regions in transition is
+   * empty.
+   * <p>
+   * New transitions may begin right after this returns.  A normal return
+   * only means there was a moment, as seen by the Master's in-memory state,
+   * with no regions in transition.
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void waitUntilNoRegionsInTransition() throws InterruptedException {
+    synchronized(regionsInTransition) {
+      while(!regionsInTransition.isEmpty()) {
+        regionsInTransition.wait();
+      }
+    }
+  }
+
+  /**
+   * Gets the map of regions currently in transition.
+   * <p>
+   * The internal map itself is returned, not a copy.
+   * @return map of regions in transition
+   */
+  public Map<String, RegionState> getRegionsInTransition() {
+    return this.regionsInTransition;
+  }
+
+  /**
+   * Checks whether the given table is in the in-memory set of disabled
+   * tables.
+   * @param tableName name of the table to check
+   * @return true if the table is in the disabled set
+   */
+  public boolean isTableDisabled(String tableName) {
+    final boolean disabled;
+    synchronized(disabledTables) {
+      disabled = disabledTables.contains(tableName);
+    }
+    return disabled;
+  }
+
+  /**
+   * Checks whether the table that the given region belongs to is disabled.
+   * <p>
+   * The table name is extracted from the region name and then checked with
+   * {@link #isTableDisabled(String)}.
+   * @param regionName name of the region whose table is checked
+   * @return true if the region's table is in the disabled set
+   */
+  public boolean isTableOfRegionDisabled(byte [] regionName) {
+    byte [] tableName = HRegionInfo.getTableName(regionName);
+    return isTableDisabled(Bytes.toString(tableName));
+  }
+
+  /**
+   * Marks the given table as disabled, both in memory and in ZooKeeper.
+   * <p>
+   * Does nothing if the table is already marked disabled.  A ZooKeeper
+   * failure is logged and the in-memory entry is kept.
+   * @param tableName table to be disabled
+   */
+  public void disableTable(String tableName) {
+    synchronized(disabledTables) {
+      if(isTableDisabled(tableName)) {
+        return;
+      }
+      disabledTables.add(tableName);
+      try {
+        ZKTableDisable.disableTable(master.getZooKeeper(), tableName);
+      } catch (KeeperException e) {
+        LOG.warn("ZK error setting table as disabled", e);
+      }
+    }
+  }
+
+  /**
+   * Unsets the specified table from being disabled.
+   * <p>
+   * Removes the table from the in-memory set of disabled tables and asks
+   * ZooKeeper to do the same.  Does nothing if the table is not currently
+   * marked disabled.  A ZooKeeper failure is logged and the in-memory
+   * removal is kept.
+   * @param tableName table to be undisabled
+   */
+  public void undisableTable(String tableName) {
+    synchronized(disabledTables) {
+      if(isTableDisabled(tableName)) {
+        disabledTables.remove(tableName);
+        try {
+          ZKTableDisable.undisableTable(master.getZooKeeper(), tableName);
+        } catch (KeeperException e) {
+          // Message used to say "disabled", copied from disableTable()
+          LOG.warn("ZK error setting table as enabled", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Rebuild the set of disabled tables from zookeeper.  Used during master
+   * failover.
+   * <p>
+   * Tables listed in zookeeper are added to the in-memory set.  If the list
+   * cannot be read, a warning is logged and the in-memory set is left
+   * untouched.
+   */
+  private void rebuildDisabledTables() {
+    synchronized(disabledTables) {
+      // Named differently from the disabledTables field on purpose: a local
+      // with the same name would shadow the field and the addAll() below
+      // would add the list to itself
+      List<String> zkDisabledTables;
       try {
-        List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
-            watcher.assignmentZNode);
-        for(NodeAndData newNode : newNodes) {
-          LOG.debug("Handling new unassigned node: " + newNode);
-          handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
-        }
-      } catch(KeeperException e) {
-        LOG.error("Unexpected ZK exception reading unassigned children", e);
-        status.abort();
+        zkDisabledTables =
+          ZKTableDisable.getDisabledTables(master.getZooKeeper());
+      } catch (KeeperException e) {
+        LOG.warn("ZK error getting list of disabled tables", e);
+        return;
       }
+      if(!zkDisabledTables.isEmpty()) {
+        LOG.info("Rebuilt list of " + zkDisabledTables.size() + " disabled " +
+            "tables from zookeeper");
+        disabledTables.addAll(zkDisabledTables);
+      }
+    }
+  }
+
+  /**
+   * Gets the regions of the specified table that are tracked in the
+   * in-memory regions map.
+   * <p>
+   * The map is walked from a probe key for the table until the first region
+   * of another table is seen.  The walk holds the {@code regions} lock so
+   * the map cannot be modified underneath the iteration.
+   * @param tableName name of the table
+   * @return regions of the table, in map order; empty if there are none
+   */
+  public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
+    List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
+    // Probe key built from the table name with null start/end keys;
+    // presumably sorts at or before the table's first region -- confirm
+    HRegionInfo boundary = new HRegionInfo(
+        new HTableDescriptor(tableName), null, null);
+    synchronized(regions) {
+      for(HRegionInfo regionInfo : regions.tailMap(boundary).keySet()) {
+        if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+          tableRegions.add(regionInfo);
+        } else {
+          // Reached a region of another table, so we are done
+          break;
+        }
+      }
+    }
+    return tableRegions;
+  }
+
+  /**
+   * Periodic chore that scans the regions in transition and retries the
+   * assign or unassign for any region whose state has not been updated
+   * within the configured timeout.
+   */
+  public class TimeoutMonitor extends Chore {
+
+    // Maximum age of a RegionState stamp before a retry is attempted;
+    // compared against System.currentTimeMillis(), so in milliseconds
+    private final int timeout;
+
+    /**
+     * Creates a periodic monitor to check for time outs on region transition
+     * operations.  This will deal with retries if for some reason something
+     * doesn't happen within the specified timeout.
+     * @param period passed through to the Chore superclass
+     * @param stop passed through to the Chore superclass
+     * @param timeout age in milliseconds after which a transition is retried
+     */
+    public TimeoutMonitor(final int period, final AtomicBoolean stop,
+        final int timeout) {
+      super("AssignmentTimeoutMonitor", period, stop);
+      this.timeout = timeout;
+    }
+
+    /**
+     * Checks every region in transition and retries those that have timed
+     * out.  Runs entirely under the regionsInTransition lock.
+     */
+    @Override
+    protected void chore() {
+      synchronized(regionsInTransition) {
+        // Iterate all regions in transition checking for time outs
+        long now = System.currentTimeMillis();
+        for(RegionState regionState : regionsInTransition.values()) {
+          if(regionState.getStamp() + timeout <= now) {
+            HRegionInfo regionInfo = regionState.getRegion();
+            String regionName = regionInfo.getEncodedName();
+            LOG.info("Region transition timed out for region " + regionName);
+            // Expired!  Do a retry.
+            switch(regionState.getState()) {
+              case OFFLINE:
+              case CLOSED:
+                LOG.info("Region has been OFFLINE or CLOSED for too long, " +
+                    "reassigning " + regionInfo.getRegionNameAsString());
+                assign(regionState.getRegion());
+                break;
+              case PENDING_OPEN:
+              case OPENING:
+                // NOTE(review): assign() returns early unless the state is
+                // CLOSED or OFFLINE, so this retry looks like a no-op for
+                // these states -- confirm
+                LOG.info("Region has been PENDING_OPEN or OPENING for too " +
+                    "long, reassigning " + regionInfo.getRegionNameAsString());
+                assign(regionState.getRegion());
+                break;
+              case OPEN:
+                LOG.warn("Long-running region in OPEN state?  This should " +
+                    "not happen");
+                break;
+              case PENDING_CLOSE:
+              case CLOSING:
+                // NOTE(review): unassign() returns early when the region
+                // already has an entry in regionsInTransition, which is the
+                // case for every region seen here, so the close rpc does not
+                // appear to be resent -- confirm
+                LOG.info("Region has been PENDING_CLOSE or CLOSING for too " +
+                    "long, resending close rpc");
+                unassign(regionInfo);
+                break;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * In-memory state of a region that is in transition: the region, its
+   * current {@link State} and the time of the last state update.
+   * <p>
+   * Serialized as the region, then the state name, then the stamp.
+   */
+  public static class RegionState implements Writable {
+    private HRegionInfo region;
+
+    public enum State {
+      OFFLINE,        // region is in an offline state
+      PENDING_OPEN,   // sent rpc to server to open but has not begun
+      OPENING,        // server has begun to open but not yet done
+      OPEN,           // server opened region and updated meta
+      PENDING_CLOSE,  // sent rpc to server to close but has not begun
+      CLOSING,        // server has begun to close but not yet done
+      CLOSED          // server closed region and updated meta
+    }
+
+    private State state;
+    // Time of the last state update, in milliseconds
+    private long stamp;
+
+    /**
+     * Creates an empty instance; all fields stay unset until
+     * {@link #readFields(DataInput)} is called.
+     */
+    public RegionState() {}
+
+    /**
+     * Creates a state stamped with the current time.
+     * @param region region this state describes
+     * @param state initial state
+     */
+    RegionState(HRegionInfo region, State state) {
+      this(region, state, System.currentTimeMillis());
+    }
+
+    /**
+     * @param region region this state describes
+     * @param state initial state
+     * @param stamp time of the state, in milliseconds
+     */
+    RegionState(HRegionInfo region, State state, long stamp) {
+      this.region = region;
+      this.state = state;
+      this.stamp = stamp;
+    }
+
+    /**
+     * Sets the state and an explicit stamp.
+     * @param state new state
+     * @param stamp time of the update, in milliseconds
+     */
+    public void update(State state, long stamp) {
+      this.state = state;
+      this.stamp = stamp;
+    }
+
+    /**
+     * Sets the state and stamps it with the current time.
+     * @param state new state
+     */
+    public void update(State state) {
+      this.state = state;
+      this.stamp = System.currentTimeMillis();
+    }
+
+    public State getState() {
+      return state;
+    }
+
+    public long getStamp() {
+      return stamp;
+    }
+
+    public HRegionInfo getRegion() {
+      return region;
+    }
+
+    public boolean isClosing() {
+      return state == State.CLOSING;
+    }
+
+    public boolean isClosed() {
+      return state == State.CLOSED;
+    }
+
+    public boolean isPendingClose() {
+      return state == State.PENDING_CLOSE;
+    }
+
+    public boolean isOpening() {
+      return state == State.OPENING;
+    }
+
+    public boolean isOpened() {
+      return state == State.OPEN;
+    }
+
+    public boolean isPendingOpen() {
+      return state == State.PENDING_OPEN;
+    }
+
+    public boolean isOffline() {
+      return state == State.OFFLINE;
+    }
+
+    @Override
+    public String toString() {
+      // region is null after the no-arg constructor, so guard against it
+      // rather than throwing from a method that is mostly used for logging
+      String name = region == null ? null : region.getRegionNameAsString();
+      return "RegionState (" + name + ") " + state +
+             " at time " + stamp;
+    }
+
+    /**
+     * Reads the region, then the state name, then the stamp.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      region = new HRegionInfo();
+      region.readFields(in);
+      state = State.valueOf(in.readUTF());
+      stamp = in.readLong();
+    }
+
+    /**
+     * Writes the region, then the state name, then the stamp.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      region.write(out);
+      out.writeUTF(state.name());
+      out.writeLong(stamp);
     }
   }
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java Thu Aug  5 07:35:00 2010
@@ -34,13 +34,14 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
- * This class abstract a bunch of operations the HMaster needs to interact with 
- * the underlying file system, including splitting log files, checking file 
+ * This class abstracts a bunch of operations the HMaster needs to interact with
+ * the underlying file system, including splitting log files, checking file
  * system status, etc.
  */
 public class FileSystemManager {
@@ -48,7 +49,7 @@ public class FileSystemManager {
   // HBase configuration
   Configuration conf;
   // master status
-  MasterStatus masterStatus;
+  MasterController masterStatus;
   // Keep around for convenience.
   private final FileSystem fs;
   // Is the fileystem ok?
@@ -59,8 +60,8 @@ public class FileSystemManager {
   private final Path rootdir;
   // create the split log lock
   final Lock splitLogLock = new ReentrantLock();
-  
-  public FileSystemManager(Configuration conf, MasterStatus masterStatus) throws IOException {
+
+  public FileSystemManager(Configuration conf, MasterController masterStatus) throws IOException {
     this.conf = conf;
     this.masterStatus = masterStatus;
     // Set filesystem to be that of this.rootdir else we get complaints about
@@ -105,7 +106,7 @@ public class FileSystemManager {
   public Path getOldLogDir() {
     return this.oldLogDir;
   }
-  
+
   /**
    * Checks to see if the file system is still accessible.
    * If not, sets closed
@@ -123,7 +124,7 @@ public class FileSystemManager {
     }
     return this.fsOk;
   }
-  
+
   /**
    * @return HBase root dir.
    * @throws IOException
@@ -131,20 +132,22 @@ public class FileSystemManager {
   public Path getRootDir() {
     return this.rootdir;
   }
-  
+
   public Lock getSplitLogLock() {
     return splitLogLock;
   }
-  
-  /*
+
+  /**
    * Inspect the log directory to recover any log file without
-   * ad active region server.
+   * an active region server.
    */
   public void splitLogAfterStartup() {
     Path logsDirPath =
       new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
     try {
-      if (!this.fs.exists(logsDirPath)) return;
+      if (!this.fs.exists(logsDirPath)) {
+        return;
+      }
     } catch (IOException e) {
       throw new RuntimeException("Could exists for " + logsDirPath, e);
     }
@@ -179,8 +182,8 @@ public class FileSystemManager {
       }
     }
   }
-  
-  /*
+
+  /**
    * Get the rootdir.  Make sure its wholesome and exists before returning.
    * @param rd
    * @param conf
@@ -238,7 +241,7 @@ public class FileSystemManager {
     }
   }
 
-  /*
+  /**
    * @param hri Set all family block caching to <code>b</code>
    * @param b
    */
@@ -250,4 +253,29 @@ public class FileSystemManager {
       }
     }
   }
+
+  /**
+   * Recursively deletes the directory of the given region under the root
+   * dir.  The boolean result of the delete is not checked.
+   * @param region region whose directory is deleted
+   * @throws IOException if the file system delete fails
+   */
+  public void deleteRegion(HRegionInfo region) throws IOException {
+    Path regionDir = HRegion.getRegionDir(rootdir, region);
+    fs.delete(regionDir, true);
+  }
+
+  /**
+   * Recursively deletes the directory named after the given table under the
+   * root dir.  The boolean result of the delete is not checked.
+   * @param tableName name of the table whose directory is deleted
+   * @throws IOException if the file system delete fails
+   */
+  public void deleteTable(byte[] tableName) throws IOException {
+    Path tableDir = new Path(rootdir, Bytes.toString(tableName));
+    fs.delete(tableDir, true);
+  }
+
+  /**
+   * Placeholder for updating the region info stored on the file system.
+   * Not implemented yet; calling it does nothing.
+   * @param region region whose info should be updated
+   */
+  public void updateRegionInfo(HRegionInfo region) {
+    // TODO implement this.  i think this is currently broken in trunk i don't
+    //      see this getting updated.
+    //      @see HRegion.checkRegioninfoOnFilesystem()
+  }
+
+  /**
+   * Placeholder for adding a column family on the file system.
+   * Not implemented yet; calling it does nothing.
+   * @param region region the family belongs to
+   * @param familyName name of the family
+   */
+  public void addFamily(HRegionInfo region, byte[] familyName) {
+    // TODO Looks like the family directory is just created on the first flush?
+  }
+
+  /**
+   * Recursively deletes the store directory of one column family of a
+   * region.  The boolean result of the delete is not checked.
+   * @param region region the family belongs to
+   * @param familyName name of the family to delete
+   * @throws IOException if the file system delete fails
+   */
+  public void deleteFamily(HRegionInfo region, byte[] familyName)
+  throws IOException {
+    Path tableDir = new Path(rootdir, region.getTableDesc().getNameAsString());
+    Path familyDir = Store.getStoreHomedir(tableDir, region.getEncodedName(),
+        familyName);
+    fs.delete(familyDir, true);
+  }
 }