You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/10/02 00:02:02 UTC

svn commit: r1528227 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/ hbase-it/src/test/java/org/apache/hadoop/hba...

Author: jxiang
Date: Tue Oct  1 22:02:01 2013
New Revision: 1528227

URL: http://svn.apache.org/r1528227
Log:
HBASE-9514 Prevent region from assigning before log splitting is done

Added:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java   (with props)
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddColumnAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactRandomRegionOfTableAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/FlushRandomRegionOfTableAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRandomRegionOfTableAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/SlowDeterministicMonkeyFactory.java
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java Tue Oct  1 22:02:01 2013
@@ -272,11 +272,39 @@ public class ClusterStatus extends Versi
     return balancerOn != null && balancerOn;
   }
 
-  public Boolean getBalancerOn(){
+  public Boolean getBalancerOn() {
     return balancerOn;
   }
 
-   /**
+  public String toString() {
+    StringBuilder sb = new StringBuilder(1024);
+    sb.append("Master: " + master);
+    sb.append("\nNumber of backup masters: " + backupMasters.size());
+    for (ServerName serverName: backupMasters) {
+      sb.append("\n  " + serverName);
+    }
+
+    sb.append("\nNumber of live region servers: " + liveServers.size());
+    for (ServerName serverName: liveServers.keySet()) {
+      sb.append("\n  " + serverName.getServerName());
+    }
+
+    sb.append("\nNumber of dead region servers: " + deadServers.size());
+    for (ServerName serverName: deadServers) {
+      sb.append("\n  " + serverName);
+    }
+
+    sb.append("\nAverage load: " + getAverageLoad());
+    sb.append("\nNumber of requests: " + getRequestsCount());
+    sb.append("\nNumber of regions: " + getRegionsCount());
+    sb.append("\nNumber of regions in transition: " + intransition.size());
+    for (RegionState state: intransition.values()) {
+      sb.append("\n  " + state.toDescriptiveString());
+    }
+    return sb.toString();
+  }
+
+  /**
     * Convert a ClusterStatus to a protobuf ClusterStatus
     *
     * @return the protobuf ClusterStatus

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java Tue Oct  1 22:02:01 2013
@@ -95,8 +95,14 @@ public abstract class IntegrationTestBas
 
   @After
   public void cleanUpMonkey() throws Exception {
-    monkey.stop("Ending test");
-    monkey.waitForStop();
+    cleanUpMonkey("Ending test");
+  }
+
+  protected void cleanUpMonkey(String why) throws Exception {
+    if (monkey != null && !monkey.isStopped()) {
+      monkey.stop(why);
+      monkey.waitForStop();
+    }
   }
 
   protected IntegrationTestingUtility getTestingUtil(Configuration conf) {

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddColumnAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddColumnAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddColumnAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/AddColumnAction.java Tue Oct  1 22:02:01 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.chaos.actions;
 
 import java.io.IOException;
-import java.util.Random;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.hbase.HColumnDescriptor;

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactRandomRegionOfTableAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactRandomRegionOfTableAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactRandomRegionOfTableAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/CompactRandomRegionOfTableAction.java Tue Oct  1 22:02:01 2013
@@ -53,13 +53,18 @@ public class CompactRandomRegionOfTableA
   public void perform() throws Exception {
     HBaseTestingUtility util = context.getHaseIntegrationTestingUtility();
     HBaseAdmin admin = util.getHBaseAdmin();
-    List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
     boolean major = RandomUtils.nextInt(100) < majorRatio;
 
     LOG.info("Performing action: Compact random region of table "
       + tableName + ", major=" + major);
+    List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
+    if (regions == null || regions.isEmpty()) {
+      LOG.info("Table " + tableName + " doesn't have regions to compact");
+      return;
+    }
+
     HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem(
-        regions.toArray(new HRegionInfo[regions.size()]));
+      regions.toArray(new HRegionInfo[regions.size()]));
 
     if (major) {
       LOG.debug("Major compacting region " + region.getRegionNameAsString());

Added: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java?rev=1528227&view=auto
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java (added)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java Tue Oct  1 22:02:01 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.actions;
+
+import java.io.IOException;
+
+/**
+ * Action to dump the cluster status.
+ */
+public class DumpClusterStatusAction extends Action {
+
+  @Override
+  public void init(ActionContext context) throws IOException {
+    super.init(context);
+  }
+
+  @Override
+  public void perform() throws Exception {
+    LOG.debug("Performing action: Dump cluster status");
+    LOG.info("Cluster status\n" + cluster.getClusterStatus());
+  }
+}

Propchange: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/DumpClusterStatusAction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/FlushRandomRegionOfTableAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/FlushRandomRegionOfTableAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/FlushRandomRegionOfTableAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/FlushRandomRegionOfTableAction.java Tue Oct  1 22:02:01 2013
@@ -51,8 +51,13 @@ public class FlushRandomRegionOfTableAct
 
     LOG.info("Performing action: Flush random region of table " + tableName);
     List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
+    if (regions == null || regions.isEmpty()) {
+      LOG.info("Table " + tableName + " doesn't have regions to flush");
+      return;
+    }
+
     HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem(
-        regions.toArray(new HRegionInfo[regions.size()]));
+      regions.toArray(new HRegionInfo[regions.size()]));
     LOG.debug("Flushing region " + region.getRegionNameAsString());
     admin.flush(region.getRegionName());
     if (sleepTime > 0) {

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java Tue Oct  1 22:02:01 2013
@@ -51,8 +51,8 @@ public class MergeRandomAdjacentRegionsO
 
     LOG.info("Performing action: Merge random adjacent regions of table " + tableName);
     List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
-    if (regions.size() < 2) {
-      LOG.info("Table " + tableName + " doesn't have enough region to merge");
+    if (regions == null || regions.size() < 2) {
+      LOG.info("Table " + tableName + " doesn't have enough regions to merge");
       return;
     }
 

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRandomRegionOfTableAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRandomRegionOfTableAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRandomRegionOfTableAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRandomRegionOfTableAction.java Tue Oct  1 22:02:01 2013
@@ -55,8 +55,13 @@ public class MoveRandomRegionOfTableActi
 
     LOG.info("Performing action: Move random region of table " + tableName);
     List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
+    if (regions == null || regions.isEmpty()) {
+      LOG.info("Table " + tableName + " doesn't have regions to move");
+      return;
+    }
+
     HRegionInfo region = PolicyBasedChaosMonkey.selectRandomItem(
-        regions.toArray(new HRegionInfo[regions.size()]));
+      regions.toArray(new HRegionInfo[regions.size()]));
     LOG.debug("Unassigning region " + region.getRegionNameAsString());
     admin.unassign(region.getRegionName(), false);
     if (sleepTime > 0) {

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MoveRegionsOfTableAction.java Tue Oct  1 22:02:01 2013
@@ -52,12 +52,16 @@ public class MoveRegionsOfTableAction ex
     }
 
     HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
-
-    List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
     Collection<ServerName> serversList = admin.getClusterStatus().getServers();
     ServerName[] servers = serversList.toArray(new ServerName[serversList.size()]);
 
     LOG.info("Performing action: Move regions of table " + tableName);
+    List<HRegionInfo> regions = admin.getTableRegions(tableNameBytes);
+    if (regions == null || regions.isEmpty()) {
+      LOG.info("Table " + tableName + " doesn't have regions to move");
+      return;
+    }
+
     for (HRegionInfo regionInfo:regions) {
       try {
         String destServerName =

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/SlowDeterministicMonkeyFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/SlowDeterministicMonkeyFactory.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/SlowDeterministicMonkeyFactory.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/SlowDeterministicMonkeyFactory.java Tue Oct  1 22:02:01 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.chaos.act
 import org.apache.hadoop.hbase.chaos.actions.ChangeVersionsAction;
 import org.apache.hadoop.hbase.chaos.actions.CompactRandomRegionOfTableAction;
 import org.apache.hadoop.hbase.chaos.actions.CompactTableAction;
+import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
 import org.apache.hadoop.hbase.chaos.actions.FlushRandomRegionOfTableAction;
 import org.apache.hadoop.hbase.chaos.actions.FlushTableAction;
 import org.apache.hadoop.hbase.chaos.actions.MergeRandomAdjacentRegionsOfTableAction;
@@ -81,11 +82,17 @@ public class SlowDeterministicMonkeyFact
         new RestartRsHoldingMetaAction(35000),
     };
 
+    // Action to log more info for debugging
+    Action[] actions4 = new Action[] {
+        new DumpClusterStatusAction()
+    };
+
     return new PolicyBasedChaosMonkey(util,
         new PeriodicRandomActionPolicy(60 * 1000, actions1),
         new PeriodicRandomActionPolicy(90 * 1000, actions2),
         new CompositeSequentialPolicy(
             new DoActionsOncePolicy(150 * 1000, actions3),
-            new PeriodicRandomActionPolicy(150 * 1000, actions3)));
+            new PeriodicRandomActionPolicy(150 * 1000, actions3)),
+        new PeriodicRandomActionPolicy(90 * 1000, actions4));
   }
 }

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java Tue Oct  1 22:02:01 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.Integrati
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.chaos.monkies.CalmChaosMonkey;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -700,6 +702,8 @@ public class IntegrationTestBigLinkedLis
 
     private static final Log LOG = LogFactory.getLog(Loop.class);
 
+    IntegrationTestBigLinkedList it;
+
     protected void runGenerator(int numMappers, long numNodes,
         String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
       Path outputPath = new Path(outputDir);
@@ -714,7 +718,8 @@ public class IntegrationTestBigLinkedLis
       }
     }
 
-    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes) throws Exception {
+    protected boolean runVerify(String outputDir,
+        int numReducers, long expectedNumNodes) throws Exception {
       Path outputPath = new Path(outputDir);
       UUID uuid = UUID.randomUUID(); //create a random UUID.
       Path iterationOutput = new Path(outputPath, uuid.toString());
@@ -726,12 +731,20 @@ public class IntegrationTestBigLinkedLis
         throw new RuntimeException("Verify.run failed with return code: " + retCode);
       }
 
-      boolean verifySuccess = verify.verify(expectedNumNodes);
-      if (!verifySuccess) {
-        throw new RuntimeException("Verify.verify failed");
+      if (!verify.verify(expectedNumNodes)) {
+        try {
+          HBaseFsck fsck = new HBaseFsck(getConf());
+          HBaseFsck.setDisplayFullReport();
+          fsck.connect();
+          fsck.onlineHbck();
+        } catch (Throwable t) {
+          LOG.error("Failed to run hbck", t);
+        }
+        return false;
       }
 
       LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
+      return true;
     }
 
     @Override
@@ -761,7 +774,17 @@ public class IntegrationTestBigLinkedLis
         runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
         expectedNumNodes += numMappers * numNodes;
 
-        runVerify(outputDir, numReducers, expectedNumNodes);
+        if (!runVerify(outputDir, numReducers, expectedNumNodes)) {
+          if (it.monkey != null && !(it.monkey instanceof CalmChaosMonkey)) {
+            LOG.info("Verify.verify failed, let's stop CM and verify again");
+            it.cleanUpMonkey("Stop monkey before verify again after verify failed");
+            if (!runVerify(outputDir, numReducers, expectedNumNodes)) {
+              LOG.info("Verify.verify failed even without CM, verify one more");
+              runVerify(outputDir, numReducers, expectedNumNodes);
+            }
+          }
+          throw new RuntimeException("Verify.verify failed");
+        }
       }
 
       return 0;
@@ -1051,7 +1074,9 @@ public class IntegrationTestBigLinkedLis
     } else if (toRun.equals("Verify")) {
       tool = new Verify();
     } else if (toRun.equals("Loop")) {
-      tool = new Loop();
+      Loop loop = new Loop();
+      loop.it = this;
+      tool = loop;
     } else if (toRun.equals("Walker")) {
       tool = new Walker();
     } else if (toRun.equals("Print")) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Tue Oct  1 22:02:01 2013
@@ -269,8 +269,9 @@ public class AssignmentManager extends Z
       this.timerUpdater = null;
     }
     this.zkTable = new ZKTable(this.watcher);
-    this.maximumAttempts =
-      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
+    // This is the max attempts, not retries, so it should be at least 1.
+    this.maximumAttempts = Math.max(1,
+      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
         "hbase.meta.assignment.retry.sleeptime", 1000l);
     this.balancer = balancer;
@@ -389,6 +390,14 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * To avoid racing with AM, external entities may need to lock a region,
+   * for example, when SSH checks what regions to skip re-assigning.
+   */
+  public Lock acquireRegionLock(final String encodedName) {
+    return locker.acquireLock(encodedName);
+  }
+
+  /**
    * Now, failover cleanup is completed. Notify server manager to
    * process queued up dead servers processing, if any.
    */
@@ -586,10 +595,7 @@ public class AssignmentManager extends Z
         } else {
           // Insert into RIT & resend the query to the region server: may be the previous master
           // died before sending the query the first time.
-          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
-          final RegionState rs = regionStates.getRegionState(regionInfo);
-          final ClosedRegionHandler closedRegionHandler =
-            new ClosedRegionHandler(server, this, regionInfo);
+          final RegionState rs = regionStates.updateRegionState(rt, State.CLOSING);
           this.executorService.submit(
               new EventHandler(server, EventType.M_MASTER_RECOVERY) {
                 @Override
@@ -598,7 +604,7 @@ public class AssignmentManager extends Z
                   try {
                     unassign(regionInfo, rs, expectedVersion, null, true, null);
                     if (regionStates.isRegionOffline(regionInfo)) {
-                      closedRegionHandler.process();
+                      assign(regionInfo, true);
                     }
                   } finally {
                     lock.unlock();
@@ -611,18 +617,18 @@ public class AssignmentManager extends Z
       case RS_ZK_REGION_CLOSED:
       case RS_ZK_REGION_FAILED_OPEN:
         // Region is closed, insert into RIT and handle it
-        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
+        addToRITandInvokeAssign(regionInfo, State.CLOSED, rt);
         break;
 
       case M_ZK_REGION_OFFLINE:
         // If zk node of the region was updated by a live server skip this
         // region and just add it into RIT.
         if (!serverManager.isServerOnline(sn)) {
-          // Region is offline, insert into RIT and handle it like a closed
-          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+          // Region is offline, insert into RIT and invoke assign
+          addToRITandInvokeAssign(regionInfo, State.OFFLINE, rt);
         } else {
           // Insert in RIT and resend to the regionserver
-          regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
+          regionStates.updateRegionState(rt, State.PENDING_OPEN);
           final RegionState rs = regionStates.getRegionState(regionInfo);
           this.executorService.submit(
               new EventHandler(server, EventType.M_MASTER_RECOVERY) {
@@ -645,7 +651,7 @@ public class AssignmentManager extends Z
         if (!serverManager.isServerOnline(sn)) {
           forceOffline(regionInfo, rt);
         } else {
-          regionStates.updateRegionState(rt, RegionState.State.OPENING);
+          regionStates.updateRegionState(rt, State.OPENING);
         }
         break;
 
@@ -656,7 +662,7 @@ public class AssignmentManager extends Z
           // Region is opened, insert into RIT and handle it
           // This could be done asynchronously, we would need then to acquire the lock in the
           //  handler.
-          regionStates.updateRegionState(rt, RegionState.State.OPEN);
+          regionStates.updateRegionState(rt, State.OPEN);
           new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
         }
         break;
@@ -672,7 +678,7 @@ public class AssignmentManager extends Z
           // user region rebuilding since we may consider the split is completed.
           // Put it in SPLITTING state to avoid complications.
           regionStates.regionOnline(regionInfo, sn);
-          regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
+          regionStates.updateRegionState(rt, State.SPLITTING);
           LOG.info("Processed " + prettyPrintedRegionName + " in state : " + et);
         }
         break;
@@ -687,7 +693,7 @@ public class AssignmentManager extends Z
           // user region rebuilding since we may consider the split is completed.
           // Put it in SPLITTING state to avoid complications.
           regionStates.regionOnline(regionInfo, sn);
-          regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
+          regionStates.updateRegionState(rt, State.SPLITTING);
           LOG.info("Processed " + prettyPrintedRegionName + " in state : " + et);
           // Move the region to splitting state. The regionserver is supposed to update the znode
           // multiple times so if it's still up we will receive an update soon.
@@ -741,24 +747,24 @@ public class AssignmentManager extends Z
   private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
       throws KeeperException {
     // If was on dead server, its closed now.  Force to OFFLINE and then
-    // handle it like a close; this will get it reassigned if appropriate
+    // invoke assign; this will get it reassigned if appropriate
     LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
       " was on deadserver; forcing offline");
     ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
-    addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
+    addToRITandInvokeAssign(hri, State.OFFLINE, oldRt);
   }
 
   /**
-   * Add to the in-memory copy of regions in transition and then call close
-   * handler on passed region <code>hri</code>
+   * Add to the in-memory copy of regions in transition and then invoke
+   * assign on passed region <code>hri</code>
    * @param hri
    * @param state
    * @param oldData
    */
-  private void addToRITandCallClose(final HRegionInfo hri,
-      final RegionState.State state, final RegionTransition oldData) {
+  private void addToRITandInvokeAssign(final HRegionInfo hri,
+      final State state, final RegionTransition oldData) {
     regionStates.updateRegionState(oldData, state);
-    new ClosedRegionHandler(this.server, this, hri).process();
+    invokeAssign(hri);
   }
 
   /**
@@ -800,7 +806,7 @@ public class AssignmentManager extends Z
     if (!serverManager.isServerOnline(sn)
       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
       LOG.warn("Attempted to handle region transition for server but " +
-        "server is not online: " + prettyPrintedRegionName);
+        "it is not online: " + prettyPrintedRegionName + ", " + rt);
       return;
     }
 
@@ -839,119 +845,114 @@ public class AssignmentManager extends Z
       }
       regionState = latestState;
       switch (rt.getEventType()) {
-        case RS_ZK_REGION_SPLITTING:
-          if (!isInStateForSplitting(regionState)) break;
-          regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
-          break;
+      case RS_ZK_REGION_SPLITTING:
+        if (!isInStateForSplitting(regionState)) break;
+        regionStates.updateRegionState(rt, State.SPLITTING);
+        break;
 
-        case RS_ZK_REGION_SPLIT:
-          // RegionState must be null, or SPLITTING or PENDING_CLOSE.
-          if (!isInStateForSplitting(regionState)) break;
-          // If null, add SPLITTING state before going to SPLIT
+      case RS_ZK_REGION_SPLIT:
+        // RegionState must be null, or SPLITTING or PENDING_CLOSE.
+        if (!isInStateForSplitting(regionState)) break;
+        // If null, add SPLITTING state before going to SPLIT
+        if (regionState == null) {
+          regionState = regionStates.updateRegionState(rt, State.SPLITTING);
+
+          String message = "Received SPLIT for region " + prettyPrintedRegionName +
+            " from server " + sn;
+          // If still null, it means we cannot find it and it was already processed
           if (regionState == null) {
-            regionState = regionStates.updateRegionState(rt,
-              RegionState.State.SPLITTING);
-
-            String message = "Received SPLIT for region " + prettyPrintedRegionName +
-              " from server " + sn;
-            // If still null, it means we cannot find it and it was already processed
-            if (regionState == null) {
-              LOG.warn(message + " but it doesn't exist anymore," +
-                  " probably already processed its split");
-              break;
-            }
-            LOG.info(message +
-                " but region was not first in SPLITTING state; continuing");
-          }
-          // Check it has daughters.
-          byte [] payload = rt.getPayload();
-          List<HRegionInfo> daughters;
-          try {
-            daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
-          } catch (IOException e) {
-            LOG.error("Dropped split! Failed reading split payload for " +
-              prettyPrintedRegionName);
+            LOG.warn(message + " but it doesn't exist anymore," +
+                " probably already processed its split");
             break;
           }
-          assert daughters.size() == 2;
-          // Assert that we can get a serverinfo for this server.
-          if (!this.serverManager.isServerOnline(sn)) {
-            LOG.error("Dropped split! ServerName=" + sn + " unknown.");
-            break;
-          }
-          // Run handler to do the rest of the SPLIT handling.
-          new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
-          updateSplitHandlerTracker();
+          LOG.info(message +
+              " but region was not first in SPLITTING state; continuing");
+        }
+        // Check it has daughters.
+        byte [] payload = rt.getPayload();
+        List<HRegionInfo> daughters;
+        try {
+          daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
+        } catch (IOException e) {
+          LOG.error("Dropped split! Failed reading split payload for " +
+            prettyPrintedRegionName);
           break;
-
-        case RS_ZK_REGION_MERGING:
-          // Merged region is a new region, we can't find it in the region states now.
-          // However, the two merging regions are not new. They should be in state for merging.
-          handleRegionMerging(rt, prettyPrintedRegionName, sn);
+        }
+        assert daughters.size() == 2;
+        // Assert that we can get a serverinfo for this server.
+        if (!this.serverManager.isServerOnline(sn)) {
+          LOG.error("Dropped split! ServerName=" + sn + " unknown.");
           break;
+        }
+        // Run handler to do the rest of the SPLIT handling.
+        new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
+        updateSplitHandlerTracker();
+        break;
 
-        case RS_ZK_REGION_MERGED:
-          // Assert that we can get a serverinfo for this server.
-          if (!this.serverManager.isServerOnline(sn)) {
-            LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
-            break;
-          }
-          // Get merged and merging regions.
-          byte[] payloadOfMerge = rt.getPayload();
-          List<HRegionInfo> mergeRegions;
-          try {
-            mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
-                payloadOfMerge.length);
-          } catch (IOException e) {
-            LOG.error("Dropped merge! Failed reading merge payload for " +
-              prettyPrintedRegionName);
-            break;
-          }
-          assert mergeRegions.size() == 3;
-          HRegionInfo merge_a = mergeRegions.get(1);
-          HRegionInfo merge_b = mergeRegions.get(2);
-          if (!isInStateForMerging(sn, merge_a, merge_b)) {
-            // Move on. Merge already happened (passed PONR), no point to stop now
-            LOG.warn("Got merge event, but not in state good for MERGED; rs_a="
-              + merge_a + ", rs_b=" + merge_b);
-          }
-          // Run handler to do the rest of the MERGED handling.
-          new MergedRegionHandler(server, this, sn, mergeRegions).process();
-          break;
+      case RS_ZK_REGION_MERGING:
+        // Merged region is a new region, we can't find it in the region states now.
+        // However, the two merging regions are not new. They should be in state for merging.
+        handleRegionMerging(rt, prettyPrintedRegionName, sn);
+        break;
 
-        case M_ZK_REGION_CLOSING:
-          // Should see CLOSING after we have asked it to CLOSE or additional
-          // times after already being in state of CLOSING
-          if (regionState == null
-              || !regionState.isPendingCloseOrClosingOnServer(sn)) {
-            LOG.warn("Received CLOSING for " + prettyPrintedRegionName
-              + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
-              + regionStates.getRegionState(encodedName));
-            return;
-          }
-          // Transition to CLOSING (or update stamp if already CLOSING)
-          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
+      case RS_ZK_REGION_MERGED:
+        // Assert that we can get a serverinfo for this server.
+        if (!this.serverManager.isServerOnline(sn)) {
+          LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
           break;
-
-        case RS_ZK_REGION_CLOSED:
-          // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
-          if (regionState == null
-              || !regionState.isPendingCloseOrClosingOnServer(sn)) {
-            LOG.warn("Received CLOSED for " + prettyPrintedRegionName
-              + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
-              + regionStates.getRegionState(encodedName));
-            return;
-          }
-          // Handle CLOSED by assigning elsewhere or stopping if a disable
-          // If we got here all is good.  Need to update RegionState -- else
-          // what follows will fail because not in expected state.
-          regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
-          if (regionState != null) {
-            removeClosedRegion(regionState.getRegion());
-            new ClosedRegionHandler(server, this, regionState.getRegion()).process();
-            updateClosedRegionHandlerTracker(regionState.getRegion());
-          }
+        }
+        // Get merged and merging regions.
+        byte[] payloadOfMerge = rt.getPayload();
+        List<HRegionInfo> mergeRegions;
+        try {
+          mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
+              payloadOfMerge.length);
+        } catch (IOException e) {
+          LOG.error("Dropped merge! Failed reading merge payload for " +
+            prettyPrintedRegionName);
           break;
+        }
+        assert mergeRegions.size() == 3;
+        HRegionInfo merge_a = mergeRegions.get(1);
+        HRegionInfo merge_b = mergeRegions.get(2);
+        if (!isInStateForMerging(sn, merge_a, merge_b)) {
+          // Move on. Merge already happened (passed PONR), no point to stop now
+          LOG.warn("Got merge event, but not in state good for MERGED; rs_a="
+            + merge_a + ", rs_b=" + merge_b);
+        }
+        // Run handler to do the rest of the MERGED handling.
+        new MergedRegionHandler(server, this, sn, mergeRegions).process();
+        break;
+
+      case M_ZK_REGION_CLOSING:
+        // Should see CLOSING after we have asked it to CLOSE or additional
+        // times after already being in state of CLOSING
+        if (regionState == null
+            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
+          LOG.warn("Received CLOSING for " + prettyPrintedRegionName
+            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
+            + regionStates.getRegionState(encodedName));
+          return;
+        }
+        // Transition to CLOSING (or update stamp if already CLOSING)
+        regionStates.updateRegionState(rt, State.CLOSING);
+        break;
+
+      case RS_ZK_REGION_CLOSED:
+        // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
+        if (regionState == null
+            || !regionState.isPendingCloseOrClosingOnServer(sn)) {
+          LOG.warn("Received CLOSED for " + prettyPrintedRegionName
+            + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
+            + regionStates.getRegionState(encodedName));
+          return;
+        }
+        // Handle CLOSED by assigning elsewhere or stopping if a disable
+        // If we got here all is good. NOTE(review): RegionState is not updated in
+        // this branch; presumably ClosedRegionHandler takes care of that -- confirm.
+        new ClosedRegionHandler(server, this, regionState.getRegion()).process();
+        updateClosedRegionHandlerTracker(regionState.getRegion());
+        break;
 
         case RS_ZK_REGION_FAILED_OPEN:
           if (regionState == null
@@ -970,13 +971,13 @@ public class AssignmentManager extends Z
             failedOpenTracker.put(encodedName, failedOpenCount);
           }
           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
-            regionStates.updateRegionState(rt, RegionState.State.FAILED_OPEN);
+            regionStates.updateRegionState(rt, State.FAILED_OPEN);
             // remove the tracking info to save memory, also reset
             // the count for next open initiative
             failedOpenTracker.remove(encodedName);
           } else {
             // Handle this the same as if it were opened and then closed.
-            regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+            regionState = regionStates.updateRegionState(rt, State.CLOSED);
             if (regionState != null) {
               // When there are more than one region server a new RS is selected as the
               // destination and the same is updated in the regionplan. (HBASE-5546)
@@ -1001,7 +1002,7 @@ public class AssignmentManager extends Z
             return;
           }
           // Transition to OPENING (or update stamp if already OPENING)
-          regionStates.updateRegionState(rt, RegionState.State.OPENING);
+          regionStates.updateRegionState(rt, State.OPENING);
           break;
 
         case RS_ZK_REGION_OPENED:
@@ -1019,7 +1020,7 @@ public class AssignmentManager extends Z
             return;
           }
           // Handle OPENED by removing from transition and deleted zk node
-          regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
+          regionState = regionStates.updateRegionState(rt, State.OPEN);
           if (regionState != null) {
             failedOpenTracker.remove(encodedName); // reset the count, if any
             new OpenedRegionHandler(
@@ -1141,8 +1142,7 @@ public class AssignmentManager extends Z
   private boolean convertPendingCloseToSplitting(final RegionState rs) {
     if (!rs.isPendingClose()) return false;
     LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
-    regionStates.updateRegionState(
-      rs.getRegion(), RegionState.State.SPLITTING);
+    regionStates.updateRegionState(rs.getRegion(), State.SPLITTING);
     // Clean up existing state.  Clear from region plans seems all we
     // have to do here by way of clean up of PENDING_CLOSE.
     clearRegionPlan(rs.getRegion());
@@ -1400,10 +1400,6 @@ public class AssignmentManager extends Z
    * @param sn
    */
   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-    if (!serverManager.isServerOnline(sn)) {
-      LOG.warn("A region was opened on a dead server, ServerName=" +
-        sn + ", region=" + regionInfo.getEncodedName());
-    }
     numRegionsOpened.incrementAndGet();
     regionStates.regionOnline(regionInfo, sn);
 
@@ -1556,7 +1552,7 @@ public class AssignmentManager extends Z
    */
   public void assign(HRegionInfo region,
       boolean setOfflineInZK, boolean forceNewPlan) {
-    if (!setOfflineInZK && isDisabledorDisablingRegionInRIT(region)) {
+    if (isDisabledorDisablingRegionInRIT(region)) {
       return;
     }
     if (this.serverManager.isClusterShutdown()) {
@@ -1569,6 +1565,12 @@ public class AssignmentManager extends Z
     try {
       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
       if (state != null) {
+        if (regionStates.wasRegionOnDeadServer(encodedName)) {
+          LOG.info("Skip assigning " + region.getRegionNameAsString()
+            + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+            + " is dead but not processed yet");
+          return;
+        }
         assign(state, setOfflineInZK, forceNewPlan);
       }
     } finally {
@@ -1605,19 +1607,34 @@ public class AssignmentManager extends Z
         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
         List<RegionState> states = new ArrayList<RegionState>(regions.size());
         for (HRegionInfo region : regions) {
-          String encodedRegionName = region.getEncodedName();
-          RegionState state = forceRegionStateToOffline(region, true);
-          if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
-            RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
-            plans.put(encodedRegionName, plan);
-            states.add(state);
-          } else {
-            LOG.warn("failed to force region state to offline or "
-              + "failed to set it offline in ZK, will reassign later: " + region);
-            failedToOpenRegions.add(region); // assign individually later
-            Lock lock = locks.remove(encodedRegionName);
-            lock.unlock();
+          String encodedName = region.getEncodedName();
+          if (!isDisabledorDisablingRegionInRIT(region)) {
+            RegionState state = forceRegionStateToOffline(region, false);
+            boolean onDeadServer = false;
+            if (state != null) {
+              if (regionStates.wasRegionOnDeadServer(encodedName)) {
+                LOG.info("Skip assigning " + region.getRegionNameAsString()
+                  + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+                  + " is dead but not processed yet");
+                onDeadServer = true;
+              } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
+                RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
+                plans.put(encodedName, plan);
+                states.add(state);
+                continue;
+              }
+            }
+            // Reassign if the region wasn't on a dead server
+            if (!onDeadServer) {
+              LOG.info("failed to force region state to offline or "
+                + "failed to set it offline in ZK, will reassign later: " + region);
+              failedToOpenRegions.add(region); // assign individually later
+            }
           }
+          // Release the lock; this region is excluded from bulk assign because it is
+          // disabled/disabling, was on a dead server, or could not be set offline.
+          Lock lock = locks.remove(encodedName);
+          lock.unlock();
         }
 
         // Wait until all unassigned nodes have been put up and watchers set.
@@ -1653,8 +1670,8 @@ public class AssignmentManager extends Z
             Lock lock = locks.remove(encodedRegionName);
             lock.unlock();
           } else {
-            regionStates.updateRegionState(region,
-              RegionState.State.PENDING_OPEN, destination);
+            regionStates.updateRegionState(
+              region, State.PENDING_OPEN, destination);
             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
             if (this.shouldAssignRegionsWithFavoredNodes) {
               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
@@ -1666,8 +1683,8 @@ public class AssignmentManager extends Z
 
         // Move on to open regions.
         try {
-          // Send OPEN RPC. If it fails on a IOE or RemoteException, the
-          // TimeoutMonitor will pick up the pieces.
+          // Send OPEN RPC. If it fails on a IOE or RemoteException,
+          // regions will be assigned individually.
           long maxWaitTime = System.currentTimeMillis() +
             this.server.getConfiguration().
               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
@@ -1730,8 +1747,8 @@ public class AssignmentManager extends Z
           }
         } catch (IOException e) {
           // Can be a socket timeout, EOF, NoRouteToHost, etc
-          LOG.info("Unable to communicate with the region server in order" +
-            " to assign regions", e);
+          LOG.info("Unable to communicate with " + destination
+            + " in order to assign regions, ", e);
           return false;
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
@@ -1744,10 +1761,12 @@ public class AssignmentManager extends Z
 
       if (!failedToOpenRegions.isEmpty()) {
         for (HRegionInfo region : failedToOpenRegions) {
-          invokeAssign(region);
+          if (!regionStates.isRegionOnline(region)) {
+            invokeAssign(region);
+          }
         }
       }
-      LOG.debug("Bulk assigning done for " + destination.toString());
+      LOG.debug("Bulk assigning done for " + destination);
       return true;
     } finally {
       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
@@ -1823,7 +1842,7 @@ public class AssignmentManager extends Z
     }
     // Run out of attempts
     if (!tomActivated && state != null) {
-      regionStates.updateRegionState(region, RegionState.State.FAILED_CLOSE);
+      regionStates.updateRegionState(region, State.FAILED_CLOSE);
     }
   }
 
@@ -1838,36 +1857,77 @@ public class AssignmentManager extends Z
       state = regionStates.createRegionState(region);
     }
 
+    if (forceNewPlan && LOG.isDebugEnabled()) {
+      LOG.debug("Force region state offline " + state);
+    }
+
     switch (state.getState()) {
     case OPEN:
     case OPENING:
     case PENDING_OPEN:
+    case CLOSING:
+    case PENDING_CLOSE:
       if (!forceNewPlan) {
-        LOG.debug("Attempting to assign region " +
-          region + " but it is already in transition: " + state);
+        LOG.debug("Skip assigning " +
+          region + ", it is already " + state);
         return null;
       }
-    case CLOSING:
-    case PENDING_CLOSE:
     case FAILED_CLOSE:
     case FAILED_OPEN:
       unassign(region, state, -1, null, false, null);
+      RegionState oldState = state;
       state = regionStates.getRegionState(region);
-      if (state.isOffline()) break;
+      if (state.isFailedClose()) {
+        LOG.info("Skip assigning " +
+          region + ", we couldn't close it: " + state);
+        return null;
+      }
+      // In these cases, we need to confirm with meta
+      // the region was not on a dead server if it's open/pending.
+      if ((oldState.isOpened() || oldState.isPendingOpenOrOpening())
+          && wasRegionOnDeadServerByMeta(region, oldState.getServerName())) {
+        LOG.info("Skip assigning " + region.getRegionNameAsString()
+          + ", it is on a dead but not processed yet server");
+        return null;
+      }
     case CLOSED:
-      LOG.debug("Forcing OFFLINE; was=" + state);
-      state = regionStates.updateRegionState(
-        region, RegionState.State.OFFLINE);
     case OFFLINE:
       break;
     default:
       LOG.error("Trying to assign region " + region
-        + ", which is in state " + state);
+        + ", which is " + state);
       return null;
     }
     return state;
   }
 
+  private boolean wasRegionOnDeadServerByMeta(
+      final HRegionInfo region, final ServerName sn) {
+    try {
+      if (region.isMetaRegion()) {
+        ServerName server = catalogTracker.getMetaLocation();
+        return regionStates.isServerDeadAndNotProcessed(server);
+      }
+      while (!server.isStopped()) {
+        try {
+          catalogTracker.waitForMeta();
+          Pair<HRegionInfo, ServerName> r =
+            MetaReader.getRegion(catalogTracker, region.getRegionName());
+          ServerName server = r == null ? null : r.getSecond();
+          return regionStates.isServerDeadAndNotProcessed(server);
+        } catch (IOException ioe) {
+          LOG.info("Received exception accessing hbase:meta during force assign "
+            + region.getRegionNameAsString() + ", retrying", ioe);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted accessing hbase:meta", e);
+    }
+    // Interrupted or stopped before reading meta; fall back to the given server name.
+    return regionStates.isServerDeadAndNotProcessed(sn);
+  }
+
   /**
    * Caller must hold lock on the passed <code>state</code> object.
    * @param state
@@ -1878,214 +1938,214 @@ public class AssignmentManager extends Z
       final boolean setOfflineInZK, final boolean forceNewPlan) {
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-    RegionState currentState = state;
-    int versionOfOfflineNode = -1;
-    RegionPlan plan = null;
-    long maxRegionServerStartupWaitTime = -1;
-    HRegionInfo region = state.getRegion();
-    RegionOpeningState regionOpenState;
-    for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
-      if (plan == null) { // Get a server for the region at first
-        try {
-          plan = getRegionPlan(region, forceNewPlan);
-        } catch (HBaseIOException e) {
-          LOG.warn("Failed to get region plan", e);
+      RegionState currentState = state;
+      int versionOfOfflineNode = -1;
+      RegionPlan plan = null;
+      long maxRegionServerStartupWaitTime = -1;
+      HRegionInfo region = state.getRegion();
+      RegionOpeningState regionOpenState;
+      for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
+        if (plan == null) { // Get a server for the region at first
+          try {
+            plan = getRegionPlan(region, forceNewPlan);
+          } catch (HBaseIOException e) {
+            LOG.warn("Failed to get region plan", e);
+          }
         }
-      }
-      if (plan == null) {
-        LOG.warn("Unable to determine a plan to assign " + region);
-        if (tomActivated){
-          this.timeoutMonitor.setAllRegionServersOffline(true);
-        } else {
-          if (region.isMetaRegion()) {
-            try {
-              if (i != maximumAttempts) {
-                Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
-                continue;
+        if (plan == null) {
+          LOG.warn("Unable to determine a plan to assign " + region);
+          if (tomActivated){
+            this.timeoutMonitor.setAllRegionServersOffline(true);
+          } else {
+            if (region.isMetaRegion()) {
+              try {
+                if (i != maximumAttempts) {
+                  Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
+                  continue;
+                }
+                // TODO : Ensure HBCK fixes this
+                LOG.error("Unable to determine a plan to assign hbase:meta even after repeated attempts. Run HBCK to fix this");
+              } catch (InterruptedException e) {
+                LOG.error("Got exception while waiting for hbase:meta assignment");
+                Thread.currentThread().interrupt();
               }
-              // TODO : Ensure HBCK fixes this
-              LOG.error("Unable to determine a plan to assign hbase:meta even after repeated attempts. Run HBCK to fix this");
-            } catch (InterruptedException e) {
-              LOG.error("Got exception while waiting for hbase:meta assignment");
-              Thread.currentThread().interrupt();
             }
+            regionStates.updateRegionState(region, State.FAILED_OPEN);
           }
-          regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
+          return;
         }
-        return;
-      }
-      if (setOfflineInZK && versionOfOfflineNode == -1) {
-        // get the version of the znode after setting it to OFFLINE.
-        // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
-        versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
-        if (versionOfOfflineNode != -1) {
-          if (isDisabledorDisablingRegionInRIT(region)) {
-            return;
+        if (setOfflineInZK && versionOfOfflineNode == -1) {
+          // get the version of the znode after setting it to OFFLINE.
+          // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
+          versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
+          if (versionOfOfflineNode != -1) {
+            if (isDisabledorDisablingRegionInRIT(region)) {
+              return;
+            }
+            // When assignment comes from EnableTableHandler, the table state is ENABLING, and
+            // EnableTableHandler will set ENABLED after assigning all the table regions. If we
+            // set ENABLED directly here, the client API may think the table is already enabled.
+            // But when all the regions were added directly into hbase:meta and assignRegion is
+            // called, we need to make the table ENABLED ourselves, since in that case the table
+            // will not be in ENABLING or ENABLED state.
+            TableName tableName = region.getTable();
+            if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
+              LOG.debug("Setting table " + tableName + " to ENABLED state.");
+              setEnabledTable(tableName);
+            }
           }
-          // In case of assignment from EnableTableHandler table state is ENABLING. Any how
-          // EnableTableHandler will set ENABLED after assigning all the table regions. If we
-          // try to set to ENABLED directly then client API may think table is enabled.
-          // When we have a case such as all the regions are added directly into hbase:meta and we call
-          // assignRegion then we need to make the table ENABLED. Hence in such case the table
-          // will not be in ENABLING or ENABLED state.
-          TableName tableName = region.getTable();
-          if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
-            LOG.debug("Setting table " + tableName + " to ENABLED state.");
-            setEnabledTable(tableName);
+        }
+        if (setOfflineInZK && versionOfOfflineNode == -1) {
+          LOG.info("Unable to set offline in ZooKeeper to assign " + region);
+          // Setting offline in ZK must have failed, due to either a ZK race or some
+          // exception which may cause the server to abort. If it is a ZK race,
+          // we should retry: we have already reset the region state, so any
+          // existing (re)assignment will fail anyway.
+          if (!server.isAborted()) {
+            continue;
           }
         }
-      }
-      if (setOfflineInZK && versionOfOfflineNode == -1) {
-        LOG.info("Unable to set offline in ZooKeeper to assign " + region);
-        // Setting offline in ZK must have been failed due to ZK racing or some
-        // exception which may make the server to abort. If it is ZK racing,
-        // we should retry since we already reset the region state,
-        // existing (re)assignment will fail anyway.
-        if (!server.isAborted()) {
-          continue;
+        if (this.server.isStopped() || this.server.isAborted()) {
+          LOG.debug("Server stopped/aborted; skipping assign of " + region);
+          return;
         }
-      }
-      if (this.server.isStopped() || this.server.isAborted()) {
-        LOG.debug("Server stopped/aborted; skipping assign of " + region);
-        return;
-      }
-      LOG.info("Assigning " + region.getRegionNameAsString() +
-          " to " + plan.getDestination().toString());
-      // Transition RegionState to PENDING_OPEN
-      currentState = regionStates.updateRegionState(region,
-          RegionState.State.PENDING_OPEN, plan.getDestination());
-
-      boolean needNewPlan;
-      final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
-          " to " + plan.getDestination();
-      try {
-        List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
-        if (this.shouldAssignRegionsWithFavoredNodes) {
-          favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
-        }
-        regionOpenState = serverManager.sendRegionOpen(
-            plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
-
-        if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
-          // Failed opening this region, looping again on a new server.
-          needNewPlan = true;
-          LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
-              " trying to assign elsewhere instead; " +
-              "try=" + i + " of " + this.maximumAttempts);
-        } else {
-          // we're done
-          if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
-            processAlreadyOpenedRegion(region, plan.getDestination());
+        LOG.info("Assigning " + region.getRegionNameAsString() +
+            " to " + plan.getDestination().toString());
+        // Transition RegionState to PENDING_OPEN
+        currentState = regionStates.updateRegionState(region,
+          State.PENDING_OPEN, plan.getDestination());
+
+        boolean needNewPlan;
+        final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
+            " to " + plan.getDestination();
+        try {
+          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+          if (this.shouldAssignRegionsWithFavoredNodes) {
+            favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
+          }
+          regionOpenState = serverManager.sendRegionOpen(
+              plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
+
+          if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+            // Failed opening this region, looping again on a new server.
+            needNewPlan = true;
+            LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
+                " trying to assign elsewhere instead; " +
+                "try=" + i + " of " + this.maximumAttempts);
+          } else {
+            // we're done
+            if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
+              processAlreadyOpenedRegion(region, plan.getDestination());
+            }
+            return;
+          }
+
+        } catch (Throwable t) {
+          if (t instanceof RemoteException) {
+            t = ((RemoteException) t).unwrapRemoteException();
+          }
+
+          // Should we wait a little before retrying? If the server is starting it's yes.
+          // If the region is already in transition, it's yes as well: we want to be sure that
+          //  the region will get opened but we don't want a double assignment.
+          boolean hold = (t instanceof RegionAlreadyInTransitionException ||
+              t instanceof ServerNotRunningYetException);
+
+          // In case socket is timed out and the region server is still online,
+          // the openRegion RPC could have been accepted by the server and
+          // just the response didn't go through.  So we will retry to
+          // open the region on the same server to avoid possible
+          // double assignment.
+          boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
+              && this.serverManager.isServerOnline(plan.getDestination()));
+
+
+          if (hold) {
+            LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
+                "try=" + i + " of " + this.maximumAttempts, t);
+
+            if (maxRegionServerStartupWaitTime < 0) {
+              maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
+                  this.server.getConfiguration().
+                      getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+            }
+            try {
+              long now = EnvironmentEdgeManager.currentTimeMillis();
+              if (now < maxRegionServerStartupWaitTime) {
+                LOG.debug("Server is not yet up; waiting up to " +
+                    (maxRegionServerStartupWaitTime - now) + "ms", t);
+                Thread.sleep(100);
+                i--; // reset the try count
+                needNewPlan = false;
+              } else {
+                LOG.debug("Server is not up for a while; try a new one", t);
+                needNewPlan = true;
+              }
+            } catch (InterruptedException ie) {
+              LOG.warn("Failed to assign "
+                  + region.getRegionNameAsString() + " since interrupted", ie);
+              Thread.currentThread().interrupt();
+              if (!tomActivated) {
+                regionStates.updateRegionState(region, State.FAILED_OPEN);
+              }
+              return;
+            }
+          } else if (retry) {
+            needNewPlan = false;
+            LOG.warn(assignMsg + ", trying to assign to the same region server " +
+                "try=" + i + " of " + this.maximumAttempts, t);
+          } else {
+            needNewPlan = true;
+            LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
+                " try=" + i + " of " + this.maximumAttempts, t);
           }
-          return;
         }
 
-      } catch (Throwable t) {
-        if (t instanceof RemoteException) {
-          t = ((RemoteException) t).unwrapRemoteException();
+        if (i == this.maximumAttempts) {
+          // Don't reset the region state or get a new plan any more.
+          // This is the last try.
+          continue;
         }
 
-        // Should we wait a little before retrying? If the server is starting it's yes.
-        // If the region is already in transition, it's yes as well: we want to be sure that
-        //  the region will get opened but we don't want a double assignment.
-        boolean hold = (t instanceof RegionAlreadyInTransitionException ||
-            t instanceof ServerNotRunningYetException);
-
-        // In case socket is timed out and the region server is still online,
-        // the openRegion RPC could have been accepted by the server and
-        // just the response didn't go through.  So we will retry to
-        // open the region on the same server to avoid possible
-        // double assignment.
-        boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
-            && this.serverManager.isServerOnline(plan.getDestination()));
-
-
-        if (hold) {
-          LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
-              "try=" + i + " of " + this.maximumAttempts, t);
-
-          if (maxRegionServerStartupWaitTime < 0) {
-            maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
-                this.server.getConfiguration().
-                    getLong("hbase.regionserver.rpc.startup.waittime", 60000);
-          }
+        // If region opened on destination of present plan, reassigning to new
+        // RS may cause double assignments. In case of RegionAlreadyInTransitionException
+        // reassigning to same RS.
+        if (needNewPlan) {
+          // Force a new plan and reassign. Will return null if no servers.
+          // The new plan could be the same as the existing plan since we don't
+          // exclude the server of the original plan, which should not be
+          // excluded since it could be the only server up now.
+          RegionPlan newPlan = null;
           try {
-            long now = EnvironmentEdgeManager.currentTimeMillis();
-            if (now < maxRegionServerStartupWaitTime) {
-              LOG.debug("Server is not yet up; waiting up to " +
-                  (maxRegionServerStartupWaitTime - now) + "ms", t);
-              Thread.sleep(100);
-              i--; // reset the try count
-              needNewPlan = false;
+            newPlan = getRegionPlan(region, true);
+          } catch (HBaseIOException e) {
+            LOG.warn("Failed to get region plan", e);
+          }
+          if (newPlan == null) {
+            if (tomActivated) {
+              this.timeoutMonitor.setAllRegionServersOffline(true);
             } else {
-              LOG.debug("Server is not up for a while; try a new one", t);
-              needNewPlan = true;
-            }
-          } catch (InterruptedException ie) {
-            LOG.warn("Failed to assign "
-                + region.getRegionNameAsString() + " since interrupted", ie);
-            Thread.currentThread().interrupt();
-            if (!tomActivated) {
-              regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
+              regionStates.updateRegionState(region, State.FAILED_OPEN);
             }
+            LOG.warn("Unable to find a viable location to assign region " +
+                region.getRegionNameAsString());
             return;
           }
-        } else if (retry) {
-          needNewPlan = false;
-          LOG.warn(assignMsg + ", trying to assign to the same region server " +
-              "try=" + i + " of " + this.maximumAttempts, t);
-        } else {
-          needNewPlan = true;
-          LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
-              " try=" + i + " of " + this.maximumAttempts, t);
-        }
-      }
 
-      if (i == this.maximumAttempts) {
-        // Don't reset the region state or get a new plan any more.
-        // This is the last try.
-        continue;
-      }
-
-      // If region opened on destination of present plan, reassigning to new
-      // RS may cause double assignments. In case of RegionAlreadyInTransitionException
-      // reassigning to same RS.
-      if (needNewPlan) {
-        // Force a new plan and reassign. Will return null if no servers.
-        // The new plan could be the same as the existing plan since we don't
-        // exclude the server of the original plan, which should not be
-        // excluded since it could be the only server up now.
-        RegionPlan newPlan = null;
-        try {
-          newPlan = getRegionPlan(region, true);
-        } catch (HBaseIOException e) {
-          LOG.warn("Failed to get region plan", e);
-        }
-        if (newPlan == null) {
-          if (tomActivated) {
-            this.timeoutMonitor.setAllRegionServersOffline(true);
-          } else {
-            regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
+          if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
+            // Clean out plan we failed execute and one that doesn't look like it'll
+            // succeed anyways; we need a new plan!
+            // Transition back to OFFLINE
+            currentState = regionStates.updateRegionState(region, State.OFFLINE);
+            versionOfOfflineNode = -1;
+            plan = newPlan;
           }
-          LOG.warn("Unable to find a viable location to assign region " +
-              region.getRegionNameAsString());
-          return;
-        }
-
-        if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
-          // Clean out plan we failed execute and one that doesn't look like it'll
-          // succeed anyways; we need a new plan!
-          // Transition back to OFFLINE
-          currentState = regionStates.updateRegionState(region, RegionState.State.OFFLINE);
-          versionOfOfflineNode = -1;
-          plan = newPlan;
         }
       }
-    }
-    // Run out of attempts
-    if (!tomActivated) {
-      regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
-    }
+      // Run out of attempts
+      if (!tomActivated) {
+        regionStates.updateRegionState(region, State.FAILED_OPEN);
+      }
     } finally {
       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
     }
@@ -2139,8 +2199,7 @@ public class AssignmentManager extends Z
       this.server.abort(msg, new IllegalStateException(msg));
       return -1;
     }
-    regionStates.updateRegionState(state.getRegion(),
-      RegionState.State.OFFLINE);
+    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
     int versionOfOfflineNode;
     try {
       // get the version after setting the znode to OFFLINE
@@ -2316,21 +2375,25 @@ public class AssignmentManager extends Z
     //  creation
     ReentrantLock lock = locker.acquireLock(encodedName);
     RegionState state = regionStates.getRegionTransitionState(encodedName);
+    boolean reassign = true;
     try {
       if (state == null) {
         // Region is not in transition.
         // We can unassign it only if it's not SPLIT/MERGED.
         state = regionStates.getRegionState(encodedName);
-        if (state != null && (state.isMerged() || state.isSplit())) {
+        if (state != null && (state.isMerged()
+            || state.isSplit() || state.isOffline())) {
           LOG.info("Attempting to unassign " + state + ", ignored");
+          // Offline region will be reassigned below
           return;
         }
         // Create the znode in CLOSING state
         try {
-          state = regionStates.getRegionState(region);
           if (state == null || state.getServerName() == null) {
             // We don't know where the region is, offline it.
             // No need to send CLOSE RPC
+            LOG.warn("Attempting to unassign a region not in RegionStates"
+              + region.getRegionNameAsString() + ", offlined");
             regionOffline(region);
             return;
           }
@@ -2338,8 +2401,9 @@ public class AssignmentManager extends Z
             watcher, region, state.getServerName());
           if (versionOfClosingNode == -1) {
             LOG.info("Attempting to unassign " +
-                region.getRegionNameAsString() + " but ZK closing node "
-                + "can't be created.");
+              region.getRegionNameAsString() + " but ZK closing node "
+              + "can't be created.");
+            reassign = false; // not unassigned at all
             return;
           }
         } catch (KeeperException e) {
@@ -2354,6 +2418,7 @@ public class AssignmentManager extends Z
               if (isSplitOrSplittingOrMergedOrMerging(path)) {
                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
                   "skipping unassign because region no longer exists -- its split or merge");
+                reassign = false; // no need to reassign for split/merged region
                 return;
               }
             } catch (KeeperException.NoNodeException ke) {
@@ -2369,9 +2434,10 @@ public class AssignmentManager extends Z
           }
           // If we get here, don't understand whats going on -- abort.
           server.abort("Unexpected ZK exception creating node CLOSING", e);
+          reassign = false; // heading out already
           return;
         }
-        state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
+        state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
       } else if (state.isFailedOpen()) {
         // The region is not open yet
         regionOffline(region);
@@ -2381,7 +2447,7 @@ public class AssignmentManager extends Z
           " which is already " + state.getState()  +
           " but forcing to send a CLOSE RPC again ");
         if (state.isFailedClose()) {
-          state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
+          state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
         }
         state.updateTimestampToNow();
       } else {
@@ -2392,11 +2458,13 @@ public class AssignmentManager extends Z
       }
 
       unassign(region, state, versionOfClosingNode, dest, true, null);
-      if (regionStates.isRegionOffline(region)) {
-        new ClosedRegionHandler(server, this, region).process();
-      }
     } finally {
       lock.unlock();
+
+      // Region is expected to be reassigned afterwards
+      if (reassign && regionStates.isRegionOffline(region)) {
+        assign(region, true);
+      }
     }
   }
 
@@ -2477,7 +2545,7 @@ public class AssignmentManager extends Z
    */
   public boolean waitForAssignment(HRegionInfo regionInfo)
       throws InterruptedException {
-    while (!regionStates.isRegionAssigned(regionInfo)) {
+    while (!regionStates.isRegionOnline(regionInfo)) {
       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
           || this.server.isStopped()) {
         return false;
@@ -2575,7 +2643,13 @@ public class AssignmentManager extends Z
           " region(s) to " + servers + " server(s)");
       }
       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
-        assign(plan.getKey(), plan.getValue());
+        if (!assign(plan.getKey(), plan.getValue())) {
+          for (HRegionInfo region: plan.getValue()) {
+            if (!regionStates.isRegionOnline(region)) {
+              invokeAssign(region);
+            }
+          }
+        }
       }
     } else {
       LOG.info("Bulk assigning " + regions + " region(s) across "
@@ -2818,6 +2892,8 @@ public class AssignmentManager extends Z
     if (deadServers != null) {
       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
         ServerName serverName = server.getKey();
+        // We need to keep such info even if the server is known dead
+        regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
         if (!serverManager.isServerDead(serverName)) {
           serverManager.expireServer(serverName); // Let SSH do region re-assign
         }
@@ -3178,9 +3254,10 @@ public class AssignmentManager extends Z
         RegionState regionState =
           regionStates.getRegionTransitionState(encodedName);
         if (regionState == null
-            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
-          LOG.info("Skip " + hri
-            + " since it is not opening on the dead server any more: " + sn);
+            || (regionState.getServerName() != null && !regionState.isOnServer(sn))
+            || !(regionState.isFailedClose() || regionState.isPendingOpenOrOpening())) {
+          LOG.info("Skip " + regionState + " since it is not opening/failed_close"
+            + " on the dead server any more: " + sn);
           it.remove();
         } else {
           try {
@@ -3194,8 +3271,8 @@ public class AssignmentManager extends Z
             regionStates.regionOffline(hri);
             continue;
           }
-          // Mark the region closed and assign it again by SSH
-          regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+          // Mark the region offline and assign it again by SSH
+          regionStates.updateRegionState(hri, State.OFFLINE);
         }
       } finally {
         lock.unlock();
@@ -3213,17 +3290,10 @@ public class AssignmentManager extends Z
    */
   public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
       final HRegionInfo a, final HRegionInfo b) {
-    regionOffline(parent, State.SPLIT);
-    regionOnline(a, sn);
-    regionOnline(b, sn);
-
-    // There's a possibility that the region was splitting while a user asked
-    // the master to disable, we need to make sure we close those regions in
-    // that case. This is not racing with the region server itself since RS
-    // report is done after the split transaction completed.
-    if (this.zkTable.isDisablingOrDisabledTable(parent.getTable())) {
-      unassign(a);
-      unassign(b);
+    synchronized (regionStates) {
+      regionOffline(parent, State.SPLIT);
+      onlineNewRegion(a, sn);
+      onlineNewRegion(b, sn);
     }
   }
 
@@ -3236,16 +3306,10 @@ public class AssignmentManager extends Z
    */
   public void handleRegionsMergeReport(final ServerName sn,
       final HRegionInfo merged, final HRegionInfo a, final HRegionInfo b) {
-    regionOffline(a, State.MERGED);
-    regionOffline(b, State.MERGED);
-    regionOnline(merged, sn);
-
-    // There's a possibility that the region was merging while a user asked
-    // the master to disable, we need to make sure we close those regions in
-    // that case. This is not racing with the region server itself since RS
-    // report is done after the regions merge transaction completed.
-    if (this.zkTable.isDisablingOrDisabledTable(merged.getTable())) {
-      unassign(merged);
+    synchronized (regionStates) {
+      regionOffline(a, State.MERGED);
+      regionOffline(b, State.MERGED);
+      onlineNewRegion(merged, sn);
     }
   }
 
@@ -3256,7 +3320,14 @@ public class AssignmentManager extends Z
     synchronized (this.regionPlans) {
       this.regionPlans.put(plan.getRegionName(), plan);
     }
-    unassign(plan.getRegionInfo(), false, plan.getDestination());
+    HRegionInfo hri = plan.getRegionInfo();
+    TableName tableName = hri.getTable();
+    if (zkTable.isDisablingOrDisabledTable(tableName)) {
+      LOG.info("Ignored moving region of disabling/disabled table "
+        + tableName);
+      return;
+    }
+    unassign(hri, false, plan.getDestination());
   }
 
   public void stop() {
@@ -3303,8 +3374,7 @@ public class AssignmentManager extends Z
         new IllegalStateException());
       return false;
     }
-    regionStates.updateRegionState(
-      state.getRegion(), RegionState.State.OFFLINE);
+    regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
     try {
       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
         destination, cb, state);
@@ -3344,8 +3414,8 @@ public class AssignmentManager extends Z
         + merging_a + ", rs_b=" + merging_b);
       return false;
     }
-    regionStates.updateRegionState(merging_a, RegionState.State.MERGING);
-    regionStates.updateRegionState(merging_b, RegionState.State.MERGING);
+    regionStates.updateRegionState(merging_a, State.MERGING);
+    regionStates.updateRegionState(merging_b, State.MERGING);
     return true;
   }
 
@@ -3360,4 +3430,21 @@ public class AssignmentManager extends Z
     // remove the region plan as well just in case.
     clearRegionPlan(regionInfo);
   }
+
+  /**
+   * Online a newly created region (usually from split/merge) unless it already has a state.
+   */
+  private void onlineNewRegion(final HRegionInfo region, final ServerName sn) {
+    synchronized (regionStates) {
+      // Skip if a state exists: someone could find the region from meta and reassign it.
+      if (regionStates.getRegionState(region) == null) {
+        regionStates.createRegionState(region);
+        regionOnline(region, sn);
+      }
+    }
+    // User could disable the table before master knows the new region; unassign it if so.
+    if (zkTable.isDisablingOrDisabledTable(region.getTable())) {
+      unassign(region);
+    }
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java Tue Oct  1 22:02:01 2013
@@ -328,14 +328,6 @@ public class CatalogJanitor extends Chor
     if (hasNoReferences(a) && hasNoReferences(b)) {
       LOG.debug("Deleting region " + parent.getRegionNameAsString() +
         " because daughter splits no longer hold references");
-
-      // This latter regionOffline should not be necessary but is done for now
-      // until we let go of regionserver to master heartbeats.  See HBASE-3368.
-      if (this.services.getAssignmentManager() != null) {
-        // The mock used in testing catalogjanitor returns null for getAssignmnetManager.
-        // Allow for null result out of getAssignmentManager.
-        this.services.getAssignmentManager().regionOffline(parent);
-      }
       FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
       if (LOG.isTraceEnabled()) LOG.trace("Archiving parent region: " + parent);
       HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, parent);
@@ -380,7 +372,8 @@ public class CatalogJanitor extends Chor
       regionFs = HRegionFileSystem.openRegionFromFileSystem(
           this.services.getConfiguration(), fs, tabledir, daughter, true);
     } catch (IOException e) {
-      LOG.warn("Daughter region does not exist: " + daughter.getEncodedName());
+      LOG.warn("Daughter region does not exist: " + daughter.getEncodedName()
+        + ", parent is: " + parent.getEncodedName());
       return new Pair<Boolean, Boolean>(Boolean.FALSE, Boolean.FALSE);
     }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java Tue Oct  1 22:02:01 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -39,6 +41,8 @@ import java.util.Set;
  */
 @InterfaceAudience.Private
 public class DeadServer {
+  private static final Log LOG = LogFactory.getLog(DeadServer.class);
+
   /**
    * Set of known dead servers.  On znode expiration, servers are added here.
    * This is needed in case of a network partitioning where the server's lease
@@ -110,8 +114,8 @@ public class DeadServer {
     }
   }
 
-  @SuppressWarnings("UnusedParameters")
-  public synchronized void finish(ServerName ignored) {
+  public synchronized void finish(ServerName sn) {
+    LOG.debug("Finished processing " + sn);
     this.numProcessing--;
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Tue Oct  1 22:02:01 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.RegionState.State;
 
 /**
  * Run bulk assign.  Does one RCP per regionserver passing a
@@ -129,9 +130,8 @@ public class GeneralBulkAssigner extends
       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
       while (regionInfoIterator.hasNext()) {
         HRegionInfo hri = regionInfoIterator.next();
-        RegionState state = regionStates.getRegionState(hri);
-        if ((!regionStates.isRegionInTransition(hri) && regionStates.isRegionAssigned(hri))
-            || state.isSplitting() || state.isMerging()) {
+        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
           regionInfoIterator.remove();
         }
       }
@@ -195,8 +195,11 @@ public class GeneralBulkAssigner extends
           + " regions to server " + e.getKey() + ", reassigning them");
       reassigningRegions.addAll(failedPlans.remove(e.getKey()));
     }
+    RegionStates regionStates = assignmentManager.getRegionStates();
     for (HRegionInfo region : reassigningRegions) {
-      assignmentManager.invokeAssign(region);
+      if (!regionStates.isRegionOnline(region)) {
+        assignmentManager.invokeAssign(region);
+      }
     }
     return reassigningRegions.size();
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1528227&r1=1528226&r2=1528227&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct  1 22:02:01 2013
@@ -2516,8 +2516,7 @@ MasterServices, Server {
       LOG.debug(getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
           + " in current location if it is online and reassign.force=" + force);
       this.assignmentManager.unassign(hri, force);
-      if (!this.assignmentManager.getRegionStates().isRegionInTransition(hri)
-          && !this.assignmentManager.getRegionStates().isRegionAssigned(hri)) {
+      if (this.assignmentManager.getRegionStates().isRegionOffline(hri)) {
         LOG.debug("Region " + hri.getRegionNameAsString()
             + " is not online on any region server, reassigning it.");
         assignRegion(hri);