You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/07/27 18:34:34 UTC

svn commit: r1366438 [1/3] - in /hbase/trunk/hbase-server/src: main/jamon/org/apache/hadoop/hbase/tmpl/master/ main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/h...

Author: jxiang
Date: Fri Jul 27 16:34:32 2012
New Revision: 1366438

URL: http://svn.apache.org/viewvc?rev=1366438&view=rev
Log:
HBASE-6272 In-memory region state is inconsistent

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionState.java   (with props)
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java   (with props)
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java   (with props)
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NotifiableConcurrentSkipListMap.java
Modified:
    hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MXBeanImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMXBean.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java

Modified: hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon (original)
+++ hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon Fri Jul 27 16:34:32 2012
@@ -20,7 +20,7 @@ limitations under the License.
 <%import>
 org.apache.hadoop.hbase.HRegionInfo;
 org.apache.hadoop.hbase.master.AssignmentManager;
-org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
+org.apache.hadoop.hbase.master.RegionState;
 org.apache.hadoop.conf.Configuration;
 org.apache.hadoop.hbase.HBaseConfiguration;
 org.apache.hadoop.hbase.HConstants;
@@ -32,7 +32,8 @@ AssignmentManager assignmentManager;
 int limit = 100;
 </%args>
 <%java>
-Map<String, RegionState> rit = assignmentManager.copyRegionsInTransition();
+Map<String, RegionState> rit = assignmentManager
+  .getRegionStates().getRegionsInTransition();
 // process the map to find region in transition details
 Configuration conf = HBaseConfiguration.create();
 int ritThreshold = conf.getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java Fri Jul 27 16:34:32 2012
@@ -20,19 +20,16 @@
 
 package org.apache.hadoop.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.HashSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
@@ -41,13 +38,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.VersionedWritable;
 
 import com.google.protobuf.ByteString;
@@ -83,7 +74,6 @@ public class ClusterStatus extends Versi
    *   <dt>3</dt> <dd>Added master and backupMasters</dd>
    * </dl>
    */
-  private static final byte VERSION_MASTER_BACKUPMASTERS = 2;
   private static final byte VERSION = 2;
 
   private String hbaseVersion;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java Fri Jul 27 16:34:32 2012
@@ -19,8 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -29,7 +27,7 @@ import org.apache.hadoop.classification.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class UnknownRegionException extends IOException {
+public class UnknownRegionException extends RegionException {
   private static final long serialVersionUID = 1968858760475205392L;
 
   public UnknownRegionException(String regionName) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Fri Jul 27 16:34:32 2012
@@ -1178,6 +1178,16 @@ public class HBaseAdmin implements Abort
   }
 
   /**
+   * Get all the online regions on a region server.
+   */
+  public List<HRegionInfo> getOnlineRegions(
+      final ServerName sn) throws IOException {
+    AdminProtocol admin =
+      this.connection.getAdmin(sn.getHostname(), sn.getPort());
+    return ProtobufUtil.getOnlineRegions(admin);
+  }
+
+  /**
    * Flush a table or an individual region.
    * Asynchronous operation.
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Jul 27 16:34:32 2012
@@ -19,14 +19,11 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -34,13 +31,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -65,7 +59,6 @@ import org.apache.hadoop.hbase.executor.
 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -73,11 +66,9 @@ import org.apache.hadoop.hbase.master.ha
 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
 import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
@@ -125,6 +116,11 @@ public class AssignmentManager extends Z
   final private KeyLocker<String> locker = new KeyLocker<String>();
 
   /**
+   * Used for assignment only.  TODO: revisit the assign lock scheme
+   */
+  final private KeyLocker<String> assignLocker = new KeyLocker<String>();
+
+  /**
    * Map of regions to reopen after the schema of a table is changed. Key -
    * encoded region name, value - HRegionInfo
    */
@@ -135,13 +131,6 @@ public class AssignmentManager extends Z
    */
   private final int maximumAssignmentAttempts;
 
-  /**
-   * Regions currently in transition.  Map of encoded region names to the master
-   * in-memory state for that region.
-   */
-  final NotifiableConcurrentSkipListMap<String, RegionState> regionsInTransition =
-    new NotifiableConcurrentSkipListMap<String, RegionState>();
-
   /** Plans for region movement. Key is the encoded version of a region name*/
   // TODO: When do plans get cleaned out?  Ever? In server open and in server
   // shutdown processing -- St.Ack
@@ -157,32 +146,12 @@ public class AssignmentManager extends Z
   Set<String> enablingTables = new HashSet<String>();
 
   /**
-   * Server to regions assignment map.
-   * Contains the set of regions currently assigned to a given server.
-   * This Map and {@link #regions} are tied.  Always update this in tandem
-   * with the other under a lock on {@link #regions}.
-   * @see #regions
-   */
-  private final NavigableMap<ServerName, Set<HRegionInfo>> servers =
-    new TreeMap<ServerName, Set<HRegionInfo>>();
-
-  /**
    * Contains the server which need to update timer, these servers will be
    * handled by {@link TimerUpdater}
    */
   private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer = 
     new ConcurrentSkipListSet<ServerName>();
 
-  /**
-   * Region to server assignment map.
-   * Contains the server a given region is currently assigned to.
-   * This Map and {@link #servers} are tied.  Always update this in tandem
-   * with the other under a lock on {@link #regions}.
-   * @see #servers
-   */
-  private final SortedMap<HRegionInfo, ServerName> regions =
-    new TreeMap<HRegionInfo, ServerName>();
-
   private final ExecutorService executorService;
 
   //Thread pool executor service for timeout monitor
@@ -205,6 +174,8 @@ public class AssignmentManager extends Z
    // metrics instance to send metrics for RITs
    MasterMetrics masterMetrics;
 
+   private final RegionStates regionStates;
+
   /**
    * Constructs a new assignment manager.
    *
@@ -217,8 +188,7 @@ public class AssignmentManager extends Z
    */
   public AssignmentManager(Server master, ServerManager serverManager,
       CatalogTracker catalogTracker, final LoadBalancer balancer,
-      final ExecutorService service, MasterMetrics metrics)
-  throws KeeperException, IOException {
+      final ExecutorService service, MasterMetrics metrics) throws KeeperException, IOException {
     super(master.getZooKeeper());
     this.master = master;
     this.serverManager = serverManager;
@@ -241,6 +211,7 @@ public class AssignmentManager extends Z
     this.balancer = balancer;
     this.threadPoolExecutorService = Executors.newCachedThreadPool();
     this.masterMetrics = metrics;// can be null only with tests.
+    this.regionStates = new RegionStates(master, serverManager);
   }
 
   void startTimeOutMonitor() {
@@ -249,26 +220,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Compute the average load across all region servers.
-   * Currently, this uses a very naive computation - just uses the number of
-   * regions being served, ignoring stats about number of requests.
-   * @return the average load
-   */
-  double getAverageLoad() {
-    int totalLoad = 0;
-    int numServers = 0;
-    // Sync on this.regions because access to this.servers always synchronizes
-    // in this order.
-    synchronized (this.regions) {
-      for (Map.Entry<ServerName, Set<HRegionInfo>> e: servers.entrySet()) {
-        numServers++;
-        totalLoad += e.getValue().size();
-      }
-    }
-    return (double)totalLoad / (double)numServers;
-  }
-
-  /**
    * @return Instance of ZKTable.
    */
   public ZKTable getZKTable() {
@@ -276,17 +227,19 @@ public class AssignmentManager extends Z
     // sharing.
     return this.zkTable;
   }
+
   /**
-   * Returns the RegionServer to which hri is assigned.
+   * This SHOULD not be public. It is public now
+   * because of some unit tests.
    *
-   * @param hri
-   *          HRegion for which this function returns the region server
-   * @return HServerInfo The region server to which hri belongs
+   * TODO: make it package private and keep RegionStates in the master package
    */
-  public ServerName getRegionServerOfRegion(HRegionInfo hri) {
-    synchronized (this.regions ) {
-      return regions.get(hri);
-    }
+  public RegionStates getRegionStates() {
+    return regionStates;
+  }
+
+  public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
+    return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
   }
 
   /**
@@ -337,7 +290,8 @@ public class AssignmentManager extends Z
     for (HRegionInfo hri : hris) {
       String name = hri.getEncodedName();
       // no lock concurrent access ok: sequential consistency respected.
-      if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
+      if (regionsToReopen.containsKey(name)
+          || regionStates.isRegionInTransition(name)) {
         pending++;
       }
     }
@@ -389,19 +343,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Process all regions that are in transition up in zookeeper.  Used by
-   * master joining an already running cluster.
-   * @throws KeeperException
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void processDeadServersAndRegionsInTransition()
-  throws KeeperException, IOException, InterruptedException {
-    // Pass null to signify no dead servers in this context.
-    processDeadServersAndRegionsInTransition(null);
-  }
-
-  /**
    * Process all regions that are in transition in zookeeper and also
    * processes the list of dead servers by scanning the META. 
    * Used by master joining an cluster.  If we figure this is a clean cluster
@@ -417,35 +358,34 @@ public class AssignmentManager extends Z
   throws KeeperException, IOException, InterruptedException {
     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
       watcher.assignmentZNode);
-    
+
     if (nodes == null) {
       String errorMessage = "Failed to get the children from ZK";
       master.abort(errorMessage, new IOException(errorMessage));
       return;
     }
-    
+
     // Run through all regions.  If they are not assigned and not in RIT, then
     // its a clean cluster startup, else its a failover.
-    synchronized (this.regions) {    
-      for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
-        if (!e.getKey().isMetaTable() && e.getValue() != null) {
-          LOG.debug("Found " + e + " out on cluster");
-          this.failover = true;
-          break;
-        }
-        if (nodes.contains(e.getKey().getEncodedName())) {
-          LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
-          // Could be a meta region.
-          this.failover = true;
-          break;
-        }
+    Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
+    for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
+      if (!e.getKey().isMetaTable() && e.getValue() != null) {
+        LOG.debug("Found " + e + " out on cluster");
+        this.failover = true;
+        break;
+      }
+      if (nodes.contains(e.getKey().getEncodedName())) {
+        LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
+        // Could be a meta region.
+        this.failover = true;
+        break;
       }
     }
 
     // Remove regions in RIT, they are possibly being processed by
     // ServerShutdownHandler.
     // no lock concurrent access ok: some threads may be adding/removing items but its java-valid
-    nodes.removeAll(regionsInTransition.keySet());
+    nodes.removeAll(regionStates.getRegionsInTransition().keySet());
 
     // If some dead servers are processed by ServerShutdownHandler, we shouldn't
     // assign all user regions( some would be assigned by
@@ -488,11 +428,10 @@ public class AssignmentManager extends Z
     if (!intransistion) return intransistion;
     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
     while (!this.master.isStopped() &&
-      // no lock concurrent access ok: sequentially consistent
-      this.regionsInTransition.containsKey(hri.getEncodedName())) {
+      this.regionStates.isRegionInTransition(hri.getEncodedName())) {
       // We put a timeout because we may have the region getting in just between the test
       //  and the waitForUpdate
-      this.regionsInTransition.waitForUpdate(100);
+      this.regionStates.waitForUpdate(100);
     }
     return intransistion;
   }
@@ -543,8 +482,8 @@ public class AssignmentManager extends Z
     // is that we don't have two threads working on the same region.
     Lock lock = locker.acquireLock(encodedRegionName);
     try {
-      RegionState regionState = regionsInTransition.get(encodedRegionName);
-      if (regionState != null || failoverProcessedRegions.containsKey(encodedRegionName)) {
+      if (regionStates.isRegionInTransition(encodedRegionName)
+          || failoverProcessedRegions.containsKey(encodedRegionName)) {
         // Just return
         return;
       }
@@ -552,15 +491,14 @@ public class AssignmentManager extends Z
       case M_ZK_REGION_CLOSING:
         // If zk node of the region was updated by a live server skip this
         // region and just add it into RIT.
-        if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
+        if (isOnDeadServer(regionInfo, deadServers) && !isServerOnline(sn)) {
           // If was on dead server, its closed now. Force to OFFLINE and this
           // will get it reassigned if appropriate
           forceOffline(regionInfo, rt);
         } else {
           // Just insert region into RIT.
           // If this never updates the timeout will trigger new assignment
-          regionsInTransition.put(encodedRegionName,
-            getRegionState(regionInfo, RegionState.State.CLOSING, rt));
+          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
         }
         failoverProcessedRegions.put(encodedRegionName, regionInfo);
         break;
@@ -583,8 +521,7 @@ public class AssignmentManager extends Z
           // RPC is not yet sent
           addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
         } else {
-          regionsInTransition.put(encodedRegionName,
-            getRegionState(regionInfo, RegionState.State.PENDING_OPEN, rt));
+          regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
         }
         failoverProcessedRegions.put(encodedRegionName, regionInfo);
         break;
@@ -595,8 +532,7 @@ public class AssignmentManager extends Z
         // Just insert region into RIT
         // If this never updates the timeout will trigger new assignment
         if (regionInfo.isMetaTable()) {
-          regionsInTransition.put(encodedRegionName,
-            getRegionState(regionInfo, RegionState.State.OPENING, rt));
+          regionStates.updateRegionState(rt, RegionState.State.OPENING);
           // If ROOT or .META. table is waiting for timeout monitor to assign
           // it may take lot of time when the assignment.timeout.period is
           // the default value which may be very long.  We will not be able
@@ -609,16 +545,14 @@ public class AssignmentManager extends Z
           // it to a new RS. (HBASE-5882)
           processOpeningState(regionInfo);
           break;
-        } 
-        regionsInTransition.put(encodedRegionName,
-          getRegionState(regionInfo, RegionState.State.OPENING, rt));
+        }
+        regionStates.updateRegionState(rt, RegionState.State.OPENING);
         failoverProcessedRegions.put(encodedRegionName, regionInfo);
         break;
 
       case RS_ZK_REGION_OPENED:
         // Region is opened, insert into RIT and handle it
-        regionsInTransition.put(encodedRegionName,
-          getRegionState(regionInfo, RegionState.State.OPEN, rt));
+        regionStates.updateRegionState(rt, RegionState.State.OPEN);
         // sn could be null if this server is no longer online.  If
         // that is the case, just let this RIT timeout; it'll be assigned
         // to new server then.
@@ -679,25 +613,11 @@ public class AssignmentManager extends Z
    */
   private void addToRITandCallClose(final HRegionInfo hri,
                                     final RegionState.State state, final RegionTransition oldData) {
-    // No lock concurrency: adding a synchronized here would not prevent to have two
-    //  entries as we don't check if the region is already there. This must be ensured by the
-    //  method callers.
-    this.regionsInTransition.put(hri.getEncodedName(), getRegionState(hri, state, oldData));
+    regionStates.updateRegionState(oldData, state);
     new ClosedRegionHandler(this.master, this, hri).process();
   }
 
   /**
-   * @param hri
-   * @param state
-   * @param rt
-   * @return A new {@link RegionState} instance made of the passed arguments
-   */
-  RegionState getRegionState(final HRegionInfo hri, final RegionState.State state,
-      final RegionTransition rt) {
-    return new RegionState(hri, state, rt.getCreateTime(), rt.getServerName());
-  }
-
-  /**
    * When a region is closed, it should be removed from the regionsToReopen
    * @param hri HRegionInfo of the region which was closed
    */
@@ -769,12 +689,12 @@ public class AssignmentManager extends Z
     try {
       // Printing if the event was created a long time ago helps debugging
       boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
-      RegionState regionState = regionsInTransition.get(encodedName);
+      RegionState regionState = regionStates.getRegionTransitionState(encodedName);
       LOG.debug("Handling transition=" + rt.getEventType() +
         ", server=" + sn + ", region=" +
         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
         (lateEvent ? ", which is more than 15 seconds late" : "") +
-        ", current state from RIT=" + regionState);
+        ", current state from region state map =" + regionState);
       switch (rt.getEventType()) {
         case M_ZK_REGION_OFFLINE:
           // Nothing to do.
@@ -782,7 +702,7 @@ public class AssignmentManager extends Z
 
         case RS_ZK_REGION_SPLITTING:
           if (!isInStateForSplitting(regionState)) break;
-          addSplittingToRIT(sn, encodedName);
+          regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
           break;
 
         case RS_ZK_REGION_SPLIT:
@@ -790,7 +710,9 @@ public class AssignmentManager extends Z
           if (!isInStateForSplitting(regionState)) break;
           // If null, add SPLITTING state before going to SPLIT
           if (regionState == null) {
-            regionState = addSplittingToRIT(sn, encodedName);
+            regionState = regionStates.updateRegionState(rt,
+              RegionState.State.SPLITTING);
+
             String message = "Received SPLIT for region " + prettyPrintedRegionName +
               " from server " + sn;
             // If still null, it means we cannot find it and it was already processed
@@ -826,8 +748,8 @@ public class AssignmentManager extends Z
         case M_ZK_REGION_CLOSING:
           hri = checkIfInFailover(regionState, encodedName, regionName);
           if (hri != null) {
-            regionState = new RegionState(hri, RegionState.State.CLOSING, createTime, sn);
-            regionsInTransition.put(encodedName, regionState);
+            regionState = regionStates.updateRegionState(
+              hri, RegionState.State.CLOSING, createTime, sn);
             failoverProcessedRegions.put(encodedName, hri);
             break;
           }
@@ -842,14 +764,14 @@ public class AssignmentManager extends Z
             return;
           }
           // Transition to CLOSING (or update stamp if already CLOSING)
-          regionState.update(RegionState.State.CLOSING, createTime, sn);
+          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
           break;
 
         case RS_ZK_REGION_CLOSED:
           hri = checkIfInFailover(regionState, encodedName, regionName);
           if (hri != null) {
-            regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
-            regionsInTransition.put(encodedName, regionState);
+            regionState = regionStates.updateRegionState(
+              hri, RegionState.State.CLOSED, createTime, sn);
             removeClosedRegion(regionState.getRegion());
             new ClosedRegionHandler(master, this, regionState.getRegion())
               .process();
@@ -868,7 +790,7 @@ public class AssignmentManager extends Z
           // Handle CLOSED by assigning elsewhere or stopping if a disable
           // If we got here all is good.  Need to update RegionState -- else
           // what follows will fail because not in expected state.
-          regionState.update(RegionState.State.CLOSED, createTime, sn);
+          regionStates.updateRegionState(rt, RegionState.State.CLOSED);
           removeClosedRegion(regionState.getRegion());
           this.executorService.submit(new ClosedRegionHandler(master,
             this, regionState.getRegion()));
@@ -877,8 +799,8 @@ public class AssignmentManager extends Z
         case RS_ZK_REGION_FAILED_OPEN:
           hri = checkIfInFailover(regionState, encodedName, regionName);
           if (hri != null) {
-            regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
-            regionsInTransition.put(encodedName, regionState);
+            regionState = regionStates.updateRegionState(
+              hri, RegionState.State.CLOSED, createTime, sn);
             new ClosedRegionHandler(master, this, regionState.getRegion())
               .process();
             failoverProcessedRegions.put(encodedName, hri);
@@ -892,8 +814,8 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle this the same as if it were opened and then closed.
-          regionState.update(RegionState.State.CLOSED, createTime, sn);
-          // When there are more than one region server a new RS is selected as the 
+          regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+          // When there are more than one region server a new RS is selected as the
           // destination and the same is updated in the regionplan. (HBASE-5546)
           getRegionPlan(regionState, sn, true);
           this.executorService.submit(new ClosedRegionHandler(master,
@@ -903,8 +825,8 @@ public class AssignmentManager extends Z
         case RS_ZK_REGION_OPENING:
           hri = checkIfInFailover(regionState, encodedName, regionName);
           if (hri != null) {
-            regionState = new RegionState(hri, RegionState.State.OPENING, createTime, sn);
-            regionsInTransition.put(encodedName, regionState);
+            regionState = regionStates.updateRegionState(
+              hri, RegionState.State.OPENING, createTime, sn);
             failoverProcessedRegions.put(encodedName, hri);
             break;
           }
@@ -920,14 +842,14 @@ public class AssignmentManager extends Z
             return;
           }
           // Transition to OPENING (or update stamp if already OPENING)
-          regionState.update(RegionState.State.OPENING, createTime, sn);
+          regionStates.updateRegionState(rt, RegionState.State.OPENING);
           break;
 
         case RS_ZK_REGION_OPENED:
           hri = checkIfInFailover(regionState, encodedName, regionName);
           if (hri != null) {
-            regionState = new RegionState(hri, RegionState.State.OPEN, createTime, sn);
-            regionsInTransition.put(encodedName, regionState);
+            regionState = regionStates.updateRegionState(
+              hri, RegionState.State.OPEN, createTime, sn);
             new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process();
             failoverProcessedRegions.put(encodedName, hri);
             break;
@@ -943,7 +865,7 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle OPENED by removing from transition and deleted zk node
-          regionState.update(RegionState.State.OPEN, createTime, sn);
+          regionStates.updateRegionState(rt, RegionState.State.OPEN);
           this.executorService.submit(
             new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion));
           break;
@@ -975,7 +897,7 @@ public class AssignmentManager extends Z
     }
     return null;
   }
-  
+
   /**
    * Gets the HRegionInfo from the META table
    * @param  regionName
@@ -1019,7 +941,8 @@ public class AssignmentManager extends Z
   private boolean convertPendingCloseToSplitting(final RegionState rs) {
     if (!rs.isPendingClose()) return false;
     LOG.debug("Converting PENDING_CLOSE to SPLITING; rs=" + rs);
-    rs.update(RegionState.State.SPLITTING);
+    regionStates.updateRegionState(
+      rs.getRegion(), RegionState.State.SPLITTING);
     // Clean up existing state.  Clear from region plans seems all we
     // have to do here by way of clean up of PENDING_CLOSE.
     clearRegionPlan(rs.getRegion());
@@ -1027,63 +950,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * @param serverName
-   * @param encodedName
-   * @return The SPLITTING RegionState we added to RIT for the passed region
-   * <code>encodedName</code>
-   */
-  private RegionState addSplittingToRIT(final ServerName serverName,
-      final String encodedName) {
-    RegionState regionState = null;
-    synchronized (this.regions) {
-      regionState = findHRegionInfoThenAddToRIT(serverName, encodedName);
-      if (regionState != null) {
-        regionState.update(RegionState.State.SPLITTING,
-          System.currentTimeMillis(), serverName);
-      }
-    }
-    return regionState;
-  }
-
-  /**
-   * Caller must hold lock on <code>this.regions</code>.
-   * @param serverName
-   * @param encodedName
-   * @return The instance of RegionState that was added to RIT or null if error.
-   */
-  private RegionState findHRegionInfoThenAddToRIT(final ServerName serverName,
-      final String encodedName) {
-    HRegionInfo hri = findHRegionInfo(serverName, encodedName);
-    if (hri == null) {
-      LOG.warn("Region " + encodedName + " not found on server " + serverName +
-        "; failed processing");
-      return null;
-    }
-    // Add to regions in transition, then update state to SPLITTING.
-    return addToRegionsInTransition(hri);
-  }
-
-  /**
-   * Caller must hold lock on <code>this.regions</code>.
-   * @param serverName
-   * @param encodedName
-   * @return Found HRegionInfo or null.
-   */
-  private HRegionInfo findHRegionInfo(final ServerName sn,
-      final String encodedName) {
-    if (!this.serverManager.isServerOnline(sn)) return null;
-    Set<HRegionInfo> hris = this.servers.get(sn);
-    HRegionInfo foundHri = null;
-    for (HRegionInfo hri: hris) {
-      if (hri.getEncodedName().equals(encodedName)) {
-        foundHri = hri;
-        break;
-      }
-    }
-    return foundHri;
-  }
-
-  /**
    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
    * <p>
    * This is handled in a separate code path because it breaks the normal rules.
@@ -1094,7 +960,7 @@ public class AssignmentManager extends Z
     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
       ", server=" + rt.getServerName() + ", region=" +
       HRegionInfo.prettyPrint(encodedName));
-    RegionState regionState = regionsInTransition.get(encodedName);
+    RegionState regionState = regionStates.getRegionTransitionState(encodedName);
     switch (rt.getEventType()) {
       case M_ZK_REGION_OFFLINE:
         HRegionInfo regionInfo = null;
@@ -1178,9 +1044,7 @@ public class AssignmentManager extends Z
   public void nodeDeleted(final String path) {
     if (path.startsWith(this.watcher.assignmentZNode)) {
       String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
-      // no lock concurrency ok: sequentially consistent if someone adds/removes the region in
-      //  the same time
-        RegionState rs = this.regionsInTransition.get(regionName);
+      RegionState rs = regionStates.getRegionTransitionState(regionName);
       if (rs != null) {
         HRegionInfo regionInfo = rs.getRegion();
         if (rs.isSplit()) {
@@ -1191,26 +1055,22 @@ public class AssignmentManager extends Z
           LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
               + " has been deleted.");
           if (rs.isOpened()) {
-            makeRegionOnline(rs, regionInfo);
+            ServerName serverName = rs.getServerName();
+            regionOnline(regionInfo, serverName);
+            LOG.info("The master has opened the region "
+              + regionInfo.getRegionNameAsString() + " that was online on "
+              + serverName);
+            if (this.getZKTable().isDisablingOrDisabledTable(
+                regionInfo.getTableNameAsString())) {
+              LOG.debug("Opened region "
+                  + regionInfo.getRegionNameAsString() + " but "
+                  + "this table is disabled, triggering close of region");
+              unassign(regionInfo);
+            }
           }
         }
       }
     }
-
-  }
-
-  private void makeRegionOnline(RegionState rs, HRegionInfo regionInfo) {
-    regionOnline(regionInfo, rs.serverName);
-    LOG.info("The master has opened the region "
-        + regionInfo.getRegionNameAsString() + " that was online on "
-        + rs.serverName);
-    if (this.getZKTable().isDisablingOrDisabledTable(
-        regionInfo.getTableNameAsString())) {
-      LOG.debug("Opened region "
-          + regionInfo.getRegionNameAsString() + " but "
-          + "this table is disabled, triggering close of region");
-      unassign(regionInfo);
-    }
   }
 
   /**
@@ -1247,24 +1107,13 @@ public class AssignmentManager extends Z
    * @param sn
    */
   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-    // no lock concurrency ok.
-    this.regionsInTransition.remove(regionInfo.getEncodedName());
-
-    synchronized (this.regions) {
-      // Add check
-      ServerName oldSn = this.regions.get(regionInfo);
-      if (oldSn != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
-        " on " + oldSn + " with " + sn);
-      
-      if (isServerOnline(sn)) {
-        this.regions.put(regionInfo, sn);
-        addToServers(sn, regionInfo);
-        this.regions.notifyAll();
-      } else {
-        LOG.info("The server is not in online servers, ServerName=" + 
-          sn.getServerName() + ", region=" + regionInfo.getEncodedName());
-      }
+    if (!isServerOnline(sn)) {
+      LOG.warn("A region was opened on a dead server, ServerName=" +
+        sn.getServerName() + ", region=" + regionInfo.getEncodedName());
     }
+
+    regionStates.regionOnline(regionInfo, sn);
+
     // Remove plan if one.
     clearRegionPlan(regionInfo);
     // Add the server to serversInUpdatingTimer
@@ -1307,11 +1156,9 @@ public class AssignmentManager extends Z
 
     for (Map.Entry<String, RegionPlan> e : rps) {
       if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
-        RegionState rs = this.regionsInTransition.get(e.getKey());
-        if (rs != null) {
-          // no lock concurrency ok: there is a write when we update the timestamp but it's ok
-          //  as it's an AtomicLong
-          rs.updateTimestampToNow();
+        RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
+        if (regionState != null) {
+          regionState.updateTimestampToNow();
         }
       }
     }
@@ -1325,30 +1172,10 @@ public class AssignmentManager extends Z
    * @param regionInfo
    */
   public void regionOffline(final HRegionInfo regionInfo) {
-    // no lock concurrency ok
-    this.regionsInTransition.remove(regionInfo.getEncodedName());
+    regionStates.regionOffline(regionInfo);
 
     // remove the region plan as well just in case.
     clearRegionPlan(regionInfo);
-    setOffline(regionInfo);
-  }
-
-  /**
-   * Sets the region as offline by removing in-memory assignment information but
-   * retaining transition information.
-   * <p>
-   * Used when a region has been closed but should be reassigned.
-   * @param regionInfo
-   */
-  public void setOffline(HRegionInfo regionInfo) {
-    synchronized (this.regions) {
-      ServerName sn = this.regions.remove(regionInfo);
-      if (sn == null) return;
-      Set<HRegionInfo> serverRegions = this.servers.get(sn);
-      if (!serverRegions.remove(regionInfo)) {
-        LOG.warn("No " + regionInfo + " on " + sn);
-      }
-    }
   }
 
   public void offlineDisabledRegion(HRegionInfo regionInfo) {
@@ -1417,10 +1244,18 @@ public class AssignmentManager extends Z
         region.getRegionNameAsString());
       return;
     }
-    RegionState state = addToRegionsInTransition(region,
+    RegionState state = forceRegionStateToOffline(region,
         hijack);
-    synchronized (state) {
+    // TODO: we can't synchronized on state any more since it could
+    // be an new instance.  We need to reconsider how to avoid
+    // double/multiple assignments.
+    // This is to prevent double assignments? Does it work?
+    String encodedName = region.getEncodedName();
+    Lock lock = assignLocker.acquireLock(encodedName);
+    try {
       assign(region, state, setOfflineInZK, forceNewPlan, hijack);
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -1455,7 +1290,7 @@ public class AssignmentManager extends Z
     // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
     AtomicInteger counter = new AtomicInteger(0);
     CreateUnassignedAsyncCallback cb =
-      new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
+      new CreateUnassignedAsyncCallback(regionStates, this.watcher, destination, counter);
     for (RegionState state: states) {
       if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
         return false;
@@ -1579,12 +1414,15 @@ public class AssignmentManager extends Z
    */
   static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
     private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
+    private final RegionStates regionStates;
     private final ZooKeeperWatcher zkw;
     private final ServerName destination;
     private final AtomicInteger counter;
 
-    CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
-        final ServerName destination, final AtomicInteger counter) {
+    CreateUnassignedAsyncCallback(final RegionStates regionStates,
+        final ZooKeeperWatcher zkw, final ServerName destination,
+        final AtomicInteger counter) {
+      this.regionStates = regionStates;
       this.zkw = zkw;
       this.destination = destination;
       this.counter = counter;
@@ -1604,7 +1442,7 @@ public class AssignmentManager extends Z
       // Async exists to set a watcher so we'll get triggered when
       // unassigned node changes.
       this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
-        new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
+        new ExistsUnassignedAsyncCallback(regionStates, counter, destination), ctx);
     }
   }
 
@@ -1614,10 +1452,13 @@ public class AssignmentManager extends Z
    */
   static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
     private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
+    private final RegionStates regionStates;
     private final AtomicInteger counter;
     private ServerName destination;
 
-    ExistsUnassignedAsyncCallback(final AtomicInteger counter, ServerName destination) {
+    ExistsUnassignedAsyncCallback(final RegionStates regionStates,
+        final AtomicInteger counter, ServerName destination) {
+      this.regionStates = regionStates;
       this.counter = counter;
       this.destination = destination;
     }
@@ -1625,7 +1466,7 @@ public class AssignmentManager extends Z
     @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
       if (rc != 0) {
-        // Thisis resultcode.  If non-zero, need to resubmit.
+        // This is result code.  If non-zero, need to resubmit.
         LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
           "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
         return;
@@ -1637,28 +1478,14 @@ public class AssignmentManager extends Z
       // yet sent out the actual open but putting this state change after the
       // call to open risks our writing PENDING_OPEN after state has been moved
       // to OPENING by the regionserver.
-      state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), destination);
+      regionStates.updateRegionState(state.getRegion(),
+        RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
+        destination);
       this.counter.addAndGet(1);
     }
   }
 
   /**
-   * @param region
-   * @return The current RegionState
-   */
-  private RegionState addToRegionsInTransition(final HRegionInfo region) {
-    return addToRegionsInTransition(region, false);
-  }
-  /**
-   * @param region
-   * @param hijack
-   * @return The current RegionState
-   */
-  private RegionState addToRegionsInTransition(final HRegionInfo region,
-      boolean hijack) {
-    return forceRegionStateToOffline(region, hijack);
-  }
-  /**
    * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
    * @param region
    * @return Amended RegionState.
@@ -1679,10 +1506,10 @@ public class AssignmentManager extends Z
 
     Lock lock = locker.acquireLock(encodedName);
     try {
-      RegionState state = this.regionsInTransition.get(encodedName);
+      RegionState state = regionStates.getRegionTransitionState(encodedName);
       if (state == null) {
-        state = new RegionState(region, RegionState.State.OFFLINE);
-        this.regionsInTransition.put(encodedName, state);
+        state = regionStates.updateRegionState(
+          region, RegionState.State.OFFLINE);
       } else {
         // If we are reassigning the node do not force in-memory state to OFFLINE.
         // Based on the znode state we will decide if to change in-memory state to
@@ -1692,7 +1519,8 @@ public class AssignmentManager extends Z
         // assign on its tail as part of the handling of a region close.
         if (!hijack) {
           LOG.debug("Forcing OFFLINE; was=" + state);
-          state.update(RegionState.State.OFFLINE);
+          state = regionStates.updateRegionState(
+            region, RegionState.State.OFFLINE);
         }
       }
       return state;
@@ -1751,8 +1579,9 @@ public class AssignmentManager extends Z
         LOG.info("Assigning region " + state.getRegion().getRegionNameAsString() +
           " to " + plan.getDestination().toString());
         // Transition RegionState to PENDING_OPEN
-        state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
-            plan.getDestination());
+        regionStates.updateRegionState(state.getRegion(),
+          RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
+          plan.getDestination());
         // Send OPEN RPC. This can fail if the server on other end is is not up.
         // Pass the version that was obtained while setting the node to OFFLINE.
         RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
@@ -1781,7 +1610,8 @@ public class AssignmentManager extends Z
         // Clean out plan we failed execute and one that doesn't look like it'll
         // succeed anyways; we need a new plan!
         // Transition back to OFFLINE
-        state.update(RegionState.State.OFFLINE);
+        regionStates.updateRegionState(
+          state.getRegion(), RegionState.State.OFFLINE);
         // Force a new plan and reassign.  Will return null if no servers.
         if (getRegionPlan(state, plan.getDestination(), true) == null) {
           this.timeoutMonitor.setAllRegionServersOffline(true);
@@ -1794,7 +1624,6 @@ public class AssignmentManager extends Z
   }
 
   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
-
     // Remove region from in-memory transition and unassigned node from ZK
     // While trying to enable the table the regions of the table were
     // already enabled.
@@ -1813,14 +1642,8 @@ public class AssignmentManager extends Z
           "Error deleting OFFLINED node in ZK for transition ZK node ("
               + encodedRegionName + ")", e);
     }
-    // no lock concurrent ok -> sequentially consistent
-    this.regionsInTransition.remove(region.getEncodedName());
-
-    synchronized (this.regions) {
-      this.regions.put(region, sn);
-      addToServers(sn, region);
-    }
 
+    regionStates.regionOnline(region, sn);
   }
 
   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
@@ -1864,10 +1687,12 @@ public class AssignmentManager extends Z
     if (hijack &&
         (state.getState().equals(RegionState.State.PENDING_OPEN) || 
             state.getState().equals(RegionState.State.OPENING))) {
-      state.update(RegionState.State.PENDING_OPEN);
+      regionStates.updateRegionState(state.getRegion(),
+        RegionState.State.PENDING_OPEN);
       allowZNodeCreation = false;
     } else {
-      state.update(RegionState.State.OFFLINE);
+      regionStates.updateRegionState(state.getRegion(),
+        RegionState.State.OFFLINE);
       allowZNodeCreation = true;
     }
     int versionOfOfflineNode = -1;
@@ -1901,7 +1726,8 @@ public class AssignmentManager extends Z
         new IllegalStateException());
       return false;
     }
-    state.update(RegionState.State.OFFLINE);
+    regionStates.updateRegionState(
+      state.getRegion(), RegionState.State.OFFLINE);
     try {
       ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
         this.master.getServerName(), cb, ctx);
@@ -2009,10 +1835,10 @@ public class AssignmentManager extends Z
     int waitTime = this.master.getConfiguration().getInt(
         "hbase.bulk.waitbetween.reopen", 0);
     for (HRegionInfo region : regions) {
-      if (isRegionInTransition(region) != null)
+      if (regionStates.isRegionInTransition(region))
         continue;
       unassign(region, false);
-      while (isRegionInTransition(region) != null) {
+      while (regionStates.isRegionInTransition(region)) {
         try {
           Thread.sleep(10);
         } catch (InterruptedException e) {
@@ -2065,24 +1891,21 @@ public class AssignmentManager extends Z
     LOG.debug("Starting unassignment of region " +
       region.getRegionNameAsString() + " (offlining)");
 
-    synchronized (this.regions) {
-      // Check if this region is currently assigned
-      if (!regions.containsKey(region)) {
-        LOG.debug("Attempted to unassign region " +
-          region.getRegionNameAsString() + " but it is not " +
-          "currently assigned anywhere");
-        return;
-      }
+    // Check if this region is currently assigned
+    if (!regionStates.isRegionAssigned(region)) {
+      LOG.debug("Attempted to unassign region " +
+        region.getRegionNameAsString() + " but it is not " +
+        "currently assigned anywhere");
+      return;
     }
     String encodedName = region.getEncodedName();
     // Grab the state of this region and synchronize on it
-    RegionState state;
     int versionOfClosingNode = -1;
     // We need a lock here as we're going to do a put later and we don't want multiple states
     //  creation
     ReentrantLock lock = locker.acquireLock(encodedName);
+    RegionState state = regionStates.getRegionTransitionState(encodedName);
     try {
-      state = regionsInTransition.get(encodedName);
       if (state == null) {
         // Create the znode in CLOSING state
         try {
@@ -2124,13 +1947,12 @@ public class AssignmentManager extends Z
           master.abort("Unexpected ZK exception creating node CLOSING", e);
           return;
         }
-        state = new RegionState(region, RegionState.State.PENDING_CLOSE);
-        regionsInTransition.put(encodedName, state);
+        state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
       } else if (force && (state.isPendingClose() || state.isClosing())) {
         LOG.debug("Attempting to unassign region " + region.getRegionNameAsString() + 
           " which is already " + state.getState()  + 
           " but forcing to send a CLOSE RPC again ");
-        state.update(state.getState());
+        state.updateTimestampToNow();
       } else {
         LOG.debug("Attempting to unassign region " +
           region.getRegionNameAsString() + " but it is " +
@@ -2142,27 +1964,9 @@ public class AssignmentManager extends Z
     }
 
     // Send CLOSE RPC
-    ServerName server = null;
-    synchronized (this.regions) {
-      server = regions.get(region);
-    }
+    ServerName server = state.getServerName();
     // ClosedRegionhandler can remove the server from this.regions
     if (server == null) {
-      // Possibility of disable flow removing from RIT.
-      lock = locker.acquireLock(encodedName);
-      try {
-        state = regionsInTransition.get(encodedName);
-        if (state != null) {
-          // remove only if the state is PENDING_CLOSE or CLOSING
-          RegionState.State presentState = state.getState();
-          if (presentState == RegionState.State.PENDING_CLOSE
-              || presentState == RegionState.State.CLOSING) {
-            this.regionsInTransition.remove(encodedName);
-          }
-        }
-      } finally {
-        lock.unlock();
-      }
       // delete the node. if no node exists need not bother.
       deleteClosingOrClosedNode(region);
       return;
@@ -2194,22 +1998,13 @@ public class AssignmentManager extends Z
               + region.getTableNameAsString()
               + " to DISABLED state the region " + region
               + " was offlined but the table was in DISABLING state");
-            this.regionsInTransition.remove(region.getEncodedName());
-
-          // Remove from the regionsMap
-          synchronized (this.regions) {
-            this.regions.remove(region);
-            Set<HRegionInfo> serverRegions = this.servers.get(server);
-            if (!serverRegions.remove(region)) {
-              LOG.warn("No " + region + " on " + server);
-            }
-          }
+          regionStates.regionOffline(region);
           deleteClosingOrClosedNode(region);
         }
       } else if (t instanceof RegionAlreadyInTransitionException) {
         // RS is already processing this region, only need to update the timestamp
         LOG.debug("update " + state + " the timestamp.");
-        state.update(state.getState());
+        state.updateTimestampToNow();
       }
       LOG.info("Server " + server + " returned " + t + " for " +
         region.getRegionNameAsString(), t);
@@ -2284,13 +2079,12 @@ public class AssignmentManager extends Z
    */
   public void waitForAssignment(HRegionInfo regionInfo)
   throws InterruptedException {
-    synchronized(regions) {
-      while(!this.master.isStopped() && !regions.containsKey(regionInfo)) {
-        // We should receive a notification, but it's
-        //  better to have a timeout to recheck the condition here:
-        //  it lowers the impact of a race condition if any
-        regions.wait(100);
-      }
+    while(!this.master.isStopped() &&
+        !regionStates.isRegionAssigned(regionInfo)) {
+      // We should receive a notification, but it's
+      //  better to have a timeout to recheck the condition here:
+      //  it lowers the impact of a race condition if any
+      regionStates.waitForUpdate(100);
     }
   }
 
@@ -2556,12 +2350,12 @@ public class AssignmentManager extends Z
     // state of the Master.
     final long endTime = System.currentTimeMillis() + timeout;
 
-    while (!this.master.isStopped() && !regionsInTransition.isEmpty() &&
-      endTime > System.currentTimeMillis()) {
-      regionsInTransition.waitForUpdate(100);
+    while (!this.master.isStopped() && regionStates.isRegionsInTransition()
+        && endTime > System.currentTimeMillis()) {
+      regionStates.waitForUpdate(100);
     }
 
-    return regionsInTransition.isEmpty();
+    return !regionStates.isRegionsInTransition();
   }
 
   /**
@@ -2580,13 +2374,13 @@ public class AssignmentManager extends Z
       Iterator<HRegionInfo> regionInfoIterator = regions.iterator();
       while (regionInfoIterator.hasNext()) {
         HRegionInfo hri = regionInfoIterator.next();
-        if (!regionsInTransition.containsKey(hri.getEncodedName())) {
+        if (!regionStates.isRegionInTransition(hri)) {
           regionInfoIterator.remove();
         }
       }
 
       if (!regions.isEmpty()) {
-        regionsInTransition.waitForUpdate(100);
+        regionStates.waitForUpdate(100);
       }
     }
 
@@ -2620,6 +2414,7 @@ public class AssignmentManager extends Z
       HRegionInfo regionInfo = region.getFirst();
       ServerName regionLocation = region.getSecond();
       if (regionInfo == null) continue;
+      regionStates.createRegionState(regionInfo);
       String tableName = regionInfo.getTableNameAsString();
       if (regionLocation == null) {
         // regionLocation could be null if createTable didn't finish properly.
@@ -2664,19 +2459,16 @@ public class AssignmentManager extends Z
           byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
           // If znode does not exist dont consider this region
           if (data == null) {
-			LOG.debug("Region "	+  regionInfo.getRegionNameAsString()
-					  + " split is completed. Hence need not add to regions list");  
+            LOG.debug("Region "	+  regionInfo.getRegionNameAsString()
+               + " split is completed. Hence need not add to regions list");
             continue;
           }
         }
         // Region is being served and on an active server
         // add only if region not in disabled and enabling table
-        if (false == checkIfRegionBelongsToDisabled(regionInfo)
-            && false == checkIfRegionsBelongsToEnabling(regionInfo)) {
-          synchronized (this.regions) {            
-            regions.put(regionInfo, regionLocation);
-            addToServers(regionLocation, regionInfo);
-          }
+        if (!checkIfRegionBelongsToDisabled(regionInfo)
+            && !checkIfRegionsBelongsToEnabling(regionInfo)) {
+          regionStates.regionOnline(regionInfo, regionLocation);
         }
         disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
             this.enablingTables, regionInfo, tableName);
@@ -2876,32 +2668,6 @@ public class AssignmentManager extends Z
     }
   }
 
-  /*
-   * Presumes caller has taken care of necessary locking modifying servers Map.
-   * @param hsi
-   * @param hri
-   */
-  private void addToServers(final ServerName sn, final HRegionInfo hri) {
-    Set<HRegionInfo> hris = servers.get(sn);
-    if (hris == null) {
-      hris = new ConcurrentSkipListSet<HRegionInfo>();
-      servers.put(sn, hris);
-    }
-    if (!hris.contains(hri)) hris.add(hri);
-  }
-
-  /**
-   * @return A copy of the Map of regions currently in transition.
-   */
-  public NavigableMap<String, RegionState> copyRegionsInTransition() {
-    // no lock concurrent access ok
-    return regionsInTransition.copyMap();
-  }
-
-  NotifiableConcurrentSkipListMap<String, RegionState> getRegionsInTransition() {
-    return regionsInTransition;
-  }
-
   /**
    * Set Regions in transitions metrics.
    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
@@ -2916,9 +2682,9 @@ public class AssignmentManager extends Z
     long oldestRITTime = 0;
     int ritThreshold = this.master.getConfiguration().
       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
-    for (Map.Entry<String, RegionState> e : this.regionsInTransition.copyEntrySet()) {
+    for (RegionState state: regionStates.getRegionsInTransition().values()) {
       totalRITs++;
-      long ritTime = currentTime - e.getValue().getStamp();
+      long ritTime = currentTime - state.getStamp();
       if (ritTime > ritThreshold) { // more than the threshold
         totalRITsOverThreshold++;
       }
@@ -2934,26 +2700,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * @return True if regions in transition.
-   */
-  public boolean isRegionsInTransition() {
-    // no lock concurrent access ok: we could imagine that someone is currently going to remove
-    //  it or add a region, but it's sequentially consistent.
-    return !(this.regionsInTransition.isEmpty());
-  }
-
-  /**
-   * @param hri Region to check.
-   * @return Returns null if passed region is not in transition else the current
-   * RegionState
-   */
-  public RegionState isRegionInTransition(final HRegionInfo hri) {
-    // no lock concurrent access ok: we could imagine that someone is currently going to remove
-    //  it or add it, but it's sequentially consistent.
-    return this.regionsInTransition.get(hri.getEncodedName());
-  }
-
-  /**
    * @param region Region whose plan we are to clear.
    */
   void clearRegionPlan(final HRegionInfo region) {
@@ -2968,14 +2714,14 @@ public class AssignmentManager extends Z
    * @throws IOException
    */
   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
-  throws IOException {
-    if (isRegionInTransition(hri) == null) return;
+      throws IOException, InterruptedException {
+    if (!regionStates.isRegionInTransition(hri)) return;
     RegionState rs = null;
     // There is already a timeout monitor on regions in transition so I
     // should not have to have one here too?
-    while(!this.master.isStopped() && (rs = isRegionInTransition(hri)) != null) {
-      Threads.sleep(1000);
+    while(!this.master.isStopped() && regionStates.isRegionInTransition(hri)) {
       LOG.info("Waiting on " + rs + " to clear regions-in-transition");
+      regionStates.waitForUpdate(1000);
     }
     if (this.master.isStopped()) {
       LOG.info("Giving up wait on regions in " +
@@ -2983,35 +2729,6 @@ public class AssignmentManager extends Z
     }
   }
 
-
-  /**
-   * Gets the online regions of the specified table.
-   * This method looks at the in-memory state.  It does not go to <code>.META.</code>.
-   * Only returns <em>online</em> regions.  If a region on this table has been
-   * closed during a disable, etc., it will be included in the returned list.
-   * So, the returned list may not necessarily be ALL regions in this table, its
-   * all the ONLINE regions in the table.
-   * @param tableName
-   * @return Online regions from <code>tableName</code>
-   */
-  public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
-    List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
-    // boundary needs to have table's name but regionID 0 so that it is sorted 
-    // before all table's regions.
-    HRegionInfo boundary =
-      new HRegionInfo(tableName, null, null, false, 0L);
-    synchronized (this.regions) {
-      for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
-        if(Bytes.equals(regionInfo.getTableName(), tableName)) {
-          tableRegions.add(regionInfo);
-        } else {
-          break;
-        }
-      }
-    }
-    return tableRegions;
-  }
-
   /**
    * Update timers for all regions in transition going against the server in the
    * serversInUpdatingTimer.
@@ -3093,7 +2810,7 @@ public class AssignmentManager extends Z
       long now = System.currentTimeMillis();
       // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
       //  a copy while another thread is adding/removing items
-      for (RegionState regionState : regionsInTransition.copyValues()) {
+      for (RegionState regionState : regionStates.getRegionsInTransition().values()) {
         if (regionState.getStamp() + timeout <= now ||
           (this.allRegionServersOffline && !noRSAvailable)) {
           //decide on action upon timeout or, if some RSs just came back online, we can start the
@@ -3132,8 +2849,15 @@ public class AssignmentManager extends Z
       case OPEN:
         LOG.error("Region has been OPEN for too long, " +
             "we don't know where region was opened so can't do anything");
-        synchronized (regionState) {
+        // TODO: do we need synchronization here?
+        // could not synchronized on regionState since it can be
+        // an new instance
+        String encodedName = regionState.getRegion().getEncodedName();
+        Lock lock = assignLocker.acquireLock(encodedName);
+        try {
           regionState.updateTimestampToNow();
+        } finally {
+          lock.unlock();
         }
         break;
 
@@ -3242,7 +2966,7 @@ public class AssignmentManager extends Z
       return matchZK;
     }
 
-    ServerName addressFromAM = getRegionServerOfRegion(hri);
+    ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
     boolean matchAM = (addressFromAM != null &&
       addressFromAM.equals(serverName));
     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
@@ -3271,33 +2995,7 @@ public class AssignmentManager extends Z
         }
       }
     }
-    // TODO: Do we want to sync on RIT here?
-    // Remove this server from map of servers to regions, and remove all regions
-    // of this server from online map of regions.
-    Set<HRegionInfo> deadRegions = null;
-    List<RegionState> rits = new ArrayList<RegionState>();
-    synchronized (this.regions) {
-      Set<HRegionInfo> assignedRegions = this.servers.remove(sn);
-      if (assignedRegions == null || assignedRegions.isEmpty()) {
-        // No regions on this server, we are done, return empty list of RITs
-        return rits;
-      }
-      deadRegions = new TreeSet<HRegionInfo>(assignedRegions);
-      for (HRegionInfo region : deadRegions) {
-        this.regions.remove(region);
-      }
-    }
-    // See if any of the regions that were online on this server were in RIT
-    // If they are, normal timeouts will deal with them appropriately so
-    // let's skip a manual re-assignment.
-    // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
-    //  a copy while another thread is adding/removing items
-    for (RegionState region : this.regionsInTransition.copyValues()) {
-      if (deadRegions.remove(region.getRegion())) {
-        rits.add(region);
-      }
-    }
-    return rits;
+    return regionStates.serverOffline(sn);
   }
 
   /**
@@ -3325,92 +3023,6 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
-   * Can't let out original since it can change and at least the loadbalancer
-   * wants to iterate this exported list.  We need to synchronize on regions
-   * since all access to this.servers is under a lock on this.regions.
-   * 
-   * @return A clone of current assignments by table.
-   */
-  Map<String, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
-    Map<String, Map<ServerName, List<HRegionInfo>>> result = null;
-    synchronized (this.regions) {
-      result = new HashMap<String, Map<ServerName,List<HRegionInfo>>>();
-      if (!this.master.getConfiguration().
-          getBoolean("hbase.master.loadbalance.bytable", true)) {
-        result.put("ensemble", getAssignments());
-      } else {
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
-          for (HRegionInfo hri : e.getValue()) {
-            if (hri.isMetaRegion() || hri.isRootRegion()) continue;
-            String tablename = hri.getTableNameAsString();
-            Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
-            if (svrToRegions == null) {
-              svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
-              result.put(tablename, svrToRegions);
-            }
-            List<HRegionInfo> regions = null;
-            if (!svrToRegions.containsKey(e.getKey())) {
-              regions = new ArrayList<HRegionInfo>();
-              svrToRegions.put(e.getKey(), regions);
-            } else {
-              regions = svrToRegions.get(e.getKey());
-            }
-            regions.add(hri);
-          }
-        }
-      }
-    }
-    Map<ServerName, ServerLoad> onlineSvrs = this.serverManager.getOnlineServers();
-    // Take care of servers w/o assignments.
-    for (Map<ServerName,List<HRegionInfo>> map : result.values()) {
-      for (Map.Entry<ServerName, ServerLoad> svrEntry: onlineSvrs.entrySet()) {
-        if (!map.containsKey(svrEntry.getKey())) {
-          map.put(svrEntry.getKey(), new ArrayList<HRegionInfo>());
-        }
-      }
-    }
-    return result;
-  }
-  
-  /**
-   * @return A clone of current assignments. Note, this is assignments only.
-   * If a new server has come in and it has no regions, it will not be included
-   * in the returned Map.
-   */
-  Map<ServerName, List<HRegionInfo>> getAssignments() {
-    // This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
-    // Can't let out original since it can change and at least the loadbalancer
-    // wants to iterate this exported list.  We need to synchronize on regions
-    // since all access to this.servers is under a lock on this.regions.
-    Map<ServerName, List<HRegionInfo>> result = null;
-    synchronized (this.regions) {
-      result = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
-      for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
-        result.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
-      }
-    }
-    return result;
-  }
-
-  /**
-   * @param encodedRegionName Region encoded name.
-   * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
-   * and the hosting servers {@link ServerName}.
-   */
-  Pair<HRegionInfo, ServerName> getAssignment(final byte [] encodedRegionName) {
-    String name = Bytes.toString(encodedRegionName);
-    synchronized(this.regions) {
-      for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
-        if (e.getKey().getEncodedName().equals(name)) {
-          return new Pair<HRegionInfo, ServerName>(e.getKey(), e.getValue());
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
    * @param plan Plan to execute.
    */
   void balance(final RegionPlan plan) {
@@ -3420,265 +3032,6 @@ public class AssignmentManager extends Z
     unassign(plan.getRegionInfo(), false, plan.getDestination());
   }
 
-  /**
-   * Run through remaining regionservers and unassign all catalog regions.
-   */
-  void unassignCatalogRegions() {
-    synchronized (this.regions) {
-      for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
-        Set<HRegionInfo> regions = e.getValue();
-        if (regions == null || regions.isEmpty()) continue;
-        for (HRegionInfo hri: regions) {
-          if (hri.isMetaRegion()) {
-            unassign(hri);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * State of a Region while undergoing transitions.
-   */
-  public static class RegionState implements org.apache.hadoop.io.Writable {
-    private HRegionInfo region;
-
-    public enum State {
-      OFFLINE,        // region is in an offline state
-      PENDING_OPEN,   // sent rpc to server to open but has not begun
-      OPENING,        // server has begun to open but not yet done
-      OPEN,           // server opened region and updated meta
-      PENDING_CLOSE,  // sent rpc to server to close but has not begun
-      CLOSING,        // server has begun to close but not yet done
-      CLOSED,         // server closed region and updated meta
-      SPLITTING,      // server started split of a region
-      SPLIT           // server completed split of a region
-    }
-
-    private State state;
-    // Many threads can update the state at the stamp at the same time
-    private final AtomicLong stamp;
-    private ServerName serverName;
-
-    public RegionState() {
-      this.stamp = new AtomicLong(System.currentTimeMillis());
-    }
-
-    RegionState(HRegionInfo region, State state) {
-      this(region, state, System.currentTimeMillis(), null);
-    }
-
-    RegionState(HRegionInfo region, State state, long stamp, ServerName serverName) {
-      this.region = region;
-      this.state = state;
-      this.stamp = new AtomicLong(stamp);
-      this.serverName = serverName;
-    }
-
-    public void update(State state, long stamp, ServerName serverName) {
-      this.state = state;
-      updateTimestamp(stamp);
-      this.serverName = serverName;
-    }
-
-    public void update(State state) {
-      this.state = state;
-      updateTimestampToNow();
-      this.serverName = null;
-    }
-
-    public void updateTimestamp(long stamp) {
-      this.stamp.set(stamp);
-    }
-
-    public void updateTimestampToNow() {
-      this.stamp.set(System.currentTimeMillis());
-    }
-
-    public State getState() {
-      return state;
-    }
-
-    public long getStamp() {
-      return stamp.get();
-    }
-
-    public HRegionInfo getRegion() {
-      return region;
-    }
-
-    public ServerName getServerName() {
-      return serverName;
-    }
-
-    public boolean isClosing() {
-      return state == State.CLOSING;
-    }
-
-    public boolean isClosed() {
-      return state == State.CLOSED;
-    }
-
-    public boolean isPendingClose() {
-      return state == State.PENDING_CLOSE;
-    }
-
-    public boolean isOpening() {
-      return state == State.OPENING;
-    }
-
-    public boolean isOpened() {
-      return state == State.OPEN;
-    }
-
-    public boolean isPendingOpen() {
-      return state == State.PENDING_OPEN;
-    }
-
-    public boolean isOffline() {
-      return state == State.OFFLINE;
-    }
-
-    public boolean isSplitting() {
-      return state == State.SPLITTING;
-    }
- 
-    public boolean isSplit() {
-      return state == State.SPLIT;
-    }
-
-    @Override
-    public String toString() {
-      return region.getRegionNameAsString()
-        + " state=" + state
-        + ", ts=" + stamp
-        + ", server=" + serverName;
-    }
-
-    /**
-     * A slower (but more easy-to-read) stringification 
-     */
-    public String toDescriptiveString() {
-      long lstamp = stamp.get();
-      long relTime = System.currentTimeMillis() - lstamp;
-      
-      return region.getRegionNameAsString()
-        + " state=" + state
-        + ", ts=" + new Date(lstamp) + " (" + (relTime/1000) + "s ago)"
-        + ", server=" + serverName;
-    }
-
-    /**
-     * Convert a RegionState to an HBaseProtos.RegionState
-     *
-     * @return the converted HBaseProtos.RegionState
-     */
-    public ClusterStatusProtos.RegionState convert() {
-      ClusterStatusProtos.RegionState.Builder regionState = ClusterStatusProtos.RegionState.newBuilder();
-      ClusterStatusProtos.RegionState.State rs;
-      switch (regionState.getState()) {
-      case OFFLINE:
-        rs = ClusterStatusProtos.RegionState.State.OFFLINE;
-        break;
-      case PENDING_OPEN:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
-        break;
-      case OPENING:
-        rs = ClusterStatusProtos.RegionState.State.OPENING;
-        break;
-      case OPEN:
-        rs = ClusterStatusProtos.RegionState.State.OPEN;
-        break;
-      case PENDING_CLOSE:
-        rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
-    	break;
-      case CLOSING:
-        rs = ClusterStatusProtos.RegionState.State.CLOSING;
-        break;
-      case CLOSED:
-        rs = ClusterStatusProtos.RegionState.State.CLOSED;
-        break;
-      case SPLITTING:
-        rs = ClusterStatusProtos.RegionState.State.SPLITTING;
-        break;
-      case SPLIT:
-        rs = ClusterStatusProtos.RegionState.State.SPLIT;
-        break;
-      default:
-        throw new IllegalStateException("");
-      }
-      regionState.setRegionInfo(HRegionInfo.convert(region));
-      regionState.setState(rs);
-      regionState.setStamp(getStamp());
-      return regionState.build();
-    }
-
-    /**
-     * Convert a protobuf HBaseProtos.RegionState to a RegionState
-     *
-     * @return the RegionState
-     */
-    public static RegionState convert(ClusterStatusProtos.RegionState proto) {
-      RegionState.State state;
-      switch (proto.getState()) {
-      case OFFLINE:
-        state = State.OFFLINE;
-        break;
-      case PENDING_OPEN:
-        state = State.PENDING_OPEN;
-        break;
-      case OPENING:
-        state = State.OPENING;
-        break;
-      case OPEN:
-        state = State.OPEN;
-        break;
-      case PENDING_CLOSE:
-        state = State.PENDING_CLOSE;
-    	break;
-      case CLOSING:
-        state = State.CLOSING;
-        break;
-      case CLOSED:
-        state = State.CLOSED;
-        break;
-      case SPLITTING:
-        state = State.SPLITTING;
-        break;
-      case SPLIT:
-        state = State.SPLIT;
-        break;
-      default:
-        throw new IllegalStateException("");
-      }
-
-      return new RegionState(HRegionInfo.convert(proto.getRegionInfo()),state,proto.getStamp(),null);
-    }
-
-    /**
-     * @deprecated Writables are going away
-     */
-    @Deprecated
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      region = new HRegionInfo();
-      region.readFields(in);
-      state = State.valueOf(in.readUTF());
-      stamp.set(in.readLong());
-    }
-
-    /**
-     * @deprecated Writables are going away
-     */
-    @Deprecated
-    @Override
-    public void write(DataOutput out) throws IOException {
-      region.write(out);
-      out.writeUTF(state.name());
-      out.writeLong(stamp.get());
-    }
-  }
-
   public void stop() {
     this.timeoutMonitor.interrupt();
     this.timerUpdater.interrupt();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java Fri Jul 27 16:34:32 2012
@@ -62,8 +62,7 @@ public class BulkReOpen extends BulkAssi
       // add plans for the regions that need to be reopened
       Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>();
       for (HRegionInfo hri : hris) {
-        RegionPlan reOpenPlan = new RegionPlan(hri, null,
-            assignmentManager.getRegionServerOfRegion(hri));
+        RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
         plans.put(hri.getEncodedName(), reOpenPlan);
       }
       assignmentManager.addPlans(plans);