You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/09/24 22:33:20 UTC

svn commit: r1389561 [2/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/zookeeper/ test...

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1389561&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Mon Sep 24 20:33:19 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Run bulk assign.  Does one RPC per regionserver passing a
+ * batch of regions using {@link SingleServerBulkAssigner}.
+ */
+@InterfaceAudience.Private
+public class GeneralBulkAssigner extends BulkAssigner {
+  private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
+
+  private Map<ServerName, List<HRegionInfo>> failedPlans
+    = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
+  private ExecutorService pool;
+
+  final Map<ServerName, List<HRegionInfo>> bulkPlan;
+  final AssignmentManager assignmentManager;
+
+  GeneralBulkAssigner(final Server server,
+      final Map<ServerName, List<HRegionInfo>> bulkPlan,
+      final AssignmentManager am) {
+    super(server);
+    this.bulkPlan = bulkPlan;
+    this.assignmentManager = am;
+  }
+
+  @Override
+  public boolean bulkAssign(boolean sync) throws InterruptedException,
+      IOException {
+    // Disable timing out regions in transition up in zk while bulk assigning.
+    this.assignmentManager.timeoutMonitor.bulkAssign(true);
+    try {
+      return super.bulkAssign(sync);
+    } finally {
+      // Re-enable timing out regions in transition up in zk.
+      this.assignmentManager.timeoutMonitor.bulkAssign(false);
+    }
+ }
+
+  @Override
+  protected String getThreadNamePrefix() {
+    return this.server.getServerName() + "-GeneralBulkAssigner";
+  }
+
+  @Override
+  protected void populatePool(ExecutorService pool) {
+    this.pool = pool; // shut it down later in case some assigner hangs 
+    for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
+      pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
+        this.assignmentManager, this.failedPlans));
+    }
+  }
+
+  /**
+   *
+   * @param timeout How long to wait.
+   * @return true if done.
+   */
+  @Override
+  protected boolean waitUntilDone(final long timeout)
+  throws InterruptedException {
+    Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+    for (List<HRegionInfo> regionList : bulkPlan.values()) {
+      regionSet.addAll(regionList);
+    }
+
+    pool.shutdown(); // no more task allowed
+    int serverCount = bulkPlan.size();
+    int regionCount = regionSet.size();
+    long startTime = System.currentTimeMillis();
+    long rpcWaitTime = startTime + timeout;
+    while (!server.isStopped() && !pool.isTerminated()
+        && rpcWaitTime > System.currentTimeMillis()) {
+      if (failedPlans.isEmpty()) {
+        pool.awaitTermination(100, TimeUnit.MILLISECONDS);
+      } else {
+        reassignFailedPlans();
+      }
+    }
+    if (!pool.isTerminated()) {
+      LOG.warn("bulk assigner is still running after "
+        + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
+      // some assigner hangs, can't wait any more, shutdown the pool now
+      List<Runnable> notStarted = pool.shutdownNow();
+      if (notStarted != null && !notStarted.isEmpty()) {
+        server.abort("some single server assigner hasn't started yet"
+          + " when the bulk assigner timed out", null);
+        return false;
+      }
+    }
+
+    int reassigningRegions = 0;
+    if (!failedPlans.isEmpty() && !server.isStopped()) {
+      reassigningRegions = reassignFailedPlans();
+    }
+
+    Configuration conf = server.getConfiguration();
+    long perRegionOpenTimeGuesstimate =
+      conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
+    long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
+      + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+    RegionStates regionStates = assignmentManager.getRegionStates();
+    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+    while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
+      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+      while (regionInfoIterator.hasNext()) {
+        HRegionInfo hri = regionInfoIterator.next();
+        RegionState state = regionStates.getRegionState(hri);
+        if ((!regionStates.isRegionInTransition(hri) && regionStates.isRegionAssigned(hri))
+            || state.isSplit() || state.isSplitting()) {
+          regionInfoIterator.remove();
+        }
+      }
+      if (!regionSet.isEmpty()) {
+        regionStates.waitForUpdate(100);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      long elapsedTime = System.currentTimeMillis() - startTime;
+      String status = "successfully";
+      if (!regionSet.isEmpty()) {
+        status = "with " + regionSet.size() + " regions still not assigned yet";
+      }
+      LOG.debug("bulk assigning total " + regionCount + " regions to "
+        + serverCount + " servers, took " + elapsedTime + "ms, " + status);
+    }
+    return regionSet.isEmpty();
+  }
+
+  @Override
+  protected long getTimeoutOnRIT() {
+    // Guess timeout.  Multiply the max number of regions on a server
+    // by how long we think one region takes opening.
+    Configuration conf = server.getConfiguration();
+    long perRegionOpenTimeGuesstimate =
+      conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
+    int maxRegionsPerServer = 1;
+    for (List<HRegionInfo> regionList : bulkPlan.values()) {
+      int size = regionList.size();
+      if (size > maxRegionsPerServer) {
+        maxRegionsPerServer = size;
+      }
+    }
+    long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
+      + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
+      + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
+        30000) * bulkPlan.size();
+    LOG.debug("Timeout-on-RIT=" + timeout);
+    return timeout;
+  }
+
+  @Override
+  protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+    return new UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        LOG.warn("Assigning regions in " + t.getName(), e);
+      }
+    };
+  }
+
+  private int reassignFailedPlans() {
+    List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
+      LOG.info("Failed assigning " + e.getValue().size()
+          + " regions to server " + e.getKey() + ", reassigning them");
+      reassigningRegions.addAll(failedPlans.remove(e.getKey()));
+    }
+    for (HRegionInfo region : reassigningRegions) {
+      assignmentManager.assign(region, true, true);
+    }
+    return reassigningRegions.size();
+  }
+
+  /**
+   * Manage bulk assigning to a server.
+   */
+  static class SingleServerBulkAssigner implements Runnable {
+    private final ServerName regionserver;
+    private final List<HRegionInfo> regions;
+    private final AssignmentManager assignmentManager;
+    private final Map<ServerName, List<HRegionInfo>> failedPlans;
+
+    SingleServerBulkAssigner(final ServerName regionserver,
+        final List<HRegionInfo> regions, final AssignmentManager am,
+        final Map<ServerName, List<HRegionInfo>> failedPlans) {
+      this.regionserver = regionserver;
+      this.regions = regions;
+      this.assignmentManager = am;
+      this.failedPlans = failedPlans;
+    }
+
+    @Override
+    public void run() {
+      try {
+       if (!assignmentManager.assign(regionserver, regions)) {
+         failedPlans.put(regionserver, regions);
+       }
+      } catch (Throwable t) {
+        LOG.warn("Failed bulking assigning " + regions.size()
+            + " region(s) to " + regionserver.getServerName()
+            + ", and continue to bulk assign others", t);
+        failedPlans.put(regionserver, regions);
+      }
+    }
+  }
+}

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Sep 24 20:33:19 2012
@@ -30,11 +30,8 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -61,7 +58,6 @@ import org.apache.hadoop.hbase.MasterMon
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.PleaseHoldException;
-import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerLoad;
@@ -661,7 +657,7 @@ Server {
 
     // Wait for region servers to report in.
     this.serverManager.waitForRegionServers(status);
-    // Check zk for regionservers that are up but didn't register
+    // Check zk for region servers that are up but didn't register
     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
       if (!this.serverManager.isServerOnline(sn)) {
         // Not registered; add it.
@@ -690,7 +686,7 @@ Server {
       .updateRootAndMetaIfNecessary(this);
 
     this.balancer.setMasterServices(this);
-    // Fixup assignment manager status
+    // Fix up assignment manager status
     status.setStatus("Starting assignment manager");
     this.assignmentManager.joinCluster();
 
@@ -765,12 +761,11 @@ Server {
   /**
    * If ServerShutdownHandler is disabled, we enable it and expire those dead
    * but not expired servers.
-   * @throws IOException
    */
-  private void enableServerShutdownHandler() throws IOException {
+  private void enableServerShutdownHandler() {
     if (!serverShutdownHandlerEnabled) {
       serverShutdownHandlerEnabled = true;
-      this.serverManager.expireDeadNotExpiredServers();
+      this.serverManager.processQueuedDeadServers();
     }
   }
 
@@ -845,7 +840,7 @@ Server {
       enableSSHandWaitForMeta();
       assigned++;
     } else {
-      // Region already assigned.  We didnt' assign it.  Add to in-memory state.
+      // Region already assigned.  We didn't assign it.  Add to in-memory state.
       this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
         this.catalogTracker.getMetaLocation());
     }
@@ -911,8 +906,11 @@ Server {
     // Now work on our list of found parents. See if any we can clean up.
     int fixups = 0;
     for (Map.Entry<HRegionInfo, Result> e : offlineSplitParents.entrySet()) {
-      fixups += ServerShutdownHandler.fixupDaughters(
+      ServerName sn = HRegionInfo.getServerName(e.getValue());
+      if (!serverManager.isServerDead(sn)) { // Otherwise, let SSH take care of it
+        fixups += ServerShutdownHandler.fixupDaughters(
           e.getValue(), assignmentManager, catalogTracker);
+      }
     }
     if (fixups != 0) {
       LOG.info("Scanned the catalog and fixed up " + fixups +
@@ -1484,7 +1482,7 @@ Server {
     }
 
     this.executorService.submit(new CreateTableHandler(this,
-      this.fileSystemManager, this.serverManager, hTableDescriptor, conf,
+      this.fileSystemManager, hTableDescriptor, conf,
       newRegions, catalogTracker, assignmentManager));
     if (cpHost != null) {
       cpHost.postCreateTable(hTableDescriptor, newRegions);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Mon Sep 24 20:33:19 2012
@@ -133,6 +133,13 @@ public class RegionStates {
   }
 
   /**
+   * @return the server the specified region assigned to; null if not assigned.
+   */
+  public synchronized ServerName getAssignedServer(final HRegionInfo hri) {
+    return regionAssignments.get(hri);
+  }
+
+  /**
    * Wait for the state map to be updated by assignment manager.
    */
   public synchronized void waitForUpdate(
@@ -519,7 +526,11 @@ public class RegionStates {
     try {
       Pair<HRegionInfo, ServerName> p =
         MetaReader.getRegion(server.getCatalogTracker(), regionName);
-      return p == null ? null : p.getFirst();
+      HRegionInfo hri = p == null ? null : p.getFirst();
+      if (hri != null) {
+        createRegionState(hri);
+      }
+      return hri;
     } catch (IOException e) {
       server.abort("Aborting because error occoured while reading " +
         Bytes.toStringBinary(regionName) + " from .META.", e);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Mon Sep 24 20:33:19 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.Server;
@@ -59,11 +58,9 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -78,6 +75,15 @@ import com.google.protobuf.ServiceExcept
  * (hostname and port) as well as the startcode (timestamp from when the server
  * was started).  This is used to differentiate a restarted instance of a given
  * server from the original instance.
+ * <p>
+ * If a server is known not to be running any more, it is called dead. The dead
+ * server needs to be handled by a ServerShutdownHandler.  If the handler is not
+ * enabled yet, the server can't be handled right away so it is queued up.
+ * After the handler is enabled, the server will be submitted to a handler to handle.
+ * However, the handler may be just partially enabled.  If so,
+ * the server cannot be fully processed, and is queued up for further processing.
+ * A server is fully processed only after the handler is fully enabled
+ * and has completed the handling.
  */
 @InterfaceAudience.Private
 public class ServerManager {
@@ -117,12 +123,39 @@ public class ServerManager {
   private final long warningSkew;
 
   /**
-   * Set of region servers which are dead but not expired immediately. If one
+   * Set of region servers which are dead but not processed immediately. If one
    * server died before master enables ServerShutdownHandler, the server will be
-   * added to set and will be expired through calling
-   * {@link ServerManager#expireDeadNotExpiredServers()} by master.
+   * added to this set and will be processed through calling
+   * {@link ServerManager#processQueuedDeadServers()} by master.
+   * <p>
+   * A dead server is a server instance known to be dead, not listed in the /hbase/rs
+   * znode any more. It may have not been submitted to ServerShutdownHandler yet
+   * because the handler is not enabled.
+   * <p>
+   * A dead server, which has been submitted to ServerShutdownHandler while the
+   * handler is not enabled, is queued up.
+   * <p>
+   * So this is a set of region servers known to be dead but not submitted to
+   * ServerShutdownHandler for processing yet.
+   */
+  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
+
+  /**
+   * Set of region servers which are dead and submitted to ServerShutdownHandler to
+   * process but not fully processed immediately.
+   * <p>
+   * If one server died before assignment manager finished the failover cleanup, the server
+   * will be added to this set and will be processed through calling
+   * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
+   * <p>
+   * For all the region servers in this set, HLog split is already completed.
+   * <p>
+   * ServerShutdownHandler processes a dead server submitted to the handler after
+   * the handler is enabled. It may not be able to complete the processing because root/meta
+   * is not yet online or master is currently in startup mode.  In this case, the dead
+   * server will be parked in this set temporarily.
    */
-  private Set<ServerName> deadNotExpiredServers = new HashSet<ServerName>();
+  private Set<ServerName> requeuedDeadServers = new HashSet<ServerName>();
 
   /**
    * Constructor.
@@ -326,18 +359,6 @@ public class ServerManager {
   }
 
   /**
-   * @param address
-   * @return ServerLoad if serverName is known else null
-   * @deprecated Use {@link #getLoad(HServerAddress)}
-   */
-  public ServerLoad getLoad(final HServerAddress address) {
-    ServerName sn = new ServerName(address.toString(), ServerName.NON_STARTCODE);
-    ServerName actual =
-      ServerName.findServerWithSameHostnamePort(this.getOnlineServersList(), sn);
-    return actual == null? null: getLoad(actual);
-  }
-
-  /**
    * Compute the average load across all region servers.
    * Currently, this uses a very naive computation - just uses the number of
    * regions being served, ignoring stats about number of requests.
@@ -410,20 +431,19 @@ public class ServerManager {
   }
 
   /*
-   * Expire the passed server.  Add it to list of deadservers and queue a
+   * Expire the passed server.  Add it to list of dead servers and queue a
    * shutdown processing.
    */
   public synchronized void expireServer(final ServerName serverName) {
     if (!services.isServerShutdownHandlerEnabled()) {
       LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
           + "delay expiring server " + serverName);
-      this.deadNotExpiredServers.add(serverName);
+      this.queuedDeadServers.add(serverName);
       return;
     }
     if (!this.onlineServers.containsKey(serverName)) {
       LOG.warn("Received expiration of " + serverName +
         " but server is not currently online");
-      return;
     }
     if (this.deadservers.contains(serverName)) {
       // TODO: Can this happen?  It shouldn't be online in this case?
@@ -465,20 +485,47 @@ public class ServerManager {
         carryingRoot + ", meta=" + carryingMeta);
   }
 
+  public synchronized void processDeadServer(final ServerName serverName) {
+    // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
+    // in-memory region states, region servers could be down. Root/meta table can and
+    // should be re-assigned, log splitting can be done too. However, it is better to
+    // wait till the cleanup is done before re-assigning user regions.
+    //
+    // We should not wait in the server shutdown handler thread since it can clog
+    // the handler threads and root/meta table could not be re-assigned in case
+    // the corresponding server is down. So we queue them up here instead.
+    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+      requeuedDeadServers.add(serverName);
+      return;
+    }
+
+    this.deadservers.add(serverName);
+    this.services.getExecutorService().submit(new ServerShutdownHandler(
+      this.master, this.services, this.deadservers, serverName, false));
+  }
+
   /**
-   * Expire the servers which died during master's initialization. It will be
-   * called after HMaster#assignRootAndMeta.
-   * @throws IOException
+   * Process the servers which died during master's initialization. It will be
+   * called after HMaster#assignRootAndMeta and AssignmentManager#joinCluster.
    * */
-  synchronized void expireDeadNotExpiredServers() throws IOException {
+  synchronized void processQueuedDeadServers() {
     if (!services.isServerShutdownHandlerEnabled()) {
-      throw new IOException("Master hasn't enabled ServerShutdownHandler ");
+      LOG.info("Master hasn't enabled ServerShutdownHandler");
     }
-    Iterator<ServerName> serverIterator = deadNotExpiredServers.iterator();
+    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
     while (serverIterator.hasNext()) {
       expireServer(serverIterator.next());
       serverIterator.remove();
     }
+
+    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+      LOG.info("AssignmentManager hasn't finished failover cleanup");
+    }
+    serverIterator = requeuedDeadServers.iterator();
+    while (serverIterator.hasNext()) {
+      processDeadServer(serverIterator.next());
+      serverIterator.remove();
+    }
   }
 
   /*
@@ -713,11 +760,23 @@ public class ServerManager {
    * @return A copy of the internal set of deadNotExpired servers.
    */
   Set<ServerName> getDeadNotExpiredServers() {
-    return new HashSet<ServerName>(this.deadNotExpiredServers);
+    return new HashSet<ServerName>(this.queuedDeadServers);
   }
 
   public boolean isServerOnline(ServerName serverName) {
-    return onlineServers.containsKey(serverName);
+    return serverName != null && onlineServers.containsKey(serverName);
+  }
+
+  /**
+   * Check if a server is known to be dead.  A server can be online,
+   * or known to be dead, or unknown to this manager (i.e., not online,
+   * not known to be dead either. It is simply not tracked by the
+   * master any more, for example, a very old previous instance).
+   */
+  public synchronized boolean isServerDead(ServerName serverName) {
+    return serverName == null || deadservers.isDeadServer(serverName)
+      || queuedDeadServers.contains(serverName)
+      || requeuedDeadServers.contains(serverName);
   }
 
   public void shutdownCluster() {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Mon Sep 24 20:33:19 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
@@ -41,9 +40,7 @@ import org.apache.hadoop.hbase.master.As
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.zookeeper.KeeperException;
 
@@ -58,19 +55,15 @@ public class CreateTableHandler extends 
   private Configuration conf;
   private final AssignmentManager assignmentManager;
   private final CatalogTracker catalogTracker;
-  private final ServerManager serverManager;
   private final HRegionInfo [] newRegions;
 
   public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
-    ServerManager serverManager, HTableDescriptor hTableDescriptor,
-    Configuration conf, HRegionInfo [] newRegions,
-    CatalogTracker catalogTracker, AssignmentManager assignmentManager)
-    throws NotAllMetaRegionsOnlineException, TableExistsException,
-    IOException {
+      HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions,
+      CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+          throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
     super(server, EventType.C_M_CREATE_TABLE);
 
     this.fileSystemManager = fileSystemManager;
-    this.serverManager = serverManager;
     this.hTableDescriptor = hTableDescriptor;
     this.conf = conf;
     this.newRegions = newRegions;
@@ -173,11 +166,10 @@ public class CreateTableHandler extends 
     }
 
     // 4. Trigger immediate assignment of the regions in round-robin fashion
-    List<ServerName> servers = serverManager.createDestinationServersList();
     try {
       List<HRegionInfo> regions = Arrays.asList(newRegions);
       assignmentManager.getRegionStates().createRegionStates(regions);
-      assignmentManager.assignUserRegions(regions, servers);
+      assignmentManager.assign(regions);
     } catch (InterruptedException ie) {
       LOG.error("Caught " + ie + " during round-robin assignment");
       throw new IOException(ie);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java Mon Sep 24 20:33:19 2012
@@ -208,7 +208,7 @@ public class EnableTableHandler extends 
         }
       } else {
         try {
-          assignmentManager.assignUserRegionsToOnlineServers(regions);
+          assignmentManager.assign(regions);
         } catch (InterruptedException e) {
           LOG.warn("Assignment was interrupted");
           Thread.currentThread().interrupt();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Mon Sep 24 20:33:19 2012
@@ -222,10 +222,13 @@ public class ServerShutdownHandler exten
       // The solution here is to resubmit a ServerShutdownHandler request to process
       // user regions on that server so that MetaServerShutdownHandler
       // executor pool is always available.
-      if (isCarryingRoot() || isCarryingMeta()) { // -ROOT- or .META.
-        this.services.getExecutorService().submit(new ServerShutdownHandler(
-          this.server, this.services, this.deadServers, serverName, false));
-        this.deadServers.add(serverName);
+      //
+      // If AssignmentManager hasn't finished rebuilding user regions,
+      // we are not ready to assign dead regions either. So we re-queue up
+      // the dead server for further processing too.
+      if (isCarryingRoot() || isCarryingMeta() // -ROOT- or .META.
+          || !services.getAssignmentManager().isFailoverCleanupDone()) {
+        this.services.getServerManager().processDeadServer(serverName);
         return;
       }
 
@@ -267,6 +270,9 @@ public class ServerShutdownHandler exten
               serverName + ", retrying META read", ioe);
         }
       }
+      if (this.server.isStopped()) {
+        throw new IOException("Server is stopped");
+      }
 
       // Skip regions that were in transition unless CLOSING or PENDING_CLOSE
       for (RegionState rit : regionsInTransition) {
@@ -347,11 +353,12 @@ public class ServerShutdownHandler exten
             toAssignRegions.remove(hri);
           }
         }
-        // Get all available servers
-        List<ServerName> availableServers = services.getServerManager()
-            .createDestinationServersList();
-        this.services.getAssignmentManager().assign(toAssignRegions,
-            availableServers);
+        try {
+          this.services.getAssignmentManager().assign(toAssignRegions);
+        } catch (InterruptedException ie) {
+          LOG.error("Caught " + ie + " during round-robin assignment");
+          throw new IOException(ie);
+        }
       }
     } finally {
       this.deadServers.finish(serverName);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Sep 24 20:33:19 2012
@@ -49,7 +49,6 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
@@ -68,14 +67,15 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.FailedSanityCheckException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionMovedException;
+import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -167,11 +167,20 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -214,23 +223,12 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
-import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 
 import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 
 import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java Mon Sep 24 20:33:19 2012
@@ -27,14 +27,11 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
 /**
  * Helper class for table state tracking for use by {@link AssignmentManager}.
  * Reads, caches and sets state up in zookeeper.  If multiple read/write
@@ -305,4 +302,67 @@ public class ZKTable {
     }
     return disabledTables;
   }
+
+  /**
+   * Gets a list of all the tables set as disabled in zookeeper.
+   * @return Set of disabled tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<String> getDisabledTables(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED);
+  }
+
+  /**
+   * Gets a list of all the tables set as disabling in zookeeper.
+   * @return Set of disabling tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<String> getDisablingTables(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLING);
+  }
+
+  /**
+   * Gets a list of all the tables set as enabling in zookeeper.
+   * @return Set of enabling tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<String> getEnablingTables(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    return getAllTables(zkw, ZooKeeperProtos.Table.State.ENABLING);
+  }
+
+  /**
+   * Gets a list of all the tables set as disabled or disabling in zookeeper.
+   * @return Set of disabled or disabling tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<String> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED,
+      ZooKeeperProtos.Table.State.DISABLING);
+  }
+
+  /**
+   * Gets a list of all the tables of specified states in zookeeper.
+   * @return Set of tables of specified states, empty Set if none
+   * @throws KeeperException
+   */
+  static Set<String> getAllTables(final ZooKeeperWatcher zkw,
+      final ZooKeeperProtos.Table.State... states) throws KeeperException {
+    Set<String> allTables = new HashSet<String>();
+    List<String> children =
+      ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+    for (String child: children) {
+      ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(zkw, child);
+      for (ZooKeeperProtos.Table.State expectedState: states) {
+        if (state == expectedState) {
+          allTables.add(child);
+          break;
+        }
+      }
+    }
+    return allTables;
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Mon Sep 24 20:33:19 2012
@@ -397,12 +397,13 @@ public class TestZooKeeper {
     zk.close();
     ZKUtil.createAndFailSilent(zk2, aclZnode);
  }
-  
-  @Test
+
   /**
    * Test should not fail with NPE when getChildDataAndWatchForNewChildren
    * invoked with wrongNode
    */
+  @Test
+  @SuppressWarnings("deprecation")
   public void testGetChildDataAndWatchForNewChildrenShouldNotThrowNPE()
       throws Exception {
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
@@ -443,7 +444,7 @@ public class TestZooKeeper {
    * Tests whether the logs are split when master recovers from a expired zookeeper session and an
    * RS goes down.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 180000)
   public void testLogSplittingAfterMasterRecoveryDueToZKExpiry() throws IOException,
       KeeperException, InterruptedException {
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Mon Sep 24 20:33:19 2012
@@ -155,7 +155,7 @@ public class TestHCM {
     table.put(put2);
     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
 
-    // We can wait for all regions to be onlines, that makes log reading easier when debugging
+    // We can wait for all regions to be online, that makes log reading easier when debugging
     while (TEST_UTIL.getMiniHBaseCluster().getMaster().
       getAssignmentManager().getRegionStates().isRegionsInTransition()) {
     }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Mon Sep 24 20:33:19 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 
 import junit.framework.Assert;
 import org.apache.commons.logging.Log;
@@ -32,9 +31,6 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java Mon Sep 24 20:33:19 2012
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -337,6 +336,7 @@ public class TestAssignmentManager {
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
       this.serverManager, ct, balancer, executor, null);
+    am.failoverCleanupDone.set(true);
     try {
       // Make sure our new AM gets callbacks; once registered, can't unregister.
       // Thats ok because we make a new zk watcher for each test.
@@ -451,9 +451,10 @@ public class TestAssignmentManager {
     // Create and startup an executor. This is used by AssignmentManager
     // handling zk callbacks.
     ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress");
-
     // We need a mocked catalog tracker.
     CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+    ZKAssign.deleteAllNodes(this.watcher);
+
     // Create an AM.
     AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
       this.server, this.serverManager);
@@ -501,6 +502,8 @@ public class TestAssignmentManager {
     // We need a mocked catalog tracker.
     CatalogTracker ct = Mockito.mock(CatalogTracker.class);
     LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
+    ZKAssign.deleteAllNodes(this.watcher);
+
     // Create an AM.
     AssignmentManager am = new AssignmentManager(this.server,
       this.serverManager, ct, balancer, executor, null);
@@ -521,6 +524,7 @@ public class TestAssignmentManager {
     String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
     // create znode in M_ZK_REGION_CLOSING state.
     ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
+
     try {
       processServerShutdownHandler(ct, am, false);
       // check znode deleted or not.
@@ -541,7 +545,7 @@ public class TestAssignmentManager {
       ZKAssign.deleteAllNodes(this.watcher);
     }
   }
-     
+
   private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am, boolean splitRegion)
       throws IOException, ServiceException {
     // Make sure our new AM gets callbacks; once registered, can't unregister.
@@ -588,6 +592,7 @@ public class TestAssignmentManager {
     Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
     ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
       services, deadServers, SERVERNAME_A, false);
+    am.failoverCleanupDone.set(true);
     handler.process();
     // The region in r will have been assigned.  It'll be up in zk as unassigned.
   }
@@ -667,7 +672,7 @@ public class TestAssignmentManager {
     };
     ((ZooKeeperWatcher) zkw).registerListener(am);
     Mockito.doThrow(new InterruptedException()).when(recoverableZk)
-        .getChildren("/hbase/unassigned", zkw);
+        .getChildren("/hbase/unassigned", null);
     am.setWatcher((ZooKeeperWatcher) zkw);
     try {
       am.processDeadServersAndRegionsInTransition(null);
@@ -748,7 +753,7 @@ public class TestAssignmentManager {
       am.shutdown();
     }
   }
-  
+
   /**
    * Mocked load balancer class used in the testcase to make sure that the testcase waits until
    * random assignment is called and the gate variable is set to true.
@@ -774,7 +779,7 @@ public class TestAssignmentManager {
       return super.retainAssignment(regions, servers);
     }
   }
-  
+
   /**
    * Test the scenario when the master is in failover and trying to process a
    * region which is in Opening state on a dead RS. Master should immediately
@@ -791,8 +796,8 @@ public class TestAssignmentManager {
         EventType.RS_ZK_REGION_OPENING, version);
     RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_OPENING,
         REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
-    Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = 
-      new HashMap<ServerName, List<Pair<HRegionInfo, Result>>>();
+    Map<ServerName, List<HRegionInfo>> deadServers =
+      new HashMap<ServerName, List<HRegionInfo>>();
     deadServers.put(SERVERNAME_A, null);
     version = ZKAssign.getVersion(this.watcher, REGIONINFO);
     am.gate.set(false);
@@ -804,7 +809,7 @@ public class TestAssignmentManager {
     assertTrue("The region should be assigned immediately.", null != am.regionPlans.get(REGIONINFO
         .getEncodedName()));
   }
-  
+
   /**
    * Test verifies whether assignment is skipped for regions of tables in DISABLING state during
    * clean cluster startup. See HBASE-6281.
@@ -969,20 +974,12 @@ public class TestAssignmentManager {
     @Override
     boolean processRegionInTransition(String encodedRegionName,
         HRegionInfo regionInfo,
-        Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
+        Map<ServerName, List<HRegionInfo>> deadServers)
         throws KeeperException, IOException {
       this.processRITInvoked = true;
       return super.processRegionInTransition(encodedRegionName, regionInfo,
           deadServers);
     }
-    @Override
-    void processRegionsInTransition(final RegionTransition rt,
-        final HRegionInfo regionInfo,
-        final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
-        final int expectedVersion) throws KeeperException {
-      while (this.gate.get()) Threads.sleep(1);
-      super.processRegionsInTransition(rt, regionInfo, deadServers, expectedVersion);
-    }
 
     @Override
     public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
@@ -992,16 +989,16 @@ public class TestAssignmentManager {
     }
 
     @Override
-    public void assign(java.util.List<HRegionInfo> regions, java.util.List<ServerName> servers) 
-    {
+    public void assign(List<HRegionInfo> regions)
+        throws IOException, InterruptedException {
       assignInvoked = true;
-    };
-    
+    }
+
     /** reset the watcher */
     void setWatcher(ZooKeeperWatcher watcher) {
       this.watcher = watcher;
     }
-    
+
     /**
      * @return ExecutorService used by this instance.
      */

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java Mon Sep 24 20:33:19 2012
@@ -781,34 +781,14 @@ public class TestMasterFailover {
     region = enabledRegions.remove(0);
     regionsThatShouldBeOnline.add(region);
     master.getAssignmentManager().getRegionStates().updateRegionState(
-      region, RegionState.State.PENDING_OPEN, null);
+      region, RegionState.State.PENDING_OPEN);
     ZKAssign.createNodeOffline(zkw, region, master.getServerName());
     // PENDING_OPEN and disabled
     region = disabledRegions.remove(0);
     regionsThatShouldBeOffline.add(region);
     master.getAssignmentManager().getRegionStates().updateRegionState(
-      region, RegionState.State.PENDING_OPEN, null);
+      region, RegionState.State.PENDING_OPEN);
     ZKAssign.createNodeOffline(zkw, region, master.getServerName());
-    // This test is bad.  It puts up a PENDING_CLOSE but doesn't say what
-    // server we were PENDING_CLOSE against -- i.e. an entry in
-    // AssignmentManager#regions.  W/o a server, we NPE trying to resend close.
-    // In past, there was wonky logic that had us reassign region if no server
-    // at tail of the unassign.  This was removed.  Commenting out for now.
-    // TODO: Remove completely.
-    /*
-    // PENDING_CLOSE and enabled
-    region = enabledRegions.remove(0);
-    LOG.info("Setting PENDING_CLOSE enabled " + region.getEncodedName());
-    regionsThatShouldBeOnline.add(region);
-    master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-      new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
-    // PENDING_CLOSE and disabled
-    region = disabledRegions.remove(0);
-    LOG.info("Setting PENDING_CLOSE disabled " + region.getEncodedName());
-    regionsThatShouldBeOffline.add(region);
-    master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
-      new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
-      */
 
     // Failover should be completed, now wait for no RIT
     log("Waiting for no more RIT");

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Mon Sep 24 20:33:19 2012
@@ -131,8 +131,8 @@ public class TestReplication {
 
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
-    utility1.startMiniCluster(2);
-    utility2.startMiniCluster(2);
+    utility1.startMiniCluster(3);
+    utility2.startMiniCluster(3);
 
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java Mon Sep 24 20:33:19 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.HRegionLo
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.AdminProtocol;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
 import org.apache.hadoop.hbase.io.hfile.TestHFile;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -78,6 +76,7 @@ import org.apache.hadoop.hbase.util.HBas
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
@@ -138,6 +137,7 @@ public class TestHBaseFsck {
     // point to a different region server
     HTable meta = new HTable(conf, HTableDescriptor.META_TABLEDESC.getName());
     ResultScanner scanner = meta.getScanner(new Scan());
+    HRegionInfo hri = null;
 
     resforloop:
     for (Result res : scanner) {
@@ -158,6 +158,7 @@ public class TestHBaseFsck {
           put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
             Bytes.toBytes(sn.getStartcode()));
           meta.put(put);
+          hri = HRegionInfo.getHRegionInfo(res);
           break resforloop;
         }
       }
@@ -167,10 +168,8 @@ public class TestHBaseFsck {
     assertErrors(doFsck(conf, true), new ERROR_CODE[]{
         ERROR_CODE.SERVER_DOES_NOT_MATCH_META});
 
-    // fixing assignments require opening regions is not synchronous.  To make
-    // the test pass consistently so for now we bake in some sleep to let it
-    // finish.  1s seems sufficient.
-    Thread.sleep(1000);
+    TEST_UTIL.getHBaseCluster().getMaster()
+      .getAssignmentManager().waitForAssignment(hri);
 
     // Should be fixed now
     assertNoErrors(doFsck(conf, false));
@@ -318,18 +317,6 @@ public class TestHBaseFsck {
     }
     tbl.put(puts);
     tbl.flushCommits();
-    long endTime = System.currentTimeMillis() + 60000;
-    while (!TEST_UTIL.getHBaseAdmin().isTableEnabled(tablename)) {
-      try {
-        if (System.currentTimeMillis() > endTime) {
-          fail("Failed to enable table " + tablename + " after waiting for 60 sec");
-        }
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        fail("Interrupted when waiting table " + tablename + " to be enabled");
-      }
-    }
     return tbl;
   }
 
@@ -1117,15 +1104,19 @@ public class TestHBaseFsck {
 
       // Region of disable table was opened on RS
       TEST_UTIL.getHBaseAdmin().disableTable(table);
+      // Mess up ZKTable state, otherwise, can't open the region
+      ZKTable zkTable = cluster.getMaster().getAssignmentManager().getZKTable();
+      zkTable.setEnabledTable(table);
       HRegionInfo region = disabledRegions.remove(0);
       ZKAssign.createNodeOffline(zkw, region, serverName);
       ProtobufUtil.openRegion(hrs, region);
 
       int iTimes = 0;
+      byte[] regionName = region.getRegionName();
       while (true) {
-        byte[] data = ZKAssign.getData(zkw, region.getEncodedName());
-        RegionTransition rt = data == null ? null : RegionTransition.parseFrom(data);
-        if (rt == null || rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+        if (cluster.getServerWith(regionName) != -1) {
+          // Now, region is deployed, reset the table state back
+          zkTable.setDisabledTable(table);
           break;
         }
         Thread.sleep(100);