You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/04/30 20:18:25 UTC

svn commit: r1477749 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/master/handler/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/h...

Author: liyin
Date: Tue Apr 30 18:18:25 2013
New Revision: 1477749

URL: http://svn.apache.org/r1477749
Log:
[0.89-fb] [master] Handle missed region open messages from master -> RS

Author: aaiyer

Summary:
We have seen cases on the new hardware where network problems
may cause the MSG_REGION_OPEN from the master to the RS to be lost.

This causes the master to keep waiting for the region to open up.
However, since the message is lost, the RS will never open it.

This causes downtime and requires manual intervention with hbck
to fix.

This diff ensures that
 (i) the master retries sending these messages on future heartbeats
 if the RS does not acknowledge them, and
 (ii) the regionserver can handle duplicate region open messages
 by maintaining a set of regions that are currently opening and
 ignoring duplicate requests.

Test Plan:
Changed TestRegionRebalancing to inject message loss and verify
that the regions still end up balanced.

Will run the test suite.

Reviewers: liyintang, rshroff, adela

Reviewed By: rshroff

CC: hbase-eng@, manukranthk

Differential Revision: https://phabricator.fb.com/D771498

Task ID: 123, 2191086

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HMsg.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ChangeTableState.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HMsg.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HMsg.java Tue Apr 30 18:18:25 2013
@@ -81,6 +81,9 @@ public class HMsg implements Writable {
     /** region server is processing open request */
     MSG_REPORT_PROCESS_OPEN,
 
+    /** region server is processing a close request */
+    MSG_REPORT_PROCESS_CLOSE,
+
     /**
      * Region server split the region associated with this message.
      *

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ChangeTableState.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ChangeTableState.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ChangeTableState.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ChangeTableState.java Tue Apr 30 18:18:25 2013
@@ -82,7 +82,7 @@ class ChangeTableState extends TableOper
       }
 
       if(!this.online && this.master.getRegionManager().
-          isPendingOpen(i.getRegionNameAsString())) {
+          isPendingOpenAckedOrUnacked(i.getRegionNameAsString())) {
         LOG.debug("Skipping region " + i.toString() +
           " because it is pending open, will tell it to close later");
         continue;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Apr 30 18:18:25 2013
@@ -512,7 +512,7 @@ public class RegionManager {
       final HServerInfo sinfo, final ArrayList<HMsg> returnMsgs) {
     String regionName = rs.getRegionInfo().getRegionNameAsString();
     LOG.info("Assigning region " + regionName + " to " + sinfo.getServerName());
-    rs.setPendingOpen(sinfo.getServerName());
+    rs.setPendingOpenUnacked(sinfo.getServerName());
     synchronized (this.regionsInTransition) {
       byte[] data = null;
       try {
@@ -887,7 +887,6 @@ public class RegionManager {
         OVERLOADED));
       // mark the region as closing
       setClosing(info.getServerName(), currentRegion, false);
-      setPendingClose(regionName);
       // increment the count of regions we've marked
       regionsClosed++;
     }
@@ -1418,7 +1417,7 @@ public class RegionManager {
       }
 
       t2 = System.currentTimeMillis();
-      if (force || (!s.isPendingOpen() && !s.isOpen())) {
+      if (force || (!s.isPendingOpenAckedOrUnacked() && !s.isOpen())) {
         // Refresh assignment information when a region is marked unassigned so
         // that it opens on the preferred server.
         this.assignmentManager.executeAssignmentPlan(info);
@@ -1460,11 +1459,11 @@ public class RegionManager {
    * @param regionName name of the region
    * @return true if open, false otherwise
    */
-  public boolean isPendingOpen(String regionName) {
+  public boolean isPendingOpenAckedOrUnacked(String regionName) {
     synchronized (regionsInTransition) {
       RegionState s = regionsInTransition.get(regionName);
       if (s != null) {
-        return s.isPendingOpen();
+        return s.isPendingOpenAckedOrUnacked();
       }
     }
     return false;
@@ -1474,6 +1473,22 @@ public class RegionManager {
    * Region has been assigned to a server and the server has told us it is open
    * @param regionName
    */
+  public void setPendingOpenAcked(String regionName) {
+    Preconditions.checkNotNull(regionName);
+    synchronized (regionsInTransition) {
+      RegionState s = regionsInTransition.get(regionName);
+      if (s != null) {
+        s.setPendingOpenAcked();
+      } else {
+        LOG.debug("regionsInTransition does not have an entry for " + regionName);
+      }
+    }
+  }
+
+  /**
+   * Region has been assigned to a server and the server has told us it is open
+   * @param regionName
+   */
   public void setOpen(String regionName) {
     Preconditions.checkNotNull(regionName);
     synchronized (regionsInTransition) {
@@ -1486,7 +1501,6 @@ public class RegionManager {
         }
       }
     }
-
   }
 
   /**
@@ -1530,7 +1544,7 @@ public class RegionManager {
       }
       // If region was asked to open before getting here, we could be taking
       // the wrong server name
-      if(s.isPendingOpen()) {
+      if(s.isPendingOpenAckedOrUnacked()) {
         serverName = s.getServerName();
       }
       s.setClosing(serverName, setOffline);
@@ -1564,6 +1578,25 @@ public class RegionManager {
   }
 
   /**
+   * Returns the set of Regions that have been asked to open on this server
+   * but, it has not acked yet.
+   *
+   * @param serverName
+   * @return set of infos that were requested to open
+   */
+  public Set<HRegionInfo> getRegionsInPendingOpenUnacked(String serverName) {
+    Set<HRegionInfo> result = new HashSet<HRegionInfo>();
+    synchronized (regionsInTransition) {
+      for (RegionState s: regionsInTransition.values()) {
+        if (s.isPendingOpenUnacked() && s.getServerName().compareTo(serverName) == 0) {
+          result.add(s.getRegionInfo());
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
    * Called when we have told a region server to close the region
    *
    * @param regionName
@@ -2341,7 +2374,8 @@ public class RegionManager {
 
     enum State {
       UNASSIGNED, // awaiting a server to be assigned
-      PENDING_OPEN, // told a server to open, hasn't opened yet
+      PENDING_OPEN_UNACKED, // told a server to open, not sure if it got the message.
+      PENDING_OPEN_ACKED, // told a server to open, it got the message, hasn't opened yet
       OPEN, // has been opened on RS, but not yet marked in META/ROOT
       CLOSING, // a msg has been enqueued to close ths region, but not delivered to RS yet
       PENDING_CLOSE, // msg has been delivered to RS to close this region
@@ -2382,7 +2416,8 @@ public class RegionManager {
      */
     synchronized boolean isOpening() {
       return state == State.UNASSIGNED ||
-        state == State.PENDING_OPEN ||
+        state == State.PENDING_OPEN_UNACKED ||
+        state == State.PENDING_OPEN_ACKED ||
         state == State.OPEN;
     }
 
@@ -2403,28 +2438,45 @@ public class RegionManager {
       this.serverName = null;
     }
 
-    synchronized boolean isPendingOpen() {
-      return state == State.PENDING_OPEN;
+    synchronized boolean isPendingOpenAcked() {
+      return state == State.PENDING_OPEN_ACKED;
+    }
+
+    synchronized boolean isPendingOpenUnacked() {
+      return state == State.PENDING_OPEN_UNACKED;
+    }
+
+    synchronized boolean isPendingOpenAckedOrUnacked() {
+      return state == State.PENDING_OPEN_ACKED || state == State.PENDING_OPEN_UNACKED;
     }
 
     /*
      * @param serverName Server region was assigned to.
      */
-    synchronized void setPendingOpen(final String serverName) {
+    synchronized void setPendingOpenUnacked(final String serverName) {
       if (state != State.UNASSIGNED) {
         LOG.warn("Cannot assign a region that is not currently unassigned. " +
           "FIX!! State: " + toString());
       }
-      state = State.PENDING_OPEN;
+      state = State.PENDING_OPEN_UNACKED;
       this.serverName = serverName;
     }
 
+    synchronized void setPendingOpenAcked() {
+      // it is okay to setPendingOpenAcked if it is already acked.
+      if (state != State.PENDING_OPEN_UNACKED && state != State.PENDING_OPEN_ACKED) {
+        LOG.warn("Cannot assign a region that is not currently unacked. " +
+          "FIX!! State: " + toString());
+      }
+      state = State.PENDING_OPEN_ACKED;
+    }
+
     synchronized boolean isOpen() {
       return state == State.OPEN;
     }
 
     synchronized void setOpen() {
-      if (state != State.PENDING_OPEN) {
+      if (state != State.PENDING_OPEN_ACKED && state != State.PENDING_OPEN_UNACKED) {
         LOG.warn("Cannot set a region as open if it has not been pending. " +
           "FIX!! State: " + toString());
       }
@@ -2459,7 +2511,8 @@ public class RegionManager {
 
     synchronized void setClosed() {
       if (state != State.PENDING_CLOSE &&
-          state != State.PENDING_OPEN &&
+          state != State.PENDING_OPEN_UNACKED &&
+          state != State.PENDING_OPEN_ACKED &&
           state != State.CLOSING) {
         throw new IllegalStateException(
             "Cannot set a region to be closed if it was not already marked as" +
@@ -2620,11 +2673,12 @@ public class RegionManager {
       synchronized (regionsInTransition) {
         RegionState s = regionsInTransition.get(regionName);
         if (s == null) {
-          s = new RegionState(regionInfo, RegionState.State.PENDING_OPEN);
+          s = new RegionState(regionInfo, RegionState.State.PENDING_OPEN_ACKED);
           regionsInTransition.put(regionName, s);
         } else {
           s.setUnassigned();
-          s.setPendingOpen(serverName);
+          s.setPendingOpenUnacked(serverName);
+          s.setPendingOpenAcked();
         }
         stateStr = s.toString();
       }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Apr 30 18:18:25 2013
@@ -457,7 +457,7 @@ public class ServerManager {
       for (int i = 1; i < msgs.length; i++) {
         LOG.info("Processing " + msgs[i] + " from " +
             serverInfo.getServerName());
-        assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE;
+        assert msgs[i].getType() == HMsg.Type.MSG_REPORT_CLOSE;
         HRegionInfo info = msgs[i].getRegionInfo();
         // Meta/root region offlining is handed in removeServerInfo above.
         if (!info.isMetaRegion()) {
@@ -537,6 +537,8 @@ public class ServerManager {
     // Be careful that in message processors we don't throw exceptions that
     // break the switch below because then we might drop messages on the floor.
     int openingCount = 0;
+    HashSet<String> openingRegions = null;
+    HashSet<String> closingRegions = null;
     for (int i = 0; i < incomingMsgs.length; i++) {
       HRegionInfo region = incomingMsgs[i].getRegionInfo();
       LOG.info("Processing " + incomingMsgs[i] + " from " +
@@ -544,17 +546,28 @@ public class ServerManager {
         incomingMsgs.length);
       if (!this.master.getRegionServerOperationQueue().
           process(serverInfo, incomingMsgs[i])) {
+        LOG.debug("Not proceeding further for " + incomingMsgs[i] + " from " + serverInfo);
         continue;
       }
       switch (incomingMsgs[i].getType()) {
         case MSG_REPORT_PROCESS_OPEN:
           openingCount++;
+          if (openingRegions == null)
+            openingRegions = new HashSet<String>();
+          openingRegions.add(incomingMsgs[i].getRegionInfo().getEncodedName());
+          LOG.debug("Added to openingRegions " + incomingMsgs[i] + " from " + serverInfo);
           break;
 
         case MSG_REPORT_OPEN:
           LOG.error("MSG_REPORT_OPEN is not expected to be received from RS.");
           break;
 
+        case MSG_REPORT_PROCESS_CLOSE:
+          if (closingRegions == null)
+            closingRegions = new HashSet<String>();
+          closingRegions.add(incomingMsgs[i].getRegionInfo().getEncodedName());
+          break;
+
         case MSG_REPORT_CLOSE:
           processRegionClose(serverInfo, region);
           break;
@@ -579,13 +592,35 @@ public class ServerManager {
       // Tell the region server to close regions that we have marked for closing.
       for (HRegionInfo i:
         this.master.getRegionManager().getMarkedToClose(serverInfo.getServerName())) {
-        returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i));
-        // Transition the region from toClose to closing state
-        this.master.getRegionManager().setPendingClose(i.getRegionNameAsString());
+        if (closingRegions == null || !closingRegions.contains(i.getEncodedName())) {
+          HMsg msg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, i);
+          LOG.info("HMsg " + msg.toString() + " was lost earlier. Resending to " + serverInfo.getServerName());
+          returnMsgs.add(msg);
+        } else {
+          // Transition the region from toClose to closing state
+          this.master.getRegionManager().setPendingClose(i.getRegionNameAsString());
+        }
       }
 
       // Figure out what the RegionServer ought to do, and write back.
 
+      // 1. Remind the server to open the regions that the RS has not acked for
+      // Normally, the master shouldn't need to do this. But, this may be required
+      // if there was a network Incident, in which the master's message to OPEN a
+      // region was lost.
+      for (HRegionInfo i:
+        this.master.getRegionManager().getRegionsInPendingOpenUnacked(serverInfo.getServerName())) {
+        if (openingRegions == null || !openingRegions.contains(i.getEncodedName())) {
+          HMsg msg = new HMsg(HMsg.Type.MSG_REGION_OPEN, i);
+          LOG.info("HMsg " + msg.toString() + " was lost earlier. Resending to " + serverInfo.getServerName());
+          returnMsgs.add(msg);
+          openingCount++;
+        } else {
+          LOG.info("Region " + i.getEncodedName() + " is reported to be opening " + serverInfo.getServerName());
+          this.processRegionOpening(i.getRegionNameAsString());
+        }
+      }
+
       // Should we tell it close regions because its overloaded?  If its
       // currently opening regions, leave it alone till all are open.
       if (openingCount < this.nobalancingCount) {
@@ -715,6 +750,20 @@ public class ServerManager {
     this.master.getRegionManager().setUnassigned(hri, false);
   }
 
+
+  /*
+   * Region server is reporting that a region is now opening
+   * consider this an ack for the open request.
+   *
+   * Master could have received this through the ZK notification.
+   * Or, through the heartbeat.
+   *
+   * @param regionName
+   */
+  public void processRegionOpening(String regionName) {
+    this.master.getRegionManager().setPendingOpenAcked(regionName);
+  }
+
   /*
    * Region server is reporting that a region is now opened
    * @param serverInfo
@@ -727,7 +776,7 @@ public class ServerManager {
     RegionManager regionManager = master.getRegionManager();
     synchronized (regionManager) {
       if (!regionManager.isUnassigned(region) &&
-          !regionManager.isPendingOpen(region.getRegionNameAsString())) {
+          !regionManager.isPendingOpenAckedOrUnacked(region.getRegionNameAsString())) {
         if (region.isRootRegion()) {
           // Root region
           HServerAddress rootServer =
@@ -745,7 +794,7 @@ public class ServerManager {
           // Not root region. If it is not a pending region, then we are
           // going to treat it as a duplicate assignment, although we can't
           // tell for certain that's the case.
-          if (regionManager.isPendingOpen(
+          if (regionManager.isPendingOpenAckedOrUnacked(
               region.getRegionNameAsString())) {
             // A duplicate report from the correct server
             return;
@@ -829,6 +878,8 @@ public class ServerManager {
       //       the messages we've received. In this case, a close could be
       //       processed before an open resulting in the master not agreeing on
       //       the region's state.
+
+      // setClosed works for both CLOSING, and PENDING_CLOSE
       this.master.getRegionManager().setClosed(region.getRegionNameAsString());
       RegionServerOperation op =
         new ProcessRegionClose(master, serverInfo.getServerName(), 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java Tue Apr 30 18:18:25 2013
@@ -75,6 +75,7 @@ public class MasterOpenRegionHandler ext
     // TODO: not implemented. 
     LOG.debug("NO-OP call to handling region opening event");
     // Keep track to see how long the region open takes. If the RS is taking too 
+    serverManager.processRegionOpening(regionName);
     // long, then revert the region back to closed state so that it can be 
     // re-assigned.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Apr 30 18:18:25 2013
@@ -132,6 +132,8 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ParamFormat;
 import org.apache.hadoop.hbase.util.ParamFormatter;
@@ -222,6 +224,10 @@ public class HRegionServer implements HR
   // below maps.
   protected final Map<Integer, HRegion> onlineRegions =
     new ConcurrentHashMap<Integer, HRegion>();
+  protected final Map<Integer, HRegionInfo> regionsOpening =
+    new ConcurrentHashMap<Integer, HRegionInfo>();
+  protected final Map<Integer, HRegionInfo> regionsClosing =
+    new ConcurrentHashMap<Integer, HRegionInfo>();
 
   // this is a list of region info that we recently closed
   protected final List<ClosedRegionInfo> recentlyClosedRegions =
@@ -706,8 +712,14 @@ public class HRegionServer implements HR
               continue;
             }
 
+            if (InjectionHandler.falseCondition(InjectionEvent.HREGIONSERVER_REPORT_RESPONSE, (Object[])msgs)) {
+              continue;
+            }
+
             // Queue up the HMaster's instruction stream for processing
             boolean restart = false;
+            HRegionInfo regionInfo;
+            Integer mapKey;
             for(int i = 0;
                 !restart && !stopRequestedAtStageOne.get() && i < msgs.length;
                 i++) {
@@ -731,6 +743,52 @@ public class HRegionServer implements HR
                 }
                 break;
 
+              case MSG_REGION_CLOSE:
+                regionInfo = msgs[i].getRegionInfo();
+                mapKey = Bytes.mapKey(regionInfo.getRegionName());
+
+                this.lock.writeLock().lock();
+                try {
+                  if (!this.onlineRegions.containsKey(mapKey) || this.regionsClosing.containsKey(mapKey)) {
+                    LOG.warn("Region " + regionInfo + " already being processed as Closed/closing. Ignoring " + msgs[i]);
+                    break; // already closed the region, or it is closing. Ignore this request.
+                  }
+                  this.regionsClosing.put(mapKey, regionInfo);
+                } finally {
+                  this.lock.writeLock().unlock();
+                }
+                addProcessingCloseMessage(regionInfo);
+                try {
+                  toDo.put(new ToDoEntry(msgs[i]));
+                } catch (InterruptedException e) {
+                  throw new RuntimeException("Putting into msgQueue was " +
+                      "interrupted.", e);
+                }
+                break;
+
+              case MSG_REGION_OPEN:
+                regionInfo = msgs[i].getRegionInfo();
+                mapKey = Bytes.mapKey(regionInfo.getRegionName());
+
+                this.lock.writeLock().lock();
+                try {
+                  if (this.onlineRegions.containsKey(mapKey) || this.regionsOpening.containsKey(mapKey)) {
+                    LOG.warn("Region " + regionInfo + " already being processed as opened/opening. Ignoring " + msgs[i]);
+                    break; // already opened the region, or it is opening. Ignore this request.
+                  }
+                  this.regionsOpening.put(mapKey, regionInfo);
+                } finally {
+                  this.lock.writeLock().unlock();
+                }
+                addProcessingMessage(regionInfo);
+                try {
+                  toDo.put(new ToDoEntry(msgs[i]));
+                } catch (InterruptedException e) {
+                  throw new RuntimeException("Putting into msgQueue was " +
+                      "interrupted.", e);
+                }
+                break;
+
               default:
                 try {
                   toDo.put(new ToDoEntry(msgs[i]));
@@ -2149,6 +2207,7 @@ public class HRegionServer implements HR
         this.onlineRegions.put(mapKey, region);
         region.setRegionServer(this);
         region.setOpenDate(EnvironmentEdgeManager.currentTimeMillis());
+        this.regionsOpening.remove(mapKey);
       } finally {
         this.lock.writeLock().unlock();
       }
@@ -2216,6 +2275,17 @@ public class HRegionServer implements HR
     getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
   }
 
+  /**
+   * Add a MSG_REPORT_PROCESS_CLOSE to the outbound queue.
+   * This method is called while region is in the queue of regions to process
+   * and then while the region is being closed, it is called from the Worker
+   * thread that is running the region close.
+   * @param hri Region to add the message for
+   */
+  public void addProcessingCloseMessage(final HRegionInfo hri) {
+    getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_CLOSE, hri));
+  }
+
   private void addToRecentlyClosedRegions(ClosedRegionInfo info) {
       recentlyClosedRegions.add(0, info);
       if (recentlyClosedRegions.size() > DEFAULT_NUM_TRACKED_CLOSED_REGION)
@@ -3097,10 +3167,12 @@ public class HRegionServer implements HR
   HRegion removeFromOnlineRegions(HRegionInfo hri) {
     byte[] regionName = hri.getRegionName();
     serverInfo.getFlushedSequenceIdByRegion().remove(regionName);
-    this.lock.writeLock().lock();
+    Integer key = Bytes.mapKey(regionName);
     HRegion toReturn = null;
+    this.lock.writeLock().lock();
     try {
-      toReturn = onlineRegions.remove(Bytes.mapKey(regionName));
+      toReturn = onlineRegions.remove(key);
+      regionsClosing.remove(key);
     } finally {
       this.lock.writeLock().unlock();
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Tue Apr 30 18:18:25 2013
@@ -34,5 +34,6 @@ public enum InjectionEvent {
   HMASTER_DISABLE_TABLE,
   ZKUNASSIGNEDWATCHER_REGION_OPENED,
   SPLITLOGWORKER_SPLIT_LOG_START,
-  HMASTER_START_PROCESS_DEAD_SERVER
+  HMASTER_START_PROCESS_DEAD_SERVER,
+  HREGIONSERVER_REPORT_RESPONSE
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java?rev=1477749&r1=1477748&r2=1477749&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java Tue Apr 30 18:18:25 2013
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.util.Dupl
 import org.apache.hadoop.hbase.util.InjectionEvent;
 import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
 
 /**
  * Test whether region rebalancing works. (HBASE-71)
@@ -53,6 +55,8 @@ public class TestRegionRebalancing exten
 
   final byte [] FAMILY_NAME = Bytes.toBytes("col");
 
+  final Random rand = new Random(57473);
+
   /** constructor */
   public TestRegionRebalancing() {
     super(1);
@@ -101,6 +105,7 @@ public class TestRegionRebalancing exten
    * Set the slot number near 0, so no server's load will large than 4.
    * The load balance algorithm should handle this case properly.
    */
+  @Test(timeout=10000)
   public void testRebalancing() throws IOException {
 
     for (int i = 1; i <= 16; i++){
@@ -109,6 +114,8 @@ public class TestRegionRebalancing exten
       checkingServerStatus();
     }
 
+    setInjectionHandler();
+
     LOG.debug("Restart: killing 1 region server.");
     cluster.stopRegionServer(2, false);
     cluster.waitOnRegionServer(2);
@@ -119,6 +126,30 @@ public class TestRegionRebalancing exten
     assertRegionsAreBalanced();
   }
 
+  private void setInjectionHandler() {
+    InjectionHandler.set(
+        new InjectionHandler() {
+        protected boolean _falseCondition(InjectionEvent event, Object... args) {
+          if (event.equals(InjectionEvent.HREGIONSERVER_REPORT_RESPONSE)) {
+            double num = rand.nextDouble();
+            HMsg msgs[] = (HMsg[]) args;
+            if (msgs.length > 0 && num < 0.45) {
+              StringBuilder sb = new StringBuilder(
+              "Causing a false condition to be true. "
+                  + " rand prob = " + num + " msgs.length is " + msgs.length +
+              ". Messages received from the master that are ignored : \t" );
+              for (int i = 0; i < msgs.length; i++) {
+                sb.append(msgs[i].toString());
+                sb.append("\t");
+              }
+              LOG.debug(sb.toString());
+              return true;
+            }
+          }
+          return false;
+        }});
+  }
+
   private void checkingServerStatus() {
     List<HRegionServer> servers = getOnlineRegionServers();
     double avg = cluster.getMaster().getAverageLoad();