You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/05 21:16:13 UTC

svn commit: r1369645 [2/3] - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/executor/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java...

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Sun Aug  5 19:16:11 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.LegacyRootZNodeUpdater;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.io.Text;
 
@@ -68,8 +69,8 @@ import org.apache.hadoop.io.Text;
 public class RegionManager {
   protected static final Log LOG = LogFactory.getLog(RegionManager.class);
 
-  private AtomicReference<HServerAddress> rootRegionLocation =
-    new AtomicReference<HServerAddress>(null);
+  private AtomicReference<HServerInfo> rootRegionLocation =
+    new AtomicReference<HServerInfo>(null);
 
   private final RootScanner rootScannerThread;
   final MetaScanner metaScannerThread;
@@ -81,6 +82,9 @@ public class RegionManager {
   private final NavigableMap<byte [], MetaRegion> onlineMetaRegions =
     new ConcurrentSkipListMap<byte [], MetaRegion>(Bytes.BYTES_COMPARATOR);
 
+  private final NavigableMap<byte [], MetaRegion> metaRegionLocationsBeforeScan =
+      new TreeMap<byte [], MetaRegion>(Bytes.BYTES_COMPARATOR);
+
   private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded");
 
   private static final byte [] META_REGION_PREFIX = Bytes.toBytes(".META.,");
@@ -90,9 +94,9 @@ public class RegionManager {
   /**
    * Map key -> tableName, value -> ThrottledRegionReopener
    * An entry is created in the map before an alter operation is performed on the
-   * table. It is cleared when all the regions have reopened.
+   * table. It is cleared when all the regions have reopened.   
    */
-  private final Map<String, ThrottledRegionReopener> tablesReopeningRegions =
+  private final Map<String, ThrottledRegionReopener> tablesReopeningRegions = 
       new ConcurrentHashMap<String, ThrottledRegionReopener>();
   /**
    * Map of region name to RegionState for regions that are in transition such as
@@ -109,7 +113,7 @@ public class RegionManager {
    */
    final SortedMap<String, RegionState> regionsInTransition =
     Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
-
+   
    // regions in transition are also recorded in ZK using the zk wrapper
    final ZooKeeperWrapper zkWrapper;
 
@@ -162,13 +166,17 @@ public class RegionManager {
   private final int zooKeeperNumRetries;
   private final int zooKeeperPause;
 
-  /**
-   * Set of region servers which send heart beat in the first period of time
+  /** 
+   * Set of region servers which send heart beat in the first period of time 
    * during the master boots. Hold the best locality regions for these
    * region servers.
    */
   private Set<String> quickStartRegionServerSet = new HashSet<String>();
 
+  private boolean stoppedScanners = false;
+
+  private LegacyRootZNodeUpdater legacyRootZNodeUpdater;
+  
   RegionManager(HMaster master) throws IOException {
     Configuration conf = master.getConfiguration();
 
@@ -195,7 +203,8 @@ public class RegionManager {
     zooKeeperPause = conf.getInt(HConstants.ZOOKEEPER_PAUSE,
         HConstants.DEFAULT_ZOOKEEPER_PAUSE);
 
-    reassignRootRegion();
+    legacyRootZNodeUpdater = new LegacyRootZNodeUpdater(zkWrapper, master,
+        rootRegionLocation);
   }
 
   public LoadBalancer getLoadBalancer() {
@@ -208,6 +217,7 @@ public class RegionManager {
       "RegionManager.rootScanner");
     Threads.setDaemonThreadRunning(metaScannerThread,
       "RegionManager.metaScanner");
+    Threads.setDaemonThreadRunning(legacyRootZNodeUpdater, null);
   }
 
   public AssignmentManager getAssignmentManager() {
@@ -244,11 +254,11 @@ public class RegionManager {
     }
   }
 
-  /*
+  /**
    * Assigns regions to region servers attempting to balance the load across all
    * region servers. Note that no synchronization is necessary as the caller
    * (ServerManager.processMsgs) already owns the monitor for the RegionManager.
-   *
+   * 
    * @param info
    * @param mostLoadedRegions
    * @param returnMsgs
@@ -260,7 +270,7 @@ public class RegionManager {
       // be assigned when the region server reports next
       return;
     }
-
+    
     if (this.master.shouldAssignRegionsWithFavoredNodes()) {
       // assign regions with favored nodes
       assignRegionsWithFavoredNodes(info, mostLoadedRegions, returnMsgs);
@@ -269,7 +279,7 @@ public class RegionManager {
       assignRegionsWithoutFavoredNodes(info, mostLoadedRegions, returnMsgs);
     }
   }
-
+  
   private void assignRegionsWithFavoredNodes(HServerInfo regionServer,
       HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
     // get the regions that are waiting for assignment for that region server
@@ -285,26 +295,38 @@ public class RegionManager {
     }
   }
 
+  /**
+   * @return true if there is a single regionserver online, or if there is any other reason to
+   *         remove restrictions on assigning .META./-ROOT- to the same regionserver (e.g. if there
+   *         are blacklisted regionservers during testing).
+   */
+  private boolean isSingleRegionServer() {
+    // If there are blacklisted servers (unit tests only), treat the situation as if there is
+    // just a single host, otherwise we might keep trying to assign regions to blacklisted
+    // regionservers.
+    return master.numServers() == 1 || master.getServerManager().hasBlacklistedServersInTest();
+  }
+
   private void assignRegionsWithoutFavoredNodes(HServerInfo info,
       HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
     // the region may assigned to this region server
     Set<RegionState> regionsToAssign = null;
 
     HServerLoad thisServersLoad = info.getLoad();
-    boolean isSingleServer = this.master.numServers() == 1;
+    boolean isSingleServer = isSingleRegionServer();
     boolean holdRegionForBestRegionServer = false;
     boolean assignmentByLocality = false;
-
-    // only check assignmentByLocality when the
+    
+    // only check assignmentByLocality when the 
     // PreferredRegionToRegionServerMapping is not null;
     if (this.master.getPreferredRegionToRegionServerMapping() != null) {
       long masterRunningTime = System.currentTimeMillis()
-              - this.master.getMasterStartupTime();
-      holdRegionForBestRegionServer =
+              - this.master.getMasterStartupTime();      
+      holdRegionForBestRegionServer = 
         masterRunningTime < this.master.getHoldRegionForBestLocalityPeriod();
-      assignmentByLocality =
+      assignmentByLocality = 
         masterRunningTime < this.master.getApplyPreferredAssignmentPeriod();
-
+      
       // once it has passed the ApplyPreferredAssignmentPeriod, clear up
       // the quickStartRegionServerSet and PreferredRegionToRegionServerMapping
       // and it won't check the assignmentByLocality anymore.
@@ -313,7 +335,7 @@ public class RegionManager {
         this.master.clearPreferredRegionToRegionServerMapping();
       }
     }
-
+    
     if (assignmentByLocality) {
       // have to add . at the end of host name
       String hostName = info.getHostname();
@@ -331,7 +353,7 @@ public class RegionManager {
         isSingleServer, preferredAssignment, assignmentByLocality,
         holdRegionForBestRegionServer,
         quickStartRegionServerSet);
-
+    
     if (regionsToAssign.isEmpty()) {
       // There are no regions waiting to be assigned.
       if (!assignmentByLocality
@@ -340,7 +362,7 @@ public class RegionManager {
         this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
       }
     } else {
-      // if there's only one server or assign the region by locality,
+      // if there's only one server or assign the region by locality, 
       // just give the regions to this server
       if (isSingleServer || assignmentByLocality
           || preferredAssignment.booleanValue()) {
@@ -356,11 +378,11 @@ public class RegionManager {
 
   /*
    * Make region assignments taking into account multiple servers' loads.
-   *
+   * 
    * Note that no synchronization is needed while we iterate over
    * regionsInTransition because this method is only called by assignRegions
    * whose caller owns the monitor for RegionManager
-   *
+   * 
    * TODO: This code is unintelligible. REWRITE. Add TESTS! St.Ack 09/30/2009
    * @param thisServersLoad
    * @param regionsToAssign
@@ -376,7 +398,7 @@ public class RegionManager {
         isMetaAssign = true;
     }
     int nRegionsToAssign = regionsToAssign.size();
-    int otherServersRegionsCount =
+    int otherServersRegionsCount = 
       regionsToGiveOtherServers(nRegionsToAssign, thisServersLoad);
     nRegionsToAssign -= otherServersRegionsCount;
     if (nRegionsToAssign > 0 || isMetaAssign) {
@@ -390,8 +412,8 @@ public class RegionManager {
       int nservers = computeNextHeaviestLoad(thisServersLoad, heavierLoad);
       int nregions = 0;
       // Advance past any less-loaded servers
-      for (HServerLoad load = new HServerLoad(thisServersLoad);
-      load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
+      for (HServerLoad load = new HServerLoad(thisServersLoad); 
+      load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign; 
       load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
         // continue;
       }
@@ -444,7 +466,7 @@ public class RegionManager {
    * Note that no synchronization is needed on regionsInTransition while
    * iterating on it because the only caller is assignRegions whose caller owns
    * the monitor for RegionManager
-   *
+   * 
    * @param regionsToAssign
    * @param serverName
    * @param returnMsgs
@@ -501,6 +523,11 @@ public class RegionManager {
    */
   private int regionsToGiveOtherServers(final int numUnassignedRegions,
       final HServerLoad thisServersLoad) {
+    if (master.getServerManager().hasBlacklistedServersInTest()) {
+      // For unit testing. Otherwise, we will always think we should give regions to blacklisted
+      // servers, but will not actually assign any.
+      return 0;
+    }
     SortedMap<HServerLoad, Collection<String>> lightServers = 
         master.getServerManager().getServersToLoad().getLightServers(thisServersLoad);
     // Examine the list of servers that are more lightly loaded than this one.
@@ -533,13 +560,13 @@ public class RegionManager {
   private Set<RegionState> regionsAwaitingAssignment(HServerInfo server) {
     // set of regions we want to assign to this server
     Set<RegionState> regionsToAssign = new HashSet<RegionState>();
-    boolean isSingleServer = this.master.numServers() == 1;
+    boolean isSingleServer = isSingleRegionServer();
     HServerAddress addr = server.getServerAddress();
     boolean isMetaServer = isMetaServer(addr);
     RegionState rootState = null;
     boolean isPreferredAssignment = false;
     boolean reassigningMetas =
-      (numberOfMetaRegions.get() != onlineMetaRegions.size());
+      (numberOfMetaRegions.get() > onlineMetaRegions.size());
     boolean isMetaOrRoot = isMetaServer || isRootServer(addr);
 
     // Assign ROOT region if ROOT region is offline.
@@ -567,7 +594,7 @@ public class RegionManager {
     // for the current region server
     Set<HRegionInfo> preservedRegionsForCurrentRS =
       assignmentManager.getTransientAssignments(addr);
-
+    
     synchronized (this.regionsInTransition) {
       int nonPreferredAssignment = 0;
       for (RegionState regionState : regionsInTransition.values()) {
@@ -598,8 +625,7 @@ public class RegionManager {
         // Can't assign user regions until all meta regions have been assigned,
         // the initial meta scan is done and there are enough online
         // region servers
-        if (reassigningMetas || !this.isInitialMetaScanComplete() ||
-            !master.getServerManager().hasEnoughRegionServers()) {
+        if (reassigningMetas || !master.getServerManager().hasEnoughRegionServers()) {
           LOG.debug("Cannot assign region " + regionInfo.getRegionNameAsString()
               + " because not all the META are online, "
               + "or the initial META scan is not completed, or there are no "
@@ -611,12 +637,12 @@ public class RegionManager {
         if (!regionState.isUnassigned()) {
           continue;
         }
-
-        if (preservedRegionsForCurrentRS == null ||
+        
+        if (preservedRegionsForCurrentRS == null || 
             !preservedRegionsForCurrentRS.contains(regionInfo)) {
-          if (assignmentManager.hasTransientAssignment(regionInfo) ||
+          if (assignmentManager.hasTransientAssignment(regionInfo) || 
               nonPreferredAssignment > this.maxAssignInOneGo) {
-            // Hold the region for its favored nodes and limit the number of
+            // Hold the region for its favored nodes and limit the number of 
             // non preferred assignments for each region server.
             continue;
           }
@@ -626,22 +652,22 @@ public class RegionManager {
         } else {
           isPreferredAssignment = true;
         }
-
+        
         // Assign the current region to the region server.
         regionsToAssign.add(regionState);
         LOG.debug("Going to assign user region " +
             regionInfo.getRegionNameAsString() +
             " to server " + server.getHostnamePort() + " in a " +
             (isPreferredAssignment ? "": "non-") + "preferred way");
-
+      
       }
     }
     return regionsToAssign;
   }
-
+  
   /**
    * Get the set of regions that should be assignable in this pass.
-   *
+   * 
    * Note that no synchronization on regionsInTransition is needed because the
    * only caller (assignRegions, whose caller is ServerManager.processMsgs) owns
    * the monitor for RegionManager
@@ -650,7 +676,6 @@ public class RegionManager {
       boolean isSingleServer, MutableBoolean isPreferredAssignment,
       boolean assignmentByLocality, boolean holdRegionForBestRegionserver,
       Set<String> quickStartRegionServerSet) {
-
     // set of regions we want to assign to this server
     Set<RegionState> regionsToAssign = new HashSet<RegionState>();
 
@@ -735,13 +760,13 @@ public class RegionManager {
               + " is in transition but not enough servers yet");
           continue;
         }
-
+        
         // if we are holding it, don't give it away to any other server
         if (assignmentManager.hasTransientAssignment(s.getRegionInfo())) {
           continue;
         }
         if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
-          Text preferredHostNameTxt =
+          Text preferredHostNameTxt = 
             (Text)this.master.getPreferredRegionToRegionServerMapping().get(new Text(name));
 
           if (hostName == null) {
@@ -750,18 +775,18 @@ public class RegionManager {
           if (preferredHostNameTxt != null) {
             String preferredHost = preferredHostNameTxt.toString();
             if (hostName.startsWith(preferredHost)) {
-              LOG.debug("Doing Preferred Region Assignment for : " + name +
+              LOG.debug("Doing Preferred Region Assignment for : " + name + 
                   " to the " + hostName);
               // add the region to its preferred region server.
               regionsToAssign.add(s);
               continue;
-            } else if (holdRegionForBestRegionserver ||
+            } else if (holdRegionForBestRegionserver || 
                 quickStartRegionServerSet.contains(preferredHost)) {
               continue;
             }
           }
         }
-        // Only assign a configured number unassigned region at one time in the
+        // Only assign a configured number unassigned region at one time in the 
         // non preferred assignment case.
         if ((nonPreferredAssignmentCount++) < this.maxAssignInOneGo) {
           regionsToAssign.add(s);
@@ -909,12 +934,15 @@ public class RegionManager {
   }
 
   /**
-   * Stop the root and meta scanners so that the region servers serving meta
-   * regions can shut down.
+   * Stop the root and meta scanners so that the region servers serving meta regions can shut down.
+   * Not thread-safe, but if called twice from the same thread, scanners will only be stopped once.
    */
   public void stopScanners() {
-    this.rootScannerThread.interruptAndStop();
-    this.metaScannerThread.interruptAndStop();
+    if (!stoppedScanners) {
+      this.rootScannerThread.interruptAndStop();
+      this.metaScannerThread.interruptAndStop();
+      stoppedScanners = true;
+    }
   }
 
   /**
@@ -944,7 +972,7 @@ public class RegionManager {
   public boolean areAllMetaRegionsOnline() {
     synchronized (onlineMetaRegions) {
       return (rootRegionLocation.get() != null &&
-          numberOfMetaRegions.get() == onlineMetaRegions.size());
+          numberOfMetaRegions.get() <= onlineMetaRegions.size());
     }
   }
 
@@ -956,17 +984,21 @@ public class RegionManager {
    */
   public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
     synchronized (onlineMetaRegions) {
-      if (onlineMetaRegions.size() == 0) {
-        return null;
-      } else if (onlineMetaRegions.size() == 1) {
-        return onlineMetaRegions.get(onlineMetaRegions.firstKey());
-      } else {
-        if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
-          return onlineMetaRegions.get(newRegion.getRegionName());
-        }
-        return onlineMetaRegions.get(onlineMetaRegions.headMap(
-            newRegion.getRegionName()).lastKey());
+      return getMetaRegionPointingTo(onlineMetaRegions, newRegion);
+    }
+  }
+
+  static MetaRegion getMetaRegionPointingTo(NavigableMap<byte[], MetaRegion> metaRegions,
+      HRegionInfo newRegion) {
+    if (metaRegions.isEmpty()) {
+      return null;
+    } else if (metaRegions.size() == 1) {
+      return metaRegions.get(metaRegions.firstKey());
+    } else {
+      if (metaRegions.containsKey(newRegion.getRegionName())) {
+        return metaRegions.get(newRegion.getRegionName());
       }
+      return metaRegions.get(metaRegions.headMap(newRegion.getRegionName()).lastKey());
     }
   }
 
@@ -985,7 +1017,7 @@ public class RegionManager {
         throw new NotAllMetaRegionsOnlineException(
             Bytes.toString(HConstants.ROOT_TABLE_NAME));
       }
-      metaRegions.add(new MetaRegion(rootRegionLocation.get(),
+      metaRegions.add(new MetaRegion(rootRegionLocation.get().getServerAddress(),
           HRegionInfo.ROOT_REGIONINFO));
     } else {
       if (!areAllMetaRegionsOnline()) {
@@ -1041,7 +1073,7 @@ public class RegionManager {
   throws IOException {
     createRegion(newRegion, server, metaRegionName, null);
   }
-
+ 
   /**
    * Create a new HRegion, put a row for it into META (or ROOT), and mark the
    * new region unassigned so that it will get assigned to a region server.
@@ -1068,7 +1100,7 @@ public class RegionManager {
     // 3.1 Put the region info into meta table.
     put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
         Writables.getBytes(info));
-
+    
     // 3.2 Put the favorite nodes into meta.
     if (favoriteNodeList != null) {
       String favoredNodes = RegionPlacement.getFavoredNodes(favoriteNodeList);
@@ -1083,10 +1115,10 @@ public class RegionManager {
     // 4. Close the new region to flush it to disk.  Close its log file too.
     region.close();
     region.getLog().closeAndDelete();
-
+    
     // After all regions are created, the caller will schedule
     // the meta scanner to run immediately and assign out the
-    // regions.
+    // regions.    
   }
 
   /**
@@ -1309,11 +1341,15 @@ public class RegionManager {
   }
 
   /**
-   * Set a region to unassigned
+   * Set a region to unassigned. Always writes the region's unassigned znode.
    * @param info Region to set unassigned
    * @param force if true mark region unassigned whatever its current state
    */
-  public void setUnassigned(HRegionInfo info, boolean force) {
+  void setUnassigned(HRegionInfo info, boolean force) {
+    setUnassignedGeneral(true, info, force);
+  }
+
+  void setUnassignedGeneral(boolean writeToZK, HRegionInfo info, boolean force) {
     RegionState s = null;
     long t0, t1, t2, t3;
     t0 = System.currentTimeMillis();
@@ -1329,9 +1365,11 @@ public class RegionManager {
           //       should never happen
           LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
         }
-        zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data);
-        LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() +
-                  " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+        if (writeToZK) {
+          zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data);
+          LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() + 
+                    " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+        }
         s = new RegionState(info, RegionState.State.UNASSIGNED);
         regionsInTransition.put(info.getRegionNameAsString(), s);
       }
@@ -1444,7 +1482,7 @@ public class RegionManager {
         // marked offline so that it opens on the preferred server.
         this.assignmentManager.executeAssignmentPlan(regionInfo);
       }
-    }
+    }    
   }
 
   /**
@@ -1492,35 +1530,17 @@ public class RegionManager {
       }
     }
   }
-  /**
-   * Add a meta region to the scan queue
-   * @param m MetaRegion that needs to get scanned
-   */
-  public void addMetaRegionToScan(MetaRegion m) {
-    metaScannerThread.addMetaRegionToScan(m);
-  }
-
-  /**
-   * Check if the initial root scan has been completed.
-   * @return true if scan completed, false otherwise
-   */
-  public boolean isInitialRootScanComplete() {
-    return rootScannerThread.isInitialScanComplete();
-  }
-
-  /**
-   * Check if the initial meta scan has been completed.
-   * @return true if meta completed, false otherwise
-   */
-  public boolean isInitialMetaScanComplete() {
-    return metaScannerThread.isInitialScanComplete();
-  }
 
   /**
    * Get the root region location.
    * @return HServerAddress describing root region server.
    */
   public HServerAddress getRootRegionLocation() {
+    return HServerInfo.getAddress(rootRegionLocation.get());
+  }
+
+  /** Returns root region location as a server info object (with a start code) */
+  public HServerInfo getRootServerInfo() {
     return rootRegionLocation.get();
   }
 
@@ -1538,7 +1558,8 @@ public class RegionManager {
           // Cycle rather than hold here in case master is closed meantime.
           rootRegionLocation.wait(this.master.getThreadWakeFrequency());
         } catch (InterruptedException e) {
-          // continue
+          LOG.error("Interrupted when waiting for root region location");
+          continue;
         }
       }
     }
@@ -1575,9 +1596,9 @@ public class RegionManager {
     }
   }
 
-  private void writeRootRegionLocationToZooKeeper(HServerAddress address) {
+  private void writeRootRegionLocationToZooKeeper(HServerInfo hsi) {
     for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
-      if (master.getZooKeeperWrapper().writeRootRegionLocation(address)) {
+      if (master.getZooKeeperWrapper().writeRootRegionLocation(hsi)) {
         return;
       }
 
@@ -1585,21 +1606,21 @@ public class RegionManager {
     }
 
     LOG.error("Failed to write root region location to ZooKeeper after " +
-              zooKeeperNumRetries + " retries, shutting down");
+              zooKeeperNumRetries + " retries, shutting down the cluster");
 
-    this.master.shutdown();
+    this.master.requestClusterShutdown();
   }
 
   /**
    * Set the root region location.
    * @param address Address of the region server where the root lives
    */
-  public void setRootRegionLocation(HServerAddress address) {
-    writeRootRegionLocationToZooKeeper(address);
+  public void setRootRegionLocation(HServerInfo hsi) {
+    writeRootRegionLocationToZooKeeper(hsi);
     synchronized (rootRegionLocation) {
       // the root region has been assigned, remove it from transition in ZK
       zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName());
-      rootRegionLocation.set(new HServerAddress(address));
+      rootRegionLocation.set(new HServerInfo(hsi));
       rootRegionLocation.notifyAll();
     }
   }
@@ -1996,7 +2017,7 @@ public class RegionManager {
 
           // Only move the region if the other server is under-loaded and the
           // current server is overloaded.
-          if (serverLoad - regionsUnassigned > avgLoadPlusSlop &&
+          if (serverLoad - regionsUnassigned > avgLoadPlusSlop && 
               otherLoad.getNumberOfRegions() < avgLoadMinusSlop) {
             if (unassignRegion(info, region, returnMsgs)) {
               // Need to override transient assignment that may have been added
@@ -2032,7 +2053,7 @@ public class RegionManager {
       HServerInfo other =
           master.getServerManager().getHServerInfo(server);
       if (other == null ||
-          master.getServerManager().isDead(other.getServerName())) {
+          master.getServerManager().isDeadProcessingPending(other.getServerName())) {
         return null;
       }
       return master.getServerManager().getServersToLoad()
@@ -2085,7 +2106,7 @@ public class RegionManager {
     DefaultLoadBalancer() {
       super();
     }
-
+    
     /**
      * Balance server load by unassigning some regions.
      *
@@ -2167,10 +2188,10 @@ public class RegionManager {
       int lowSrvCount = serverLoadMap.numServersByLoad(lowestServerLoad);
       int numSrvRegs = srvLoad.getNumberOfRegions();
       int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
-
-      int numRegionsToClose = numSrvRegs - (int)Math.floor(avgLoad);
+      
+      int numRegionsToClose = numSrvRegs - (int)Math.floor(avgLoad);      
       numRegionsToClose = Math.min(numRegionsToClose, numMoveToLowLoaded);
-
+         
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server(s) are carrying only " + lowestLoad + " regions. " +
           "Server " + srvName + " is most loaded (" + numSrvRegs +
@@ -2393,7 +2414,7 @@ public class RegionManager {
   /**
    * Method used to do housekeeping for holding regions for a RegionServer going
    * down for a restart
-   *
+   * 
    * @param regionServer
    *          the RegionServer going down for a restart
    * @param regions
@@ -2401,7 +2422,7 @@ public class RegionManager {
    */
   public void addRegionServerForRestart(final HServerInfo regionServer,
       Set<HRegionInfo> regions) {
-    LOG.debug("Holding regions of restartng server: " +
+    LOG.debug("Holding regions of restartng server: " +  
         regionServer.getServerName());
     HServerAddress addr = regionServer.getServerAddress();
     for (HRegionInfo region : regions) {
@@ -2410,7 +2431,7 @@ public class RegionManager {
   }
 
   /**
-   * Create a reopener for this table, if one exists, return the existing throttler.
+   * Create a reopener for this table, if one exists, return the existing throttler. 
    * @param tableName
    * @return
    */
@@ -2418,10 +2439,10 @@ public class RegionManager {
     if (!tablesReopeningRegions.containsKey(tableName)) {
       ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(tableName, this.master, this);
       tablesReopeningRegions.put(tableName, throttledReopener);
-    }
+    }  
     return tablesReopeningRegions.get(tableName);
   }
-
+  
   /**
    * Return the throttler for this table
    * @param tableName
@@ -2430,7 +2451,7 @@ public class RegionManager {
   public ThrottledRegionReopener getThrottledReopener(String tableName) {
     return tablesReopeningRegions.get(tableName);
   }
-
+  
   /**
    * Delete the throttler when the operation is complete
    * @param tableName
@@ -2444,10 +2465,10 @@ public class RegionManager {
       LOG.debug("Tried to delete a throttled reopener, but it does not exist.");
     }
   }
-
+  
   /**
-   * When the region is opened, check if it is reopening and notify the throttler
-   * for further processing.
+   * When the region is opened, check if it is reopening and notify the throttler 
+   * for further processing.  
    * @param region
    */
   public void notifyRegionReopened(HRegionInfo region) {
@@ -2456,4 +2477,78 @@ public class RegionManager {
       tablesReopeningRegions.get(tableName).notifyRegionOpened(region);
     }
   }
+
+  MetaScanner getMetaScanner() {
+    return metaScannerThread;
+  }
+
+  /**
+   * Composes a map of .META. region locations for both online .META. regions and regions that
+   * we know are assigned to regionservers, but have not been scanned yet. This is used on master
+   * startup to write pending region location changes from the ZK unassigned directory to .META.
+   */
+  NavigableMap<byte[], MetaRegion> getAllMetaRegionLocations() {
+    NavigableMap<byte[], MetaRegion> m =
+        new TreeMap<byte[], MetaRegion>(Bytes.BYTES_COMPARATOR);
+    m.putAll(metaRegionLocationsBeforeScan);
+    m.putAll(onlineMetaRegions);
+    return m;
+  }
+
+  /** 
+   * Modifies region state in regionsInTransition based on the initial scan of the ZK unassigned
+   * directory.
+   * @param event event type written by the regionserver to the znode
+   * @param regionInfo region info
+   * @param serverName regionserver name
+   */
+  void setRegionStateOnRecovery(HBaseEventType event, HRegionInfo regionInfo, String serverName) {
+    String regionName = regionInfo.getRegionNameAsString();
+    String stateStr = null;
+    if (event == HBaseEventType.RS2ZK_REGION_CLOSING ||
+        event == HBaseEventType.RS2ZK_REGION_CLOSED) {
+      synchronized (regionsInTransition) {
+        RegionState s = regionsInTransition.get(regionName);
+        if (s == null) {
+          s = new RegionState(regionInfo, RegionState.State.PENDING_CLOSE);
+          regionsInTransition.put(regionName, s);
+        } else {
+          s.setClosing(serverName, s.isOfflined());
+          s.setPendingClose();
+        }
+        stateStr = s.toString();
+      }
+    }
+
+    if (event == HBaseEventType.RS2ZK_REGION_OPENED ||
+        event == HBaseEventType.RS2ZK_REGION_OPENING) {
+      synchronized (regionsInTransition) {
+        RegionState s = regionsInTransition.get(regionName);
+        if (s == null) {
+          s = new RegionState(regionInfo, RegionState.State.PENDING_OPEN);
+          regionsInTransition.put(regionName, s);
+        } else {
+          s.setUnassigned();
+          s.setPendingOpen(serverName);
+        }
+        stateStr = s.toString();
+      }
+    }
+
+    if (stateStr != null) {
+      LOG.info("Set state in regionsInTransition: " + stateStr);
+    }
+  }
+
+  /** Recovers root region location from ZK. Should only be called on master startup. */
+  void recoverRootRegionLocationFromZK() {
+    HServerInfo rootLocationInZK = zkWrapper.readRootRegionServerInfo();
+    if (rootLocationInZK != null) {
+      synchronized (rootRegionLocation) {
+        rootRegionLocation.set(rootLocationInZK);
+        rootRegionLocation.notifyAll();
+      }
+    }
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java Sun Aug  5 19:16:11 2012
@@ -81,23 +81,16 @@ abstract class RegionServerOperation imp
         - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
   }
 
-  private long whenToExpire() {
-    return System.currentTimeMillis() + this.delay;
-  }
-
   protected boolean rootAvailable() {
-    boolean available = true;
-    if (this.master.getRegionManager().getRootRegionLocation() == null) {
-      available = false;
-    }
-    return available;
+    return master.getRegionManager().getRootRegionLocation() != null;
   }
 
   protected boolean metaTableAvailable() {
     boolean available = true;
-    if ((master.getRegionManager().numMetaRegions() !=
-      master.getRegionManager().numOnlineMetaRegions()) ||
-      master.getRegionManager().metaRegionsInTransition()) {
+    int numMetaRegions = master.getRegionManager().numMetaRegions();
+    if (numMetaRegions == 0 ||
+        numMetaRegions > master.getRegionManager().numOnlineMetaRegions() ||
+        master.getRegionManager().metaRegionsInTransition()) {
       // We can't proceed because not all of the meta regions are online.
       // We can't block either because that would prevent the meta region
       // online message from being processed. In order to prevent spinning

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java Sun Aug  5 19:16:11 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.master;
 
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 
 import java.io.IOException;
@@ -36,7 +37,6 @@ class RootScanner extends BaseScanner {
 
   /**
    * Don't retry if we get an error while scanning. Errors are most often
-   *
    * caused by the server going away. Wait until next rescan interval when
    * things should be back to normal.
    * @return True if successfully scanned.
@@ -50,9 +50,9 @@ class RootScanner extends BaseScanner {
     try {
       // Don't interrupt us while we're working
       synchronized(scannerLock) {
-        if (master.getRegionManager().getRootRegionLocation() != null) {
-          scanRegion(new MetaRegion(master.getRegionManager().getRootRegionLocation(),
-            HRegionInfo.ROOT_REGIONINFO));
+        HServerAddress rootRegionLocation = master.getRegionManager().getRootRegionLocation();
+        if (rootRegionLocation != null) {
+          scanRegion(new MetaRegion(rootRegionLocation, HRegionInfo.ROOT_REGIONINFO));
         }
       }
     } catch (IOException e) {
@@ -76,12 +76,6 @@ class RootScanner extends BaseScanner {
   }
 
   @Override
-  protected boolean initialScan() {
-    this.initialScanComplete = scanRoot();
-    return initialScanComplete;
-  }
-
-  @Override
   protected void maintenanceScan() {
     scanRoot();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Sun Aug  5 19:16:11 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -115,6 +114,13 @@ public class ServerManager {
   private final OldLogsCleaner oldLogCleaner;
 
   /**
+   * A lock that controls simultaneous changes and lookup to the dead server set and the server to
+   * server info map. Required so that we don't reassign the same region both in expireServer
+   * and in the base scanner.
+   */
+  final Object deadServerStatusLock = new Object();
+
+  /**
    * A set of host:port pairs representing regionservers that are blacklisted
    * from region assignment. Used for unit tests only. Please do not use this
    * for production, because in a real situation a blacklisted server might
@@ -247,7 +253,7 @@ public class ServerManager {
    */
   private void checkIsDead(final String serverName, final String what)
   throws YouAreDeadException {
-    if (!isDead(serverName)) return;
+    if (!isDeadProcessingPending(serverName)) return;
     String message = "Server " + what + " rejected; currently processing " +
       serverName + " as dead server";
     LOG.debug(message);
@@ -300,6 +306,7 @@ public class ServerManager {
       // Could not set a watch, undo the above changes and re-throw.
       serversToLoad.updateServerLoad(serverName, oldServerLoad);
       undoMapUpdate(serversToServerInfo, serverName, oldServerInfo);
+      LOG.error("Could not set watch on regionserver znode for " + serverName); 
       throw ex;
     }
   }
@@ -577,20 +584,18 @@ public class ServerManager {
       // currently opening regions, leave it alone till all are open.
       if (openingCount < this.nobalancingCount) {
         if (!blacklistedRSHostPortSetForTest.contains(
-            serverInfo.getHostnamePort())) {
+            serverInfo.getHostnamePort()) || serversToServerInfo.size() <= 1) {
           // Production code path.
           master.getRegionManager().assignRegions(serverInfo,
               mostLoadedRegions, returnMsgs);
-        } else if (mostLoadedRegions.length > 0) {
+        } else {
           // UNIT TESTS ONLY.
           // We just don't assign anything to "blacklisted" regionservers as
           // required by a unit test (for determinism). This is OK because
           // another regionserver will get these regions in response to a
           // heartbeat.
-          LOG.debug("[UNIT TEST ONLY] Not assigning regions "
-              + Arrays.toString(mostLoadedRegions) + " to regionserver "
-              + serverInfo.getHostnamePort()
-              + " because it is blacklisted.");
+          LOG.debug("[UNIT TEST ONLY] Not assigning regions to blacklisted regionserver "
+              + serverInfo.getHostnamePort());
         }
       }
 
@@ -621,8 +626,13 @@ public class ServerManager {
     ArrayList<HMsg> msgsForServer = pendingMsgsToSvrsMap.get(serverInfo);
     
     if (msgsForServer == null) {
-      msgsForServer = pendingMsgsToSvrsMap.putIfAbsent(serverInfo,
-        new ArrayList<HMsg>());
+      msgsForServer = new ArrayList<HMsg>();
+      ArrayList<HMsg> newMsgsForServer =
+          pendingMsgsToSvrsMap.putIfAbsent(serverInfo, msgsForServer);
+      if (newMsgsForServer != null) {
+        // There is already a list of messages for this server, use it.
+        msgsForServer = newMsgsForServer;
+      }
     }
     
     synchronized(msgsForServer) {
@@ -708,13 +718,14 @@ public class ServerManager {
   public void processRegionOpen(HServerInfo serverInfo,
       HRegionInfo region, ArrayList<HMsg> returnMsgs) {
     boolean duplicateAssignment = false;
-    synchronized (master.getRegionManager()) {
-      if (!this.master.getRegionManager().isUnassigned(region) &&
-          !this.master.getRegionManager().isPendingOpen(region.getRegionNameAsString())) {
+    RegionManager regionManager = master.getRegionManager();
+    synchronized (regionManager) {
+      if (!regionManager.isUnassigned(region) &&
+          !regionManager.isPendingOpen(region.getRegionNameAsString())) {
         if (region.isRootRegion()) {
           // Root region
           HServerAddress rootServer =
-            this.master.getRegionManager().getRootRegionLocation();
+            regionManager.getRootRegionLocation();
           if (rootServer != null) {
             if (rootServer.compareTo(serverInfo.getServerAddress()) == 0) {
               // A duplicate open report from the correct server
@@ -728,7 +739,7 @@ public class ServerManager {
           // Not root region. If it is not a pending region, then we are
           // going to treat it as a duplicate assignment, although we can't
           // tell for certain that's the case.
-          if (this.master.getRegionManager().isPendingOpen(
+          if (regionManager.isPendingOpen(
               region.getRegionNameAsString())) {
             // A duplicate report from the correct server
             return;
@@ -751,25 +762,25 @@ public class ServerManager {
         if (region.isRootRegion()) {
           // it was assigned, and it's not a duplicate assignment, so take it out
           // of the unassigned list.
-          this.master.getRegionManager().removeRegion(region);
+          regionManager.removeRegion(region);
 
           // Store the Root Region location (in memory)
           HServerAddress rootServer = serverInfo.getServerAddress();
           this.master.getServerConnection().setRootRegionLocation(
             new HRegionLocation(region, rootServer));
-          this.master.getRegionManager().setRootRegionLocation(rootServer);
+          regionManager.setRootRegionLocation(serverInfo);
           // Increase the region opened counter
           this.master.getMetrics().incRegionsOpened();
         } else {
           // Note that the table has been assigned and is waiting for the
           // meta table to be updated.
-          this.master.getRegionManager().setOpen(region.getRegionNameAsString());
+          regionManager.setOpen(region.getRegionNameAsString());
           RegionServerOperation op =
-            new ProcessRegionOpen(master, serverInfo, region);
-          this.master.getRegionServerOperationQueue().put(op);
+              new ProcessRegionOpen(master, serverInfo, region);
+          master.getRegionServerOperationQueue().put(op);
         }
       }
-      this.master.getRegionManager().notifyRegionReopened(region);
+      regionManager.notifyRegionReopened(region);
     }
   }
 
@@ -980,9 +991,11 @@ public class ServerManager {
       LOG.warn("Already processing shutdown of " + serverName);
       return;
     }
-    // Remove the server from the known servers lists and update load info
-    this.serversToServerInfo.remove(serverName);
-    serversToLoad.removeServerLoad(serverName);
+    synchronized (deadServerStatusLock) {
+      // Remove the server from the known servers lists and update load info
+      this.serversToServerInfo.remove(serverName);
+      serversToLoad.removeServerLoad(serverName);
+    }
     // Add to dead servers and queue a shutdown processing.
     LOG.debug("Added=" + serverName +
       " to dead servers, added shutdown processing operation");
@@ -1006,7 +1019,7 @@ public class ServerManager {
    * @param serverName
    * @return true if server is dead
    */
-  public boolean isDead(final String serverName) {
+  public boolean isDeadProcessingPending(final String serverName) {
     return isDead(serverName, false);
   }
 
@@ -1099,7 +1112,7 @@ public class ServerManager {
     public void run() {
       try {
         while (true) {
-          boolean waitingForMoreServersInRackToTimeOut =
+          boolean waitingForMoreServersInRackToTimeOut = 
               expireTimedOutServers(timeout, maxServersToExpirePerRack);
           if (waitingForMoreServersInRackToTimeOut) {
             sleep(shortTimeout/2);
@@ -1292,7 +1305,7 @@ public class ServerManager {
           continue; //server vanished
         }
         // re-check - just in case the server reported
-        if (curTime > load.expireAfter) {
+        if (curTime > load.expireAfter) {  // debug
           LOG.info("Expiring server " + si.getServerName() +
               " no report for last " + (curTime - load.lastLoadRefreshTime));
           this.expireServer(si);
@@ -1302,4 +1315,11 @@ public class ServerManager {
     return waitingForMoreServersInRackToTimeOut;
   }
 
+  boolean hasBlacklistedServersInTest() {
+    return !blacklistedRSHostPortSetForTest.isEmpty();
+  }
+
+  public static void clearRSBlacklistInTest() {
+    blacklistedRSHostPortSetForTest.clear();
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java Sun Aug  5 19:16:11 2012
@@ -18,16 +18,23 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 
 /**
@@ -49,11 +56,20 @@ public class ZKClusterStateRecovery {
   private Set<String> liveRSNamesAtStartupUnmodifiable;
 
   private final HMaster master;
+  private final RegionManager regionManager;
+  private final ServerManager serverManager;
   private final ZooKeeperWrapper zkw;
+  private final String unassignedZNode;
+
+  /** A snapshot of the list of unassigned znodes */
+  private List<String> unassignedNodes;
 
   public ZKClusterStateRecovery(HMaster master, ServerConnection connection) {
     this.master = master;
     zkw = master.getZooKeeperWrapper();
+    regionManager = master.getRegionManager();
+    serverManager = master.getServerManager();
+    this.unassignedZNode = zkw.getRegionInTransitionZNode();
   }
 
   /**
@@ -94,7 +110,7 @@ public class ZKClusterStateRecovery {
         }
 
         try {
-          master.getServerManager().recordNewServer(serverInfo);
+          serverManager.recordNewServer(serverInfo);
         } catch (IOException ex) {
           if (ex.getCause() instanceof NoNodeException) {
             // This regionserver has disappeared, don't try to register it. This will also ensure
@@ -118,4 +134,200 @@ public class ZKClusterStateRecovery {
     return liveRSNamesAtStartupUnmodifiable;
   }
 
+  private HRegionInfo parseUnassignedZNode(String regionName, byte[] nodeData,
+      RegionTransitionEventData hbEventData) throws IOException {
+    String znodePath = zkw.getZNode(unassignedZNode, regionName);
+    if (nodeData == null) {
+      // This znode does not seem to exist anymore.
+      LOG.error("znode for region " + regionName + " disappeared while scanning unassigned " +
+          "directory, skipping");
+      return null;
+    }
+
+    Writables.getWritable(nodeData, hbEventData);
+
+    HMsg msg = hbEventData.getHmsg();
+    if (msg == null) {
+      LOG.warn("HMsg is not present in unassigned znode data, skipping: " + znodePath);
+      return null;
+    }
+
+    HRegionInfo hri = msg.getRegionInfo();
+    if (hri == null) {
+      LOG.warn("Region info read from znode is null, skipping: " + znodePath);
+      return null;
+    }
+
+    if (!hri.getEncodedName().equals(regionName)) {
+      LOG.warn("Region name read from znode data (" + hri.getEncodedName() + ") " +
+          "must be the same as znode name: " + regionName + ". Skipping.");
+      return null;
+    }
+    return hri;
+  }
+
+  /**
+   * Read znode path as part of scanning the unassigned directory.
+   * @param regionName the region name to read the unassigned znode for
+   * @return znode data or null if the znode no longer exists
+   * @throws IOException in case of a ZK error
+   */
+  private byte[] getUnassignedZNodeAndSetWatch(String regionName)
+      throws IOException {
+    final String znodePath = zkw.getZNode(unassignedZNode, regionName);
+    try {
+      return zkw.readUnassignedZNodeAndSetWatch(znodePath);
+    } catch (IOException ex) {
+      if (ex.getCause() instanceof KeeperException) {
+        KeeperException ke = (KeeperException) ex.getCause();
+        if (ke.code() == KeeperException.Code.NONODE) {
+          LOG.warn("Unassigned node is missing: " + znodePath + ", ignoring");
+          return null;
+        }
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * Goes through the unassigned node directory in ZK.
+   */
+  private void processUnassignedNodes() throws IOException {
+    LOG.info("Processing unassigned znode directory on master startup");
+    for (String unassignedRegion : unassignedNodes) {
+      if (master.isStopped()) {
+        break;
+      }
+
+      final String znodePath = zkw.getZNode(unassignedZNode, unassignedRegion);
+      // Get znode and set watch
+      byte[] nodeData = getUnassignedZNodeAndSetWatch(znodePath);
+      if (nodeData == null) {
+        // The node disappeared.
+        continue;
+      }
+
+      HBaseEventType rsState = HBaseEventType.fromByte(nodeData[0]);
+      RegionTransitionEventData hbEventData = new RegionTransitionEventData();
+      HRegionInfo hri = parseUnassignedZNode(unassignedRegion, nodeData, hbEventData);
+      if (hri == null) {
+        // Could not parse the znode. Error message already logged.
+        continue;
+      }
+
+      LOG.info("Found unassigned znode: state=" + rsState + ", region=" +
+            hri.getRegionNameAsString() + ", rs=" + hbEventData.getRsName());
+      boolean openedOrClosed = rsState == HBaseEventType.RS2ZK_REGION_OPENED ||
+          rsState == HBaseEventType.RS2ZK_REGION_CLOSED;
+
+      if (rsState == HBaseEventType.RS2ZK_REGION_CLOSING ||
+          rsState == HBaseEventType.RS2ZK_REGION_OPENING ||
+          openedOrClosed) {
+        regionManager.setRegionStateOnRecovery(rsState, hri, hbEventData.getRsName());
+        if (openedOrClosed) {
+          master.getUnassignedWatcher().handleRegionStateInZK(znodePath, nodeData, false);
+        }
+      } else if (rsState == HBaseEventType.M2ZK_REGION_OFFLINE) {
+        // Write to ZK = false; override previous state ("force") = true. 
+        regionManager.setUnassignedGeneral(false, hri, true);
+      } else {
+        LOG.warn("Invalid unassigned znode state: " + rsState + " for region " + unassignedRegion);
+      }
+    }
+  }
+
+  /**
+   * Ensures that -ROOT- and .META. are assigned and persists region locations from OPENED and
+   * CLOSED nodes in the ZK unassigned directory in respectively -ROOT- (for .META. regions) and
+   * .META. (for user regions).
+   */
+  private void recoverRegionStateFromZK() throws IOException {
+    if (!isStopped()) {
+      regionManager.recoverRootRegionLocationFromZK();
+    }
+
+    if (!isStopped()) {
+      unassignedNodes = master.getUnassignedWatcher().getUnassignedDirSnapshot();
+    }
+
+    if (!isStopped()) {
+      processUnassignedNodes();
+    }
+
+    if (!isStopped()) {
+      master.getUnassignedWatcher().drainZKEventQueue();
+    }
+
+    if (!isStopped()) {
+      ensureRootAssigned();
+    }
+  }
+
+  private void ensureRootAssigned() {
+    HServerInfo rootServerInfo = regionManager.getRootServerInfo();
+    boolean reassignRoot = true;
+    if (rootServerInfo != null) {
+      // Root appears assigned. Check if it is assigned to an unknown server that we are not
+      // processing as dead. In that case we do need to reassign. This logic is similar to
+      // what is done in BaseScanner.checkAssigned.
+      String serverName = rootServerInfo.getServerName();
+      if (regionManager.regionIsInTransition(
+          HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString())) {
+        // Already in transition, we will wait until it is assigned.
+        reassignRoot = false;
+        LOG.info("Not assigning root because it is already in transition");
+      } else {
+        boolean processingRootServerAsDead;
+        HServerInfo rootRSInfo;
+        synchronized (serverManager.deadServerStatusLock) {
+          // Synchronizing to avoid a race condition with ServerManager.expireServer.
+          processingRootServerAsDead =
+              serverManager.isDeadProcessingPending(serverName);
+          rootRSInfo = serverManager.getServerInfo(serverName);
+        }
+        reassignRoot = !processingRootServerAsDead && rootRSInfo == null;
+        LOG.info("reassignRoot=" + reassignRoot +
+            ", processingRootServerAsDead=" + processingRootServerAsDead +
+            ", rootRSInfo=" + rootRSInfo);
+      }
+    }
+
+    if (reassignRoot) {
+      regionManager.reassignRootRegion();
+    }
+  }
+
+  public void backgroundRecoverRegionStateFromZK() {
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          recoverRegionStateFromZK();
+        } catch (Throwable ex) {
+          LOG.error(ex);
+          master.stop("Failed to recover region assignment from ZK");
+        }
+      }
+    }, "backgroundRecoverRegionStateFromZK");
+    t.setDaemon(true);
+    t.start();
+  }
+
+  /**
+   * Return true if there are no live regionservers. Assumes that
+   * {@link #registerLiveRegionServers} has been called. Only used for testing. No decisions are
+   * made based on the boolean "is cluster startup" flag.
+   */
+  boolean isClusterStartup() throws IOException {
+    return liveRSNamesAtStartup.isEmpty();
+  }
+
+  private boolean isStopped() {
+    return master.isStopped();
+  }
+
+  public boolean wasLiveRegionServerAtStartup(String serverName) {
+    return liveRSNamesAtStartup.contains(serverName);
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Sun Aug  5 19:16:11 2012
@@ -20,16 +20,20 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
 import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
+import org.apache.hadoop.hbase.util.DrainableQueue;
+import org.apache.hadoop.hbase.util.ParamCallable;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZNodePathAndData;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -42,51 +46,47 @@ import org.apache.zookeeper.Watcher.Even
 public class ZKUnassignedWatcher implements Watcher {
   private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
 
-  private ZooKeeperWrapper zkWrapper;
-  String serverName;
-  ServerManager serverManager;
-
-  public static void start(Configuration conf, HMaster master)
-  throws IOException {
-    new ZKUnassignedWatcher(conf, master);
+  private final ZooKeeperWrapper zkWrapper;
+  private final String serverName;
+  private final ServerManager serverManager;
+  private final String unassignedZNode;
+
+  private DrainableQueue<ZNodePathAndData> delayedZKEvents =
+      new DrainableQueue<ZNodePathAndData>("delayedZKEvents");
+
+  private List<String> unassignedDirSnapshot = new ArrayList<String>();
+
+  private ParamCallable<ZNodePathAndData> processEvent = new ParamCallable<ZNodePathAndData>() {
+    @Override
+    public void call(ZNodePathAndData pathAndData) {
+      try {
+        handleRegionStateInZK(pathAndData.getzNodePath(), pathAndData.getData(), false);
+      } catch (IOException e) {
+        LOG.error("Could not process event from ZooKeeper", e);
+      }
+    }
+  };
+  
+  ZKUnassignedWatcher(HMaster master) throws IOException {
     LOG.debug("Started ZKUnassigned watcher");
-  }
-
-  public ZKUnassignedWatcher(Configuration conf, HMaster master)
-  throws IOException {
     this.serverName = master.getHServerAddress().toString();
     this.serverManager = master.getServerManager();
-    zkWrapper = ZooKeeperWrapper.getInstance(conf, master.getZKWrapperName());
-    String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
+    zkWrapper = ZooKeeperWrapper.getInstance(master.getConfiguration(), master.getZKWrapperName());
+    unassignedZNode = zkWrapper.getRegionInTransitionZNode();
 
-    // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then
-    // delete it.
-    final boolean unassignedNodeExists =
-        zkWrapper.exists(unassignedZNode, false);
-    LOG.debug(getClass().getSimpleName() + " constructor: " +
-        "unassignedNodeExists=" + unassignedNodeExists + ", " +
-        "isClusterStartup=" + master.isClusterStartup());
+    // Set a watch on Zookeeper's UNASSIGNED node if it exists.
+    zkWrapper.registerListener(this);
 
-    if (master.isClusterStartup() && unassignedNodeExists) {
-      LOG.info("Cluster start, but found " + unassignedZNode + ", deleting it.");
+    if (zkWrapper.exists(unassignedZNode, false)) {
+      // The unassigned directory already exists in ZK. Take a snapshot of unassigned regions.
       try {
-        zkWrapper.deleteZNode(unassignedZNode, true);
-      } catch (KeeperException e) {
-        LOG.error("Could not delete znode " + unassignedZNode, e);
-        throw new IOException(e);
-      } catch (InterruptedException e) {
-        LOG.error("Could not delete znode " + unassignedZNode, e);
-        throw new IOException(e);
+        unassignedDirSnapshot = zkWrapper.listChildrenAndWatchForNewChildren(unassignedZNode);
+      } catch (KeeperException ke) {
+        throw new IOException(ke);
       }
+    } else {
+      zkWrapper.createZNodeIfNotExists(unassignedZNode);  // create and watch
     }
-
-    // If the UNASSIGNED ZNode does not exist, create it.
-    zkWrapper.createZNodeIfNotExists(unassignedZNode);
-
-    // TODO: get the outstanding changes in UNASSIGNED
-
-    // Set a watch on Zookeeper's UNASSIGNED node if it exists.
-    zkWrapper.registerListener(this);
   }
 
   /**
@@ -105,9 +105,7 @@ public class ZKUnassignedWatcher impleme
     }
 
     // check if the path is for the UNASSIGNED directory we care about
-    if(event.getPath() == null ||
-       !event.getPath().startsWith(zkWrapper.getZNodePathForHBase(
-           zkWrapper.getRegionInTransitionZNode()))) {
+    if (event.getPath() == null || !event.getPath().startsWith(unassignedZNode)) {
       return;
     }
 
@@ -142,7 +140,7 @@ public class ZKUnassignedWatcher impleme
         for(ZNodePathAndData zNodePathAndData : newZNodes) {
           LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
           handleRegionStateInZK(zNodePathAndData.getzNodePath(),
-              zNodePathAndData.getData());
+              zNodePathAndData.getData(), true);
         }
       }
     }
@@ -162,20 +160,34 @@ public class ZKUnassignedWatcher impleme
    */
   private void handleRegionStateInZK(String zNodePath) throws IOException {
     byte[] data = zkWrapper.readZNode(zNodePath, null);
-    handleRegionStateInZK(zNodePath, data);
+    handleRegionStateInZK(zNodePath, data, true);
   }
 
-  private void handleRegionStateInZK(String zNodePath, byte[] data) {
+  void handleRegionStateInZK(String zNodePath, byte[] data, boolean canDefer) throws IOException {
     // a null value is set when a node is created, we don't need to handle this
     if(data == null) {
       return;
     }
-    String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
+
     String region = zNodePath.substring(
-        zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
+        zNodePath.indexOf(unassignedZNode) + unassignedZNode.length() + 1);
+
     HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
     LOG.debug("Got event type [ " + rsEvent + " ] for region " + region);
 
+    RegionTransitionEventData rt = new RegionTransitionEventData();
+    Writables.getWritable(data, rt);
+
+    if (canDefer) {
+      ZNodePathAndData pathAndData = new ZNodePathAndData(zNodePath, data);
+      if (delayedZKEvents.enqueue(pathAndData)) {
+        // We will process this event after the initial scan of the unassigned directory is done.
+        LOG.debug("ZK-EVENT-PROCESS: deferring processing of event " + rsEvent + ", path "
+            + zNodePath + " until master startup is complete");
+        return;
+      }
+    }
+
     // if the node was CLOSED then handle it
     if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
       new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
@@ -187,4 +199,16 @@ public class ZKUnassignedWatcher impleme
           data).submit();
     }
   }
+
+  void drainZKEventQueue() {
+    LOG.info("Draining ZK unassigned event queue");
+    delayedZKEvents.drain(processEvent);
+    LOG.info("Finished draining ZK unassigned event queue");
+  }
+
+  /** @return a snapshot of the ZK unassigned directory taken when we set the watch */
+  List<String> getUnassignedDirSnapshot() {
+    return unassignedDirSnapshot;
+  }
+  
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java Sun Aug  5 19:16:11 2012
@@ -19,18 +19,11 @@
  */
 package org.apache.hadoop.hbase.master.handler;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
 
 /**
  * This is the event handler for all events relating to closing regions on the
@@ -41,31 +34,23 @@ import org.apache.hadoop.hbase.util.Writ
 public class MasterCloseRegionHandler extends HBaseEventHandler
 {
   private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class);
-
-  private String regionName;
-  protected byte[] serializedData;
-  RegionTransitionEventData hbEventData;
-  ServerManager serverManager;
-
-  public MasterCloseRegionHandler(HBaseEventType eventType,
-                                  ServerManager serverManager,
-                                  String serverName,
-                                  String regionName,
+  
+  public MasterCloseRegionHandler(HBaseEventType eventType, 
+                                  ServerManager serverManager, 
+                                  String serverName, 
+                                  String regionName, 
                                   byte[] serializedData) {
-    super(false, serverName, eventType);
-    this.regionName = regionName;
-    this.serializedData = serializedData;
-    this.serverManager = serverManager;
+    super(false, serverName, eventType, regionName, serializedData, serverManager);
   }
 
   /**
-   * Handle the various events relating to closing regions. We can get the
+   * Handle the various events relating to closing regions. We can get the 
    * following events here:
    *   - RS_REGION_CLOSING : No-op
-   *   - RS_REGION_CLOSED  : The region is closed. If we are not in a shutdown
-   *                         state, find the RS to open this region. This could
-   *                         be a part of a region move, or just that the RS has
-   *                         died. Should result in a M_REQUEST_OPENREGION event
+   *   - RS_REGION_CLOSED  : The region is closed. If we are not in a shutdown 
+   *                         state, find the RS to open this region. This could 
+   *                         be a part of a region move, or just that the RS has 
+   *                         died. Should result in a M_REQUEST_OPENREGION event 
    *                         getting created.
    */
   @Override
@@ -75,31 +60,19 @@ public class MasterCloseRegionHandler ex
     // handle RS_REGION_CLOSED events
     handleRegionClosedEvent();
   }
-
+  
   private void handleRegionClosedEvent() {
-    try {
-      if(hbEventData == null) {
-        hbEventData = new RegionTransitionEventData();
-        Writables.getWritable(serializedData, hbEventData);
-      }
-    } catch (IOException e) {
-      LOG.error("Could not deserialize additional args for Close region", e);
-    }
-
+    ensureEventDataAvailable();
     String serverName = hbEventData.getRsName();
     HServerInfo serverInfo = serverManager.getServerInfo(serverName);
-
-    // process the region close - this will cause the reopening of the
+    
+    // process the region close - this will cause the reopening of the 
     // region as a part of the heartbeat of some RS
-    serverManager.processRegionClose(serverInfo,
+    serverManager.processRegionClose(serverInfo, 
         hbEventData.getHmsg().getRegionInfo());
-
-    LOG.info("Processed close of region " +
+    
+    LOG.info("Processed close of region " + 
         hbEventData.getHmsg().getRegionInfo().getRegionNameAsString() +
         " by region server: " + serverName);
   }
-
-  public String getRegionName() {
-    return regionName;
-  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java Sun Aug  5 19:16:11 2012
@@ -19,18 +19,14 @@
  */
 package org.apache.hadoop.hbase.master.handler;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
 
 /**
  * This is the event handler for all events relating to opening regions on the
@@ -43,33 +39,24 @@ import org.apache.hadoop.hbase.util.Writ
  */
 public class MasterOpenRegionHandler extends HBaseEventHandler {
   private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class);
-  // other args passed in a byte array form
-  protected byte[] serializedData;
-  private String regionName;
-  private RegionTransitionEventData hbEventData;
-  ServerManager serverManager;
-
-  public MasterOpenRegionHandler(HBaseEventType eventType,
-                                 ServerManager serverManager,
-                                 String serverName,
-                                 String regionName,
-                                 byte[] serData) {
-    super(false, serverName, eventType);
-    this.regionName = regionName;
 
-    this.serializedData = serData;
-    this.serverManager = serverManager;
+  public MasterOpenRegionHandler(HBaseEventType eventType, 
+                                 ServerManager serverManager, 
+                                 String serverName, 
+                                 String regionName, 
+                                 byte[] serData) {
+    super(false, serverName, eventType, regionName, serData, serverManager);
   }
 
   /**
-   * Handle the various events relating to opening regions. We can get the
+   * Handle the various events relating to opening regions. We can get the 
    * following events here:
-   *   - RS_REGION_OPENING : Keep track to see how long the region open takes.
-   *                         If the RS is taking too long, then revert the
-   *                         region back to closed state so that it can be
+   *   - RS_REGION_OPENING : Keep track to see how long the region open takes. 
+   *                         If the RS is taking too long, then revert the 
+   *                         region back to closed state so that it can be 
    *                         re-assigned.
-   *   - RS_REGION_OPENED  : The region is opened. Add an entry into META for
-   *                         the RS having opened this region. Then delete this
+   *   - RS_REGION_OPENED  : The region is opened. Add an entry into META for  
+   *                         the RS having opened this region. Then delete this 
    *                         entry in ZK.
    */
   @Override
@@ -83,12 +70,12 @@ public class MasterOpenRegionHandler ext
       handleRegionOpenedEvent();
     }
   }
-
+  
   private void handleRegionOpeningEvent() {
-    // TODO: not implemented.
+    // TODO: not implemented. 
     LOG.debug("NO-OP call to handling region opening event");
-    // Keep track to see how long the region open takes. If the RS is taking too
-    // long, then revert the region back to closed state so that it can be
+    // Keep track to see how long the region open takes. If the RS is taking too 
+    // long, then revert the region back to closed state so that it can be 
     // re-assigned.
   }
 
@@ -105,27 +92,4 @@ public class MasterOpenRegionHandler ext
     }
   }
 
-  private void ensureEventDataAvailable() {
-    if (hbEventData != null) {
-      return;
-    }
-
-    try {
-      hbEventData = new RegionTransitionEventData();
-      Writables.getWritable(serializedData, hbEventData);
-    } catch (IOException e) {
-      LOG.error("Could not deserialize additional args for Open region", e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getRegionName() {
-    return regionName;
-  }
-
-  public String getRegionServerName() {
-    ensureEventDataAvailable();
-    return hbEventData.getRsName();
-  }
-
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Aug  5 19:16:11 2012
@@ -1617,7 +1617,9 @@ public class HRegionServer implements HR
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.hlogRoller);
     this.compactSplitThread.join();
-    this.replicationHandler.join();
+    if (replicationHandler != null) {
+      this.replicationHandler.join();
+    }
   }
 
   private boolean getMaster() {
@@ -1655,8 +1657,8 @@ public class HRegionServer implements HR
   private HServerAddress readMasterAddressFromZK() {
     HServerAddress masterAddress = null;
     try {
-      masterAddress = zooKeeperWrapper.readAddressOrThrow(
-          zooKeeperWrapper.masterElectionZNode, zooKeeperWrapper);
+      masterAddress = HServerInfo.getAddress(zooKeeperWrapper.readAddressOrThrow(
+          zooKeeperWrapper.masterElectionZNode, zooKeeperWrapper));
     } catch (KeeperException e) {
       LOG.fatal(UNABLE_TO_READ_MASTER_ADDRESS_ERR_MSG, e);
       forceAbort();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Sun Aug  5 19:16:11 2012
@@ -56,11 +56,11 @@ public class SequenceFileLogWriter imple
   }
 
   @Override
-  public void init(FileSystem fs, Path path, Configuration conf)
+  public void init(FileSystem fs, Path path, Configuration conf) 
   throws IOException {
     // Create a SF.Writer instance.
     try {
-	this.generateWriter(fs,path,conf);
+    	this.generateWriter(fs,path,conf);
     } catch (InvocationTargetException ite) {
       // function was properly called, but threw it's own exception
       throw new IOException(ite.getCause());
@@ -139,9 +139,13 @@ public class SequenceFileLogWriter imple
     this.writer.sync();
     if (this.syncFs != null) {
       try {
-       this.syncFs.invoke(this.writer, HLog.NO_ARGS);
+        this.syncFs.invoke(this.writer, HLog.NO_ARGS);
       } catch (Exception e) {
-        throw new IOException("Reflection", e);
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new IOException("Reflection: could not call method " + syncFs.getName()
+            + " with no arguments on " + writer.getClass().getName(), e);
       }
     }
   }
@@ -158,20 +162,20 @@ public class SequenceFileLogWriter imple
   public OutputStream getDFSCOutputStream() {
     return this.dfsClient_out;
   }
-
-  // To be backward compatible; we still need to call the old sequence file
-  // interface.
-  private void generateWriter(FileSystem fs, Path path, Configuration conf)
+  
+  // To be backward compatible; we still need to call the old sequence file 
+  // interface. 
+  private void generateWriter(FileSystem fs, Path path, Configuration conf) 
   throws InvocationTargetException, Exception {
-	boolean forceSync =
-		conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
-	if (forceSync) {
+  	boolean forceSync = 
+  		conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+  	if (forceSync) {
       // call the new create api with force sync flag
       this.writer = (SequenceFile.Writer) SequenceFile.class
         .getMethod("createWriter", new Class[] {FileSystem.class,
             Configuration.class, Path.class, Class.class, Class.class,
             Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
-            CompressionType.class, CompressionCodec.class, Metadata.class,
+            CompressionType.class, CompressionCodec.class, Metadata.class, 
             Boolean.TYPE})
         .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
             WALEdit.class,
@@ -187,8 +191,8 @@ public class SequenceFileLogWriter imple
             forceSync
             });
 
-	} else {
-		// still need to keep old interface to be backward compatible
+  	} else {
+  		// still need to keep old interface to be backward compatible
       // reflection for a version of SequenceFile.createWriter that doesn't
       // automatically create the parent directory (see HBASE-2312)
       this.writer = (SequenceFile.Writer) SequenceFile.class
@@ -208,6 +212,6 @@ public class SequenceFileLogWriter imple
             SequenceFile.CompressionType.NONE, new DefaultCodec(),
             new Metadata()
             });
-	}
+  	}
   }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java?rev=1369645&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java Sun Aug  5 19:16:11 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.StopStatus;
+
+/**
+ * Suppose we are responding events and have the option to process them in real time or to defer
+ * (buffer) them. Also suppose that we have an initial period when we have to buffer all the
+ * events, after which we have to process all the buffered events and switch to real-time
+ * processing, while events are still arriving. This class provides support for that use case. We
+ * are assuming that any number of threads can add elements to the queue, but only one thread can
+ * drain the queue and make the switch to real-time processing, and there are no other ways to take
+ * elements out of the queue.
+ *
+ * @param <T> queue element type
+ */
+public class DrainableQueue<T> {
+  private static final Log LOG = LogFactory.getLog(DrainableQueue.class);
+
+  private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+  private final String name;
+
+  /** True if the queue has been completely drained. */
+  private boolean drained = false;
+
+  /** Making sure that only one thread can drain the queue at a time */
+  private Object drainLock = new Object();
+
+  /** We will stop draining the queue if this stop status is set to true */
+  private StopStatus drainStop;
+  
+  public DrainableQueue(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Enqueue an event if we are still in the "deferred processing" mode for this queue. Even if
+   * we have already started draining the queue, we still enqueue events if the draining process
+   * has not completed.
+   */
+  public synchronized boolean enqueue(T event) throws InterruptedIOException {
+    if (!drained) {
+      try {
+        queue.put(event);
+      } catch (InterruptedException ex) {
+        String msg = "Could not add event to queue " + name;
+        LOG.error(msg, ex);
+        throw new InterruptedIOException(msg + ": " + ex.getMessage());
+      }
+      // Enqueued the event.
+      return true;
+    }
+    // Event not accepted, tell the caller to process it in real time.
+    return false;
+  }
+
+  /**
+   * Find the top element in the queue if it is present. Used while draining the queue. If there
+   * are no elements left in the queue, the "drained" status is set. Does not remove the element
+   * from the queue.
+   *
+   * @return the head of the queue or null if the queue has been drained.
+   */
+  private T peek() throws InterruptedException {
+    synchronized (this) {
+      if (queue.isEmpty()) {
+        drained = true;
+        return null;
+      }
+    }
+    // We are assuming that no elements could be taken out of the queue between the isEmpty check
+    // above and here, because the only thread that can do that this thread, which is draining
+    // the queue.
+    return queue.peek();
+  }
+
+  public void drain(ParamCallable<T> processor) {
+    synchronized (drainLock) {
+      if (drainStop != null && drainStop.isStopped()) {
+        LOG.error("Stopping draining event queue " + name + " because we are shutting down");
+        return;
+      }
+      T event;
+      try {
+        while ((event = peek()) != null) {
+          processor.call(event);
+          // Assuming that this will always succeed as we are the only thread allowed to take
+          // elements out of the queue, and we verified above that queue was not empty.
+          // We cannot switch to real-time event processing before we finish processing the event
+          // that came out of the queue because new events may arrive while the event is still
+          // being processed, resulting in event re-ordering.
+          queue.remove();
+        }
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while draining " + name);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public void stopDrainIfStopped(StopStatus drainStop) {
+    this.drainStop = drainStop;
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Sun Aug  5 19:16:11 2012
@@ -98,7 +98,7 @@ public class JVMClusterUtil {
   /**
    * Creates a {@link HMaster}. Call 'start' on the returned thread to make it
    * run. Modifies the passed configuration -- the caller is responsible for
-   * defensive copying.
+   * defensive copying. 
    * @param masterConf configuration to use
    * @param hmc class to create an instance of
    * @param masterId a unique identifier of a master within a mini-cluster
@@ -171,8 +171,7 @@ public class JVMClusterUtil {
     if (masters != null) {
       for (HMaster t : masters) {
         if (t.isActiveMaster()) {
-          // This will trigger cluster shutdown.
-          t.shutdown();
+          t.requestClusterShutdown();
         } else {
           // This will only stop this particular master.
           t.stop("normal shutdown");
@@ -193,7 +192,7 @@ public class JVMClusterUtil {
           }
         }
       }
-
+  
       if (masters != null) {
         for (HMaster t : masters) {
           while (t.isAlive()) {

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java?rev=1369645&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java Sun Aug  5 19:16:11 2012
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/** A callable object that takes an argument */
+public interface ParamCallable<T> {
+
+  void call(T arg);
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sun Aug  5 19:16:11 2012
@@ -64,7 +64,9 @@ public class Threads {
    */
   public static Thread setDaemonThreadRunning(final Thread t,
     final String name, final UncaughtExceptionHandler handler) {
-    t.setName(name);
+    if (name != null) {
+      t.setName(name);
+    }
     if (handler != null) {
       t.setUncaughtExceptionHandler(handler);
     }
@@ -181,15 +183,15 @@ public class Threads {
   }
 
   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum
+   * Create a new CachedThreadPool with a bounded number as the maximum 
    * thread size in the pool.
-   *
+   * 
    * @param maxCachedThread the maximum thread could be created in the pool
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number
-   * as the maximum thread size in the pool.
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
+   * as the maximum thread size in the pool. 
    */
   public static ThreadPoolExecutor getBoundedCachedThreadPool(
       int maxCachedThread, long timeout, TimeUnit unit,