You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/12/07 22:23:56 UTC

svn commit: r602226 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/

Author: jimk
Date: Fri Dec  7 13:23:54 2007
New Revision: 602226

URL: http://svn.apache.org/viewvc?rev=602226&view=rev
Log:
HADOOP-2338 Fix NullPointerException in master server.

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Dec  7 13:23:54 2007
@@ -60,6 +60,7 @@
                (Bryan Duxbury via Stack)
    HADOOP-2365 Result of HashFunction.hash() contains all identical values
    HADOOP-2362 Leaking hdfs file handle on region split
+   HADOOP-2338 Fix NullPointerException in master server.
 
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Dec  7 13:23:54 2007
@@ -31,8 +31,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -74,6 +72,7 @@
   HMasterRegionInterface {
   
   static final Log LOG = LogFactory.getLog(HMaster.class.getName());
+  static final Long ZERO_L = Long.valueOf(0L);
 
   /** {@inheritDoc} */
   public long getProtocolVersion(String protocol,
@@ -93,6 +92,8 @@
   // started here in HMaster rather than have them have to know about the
   // hosting class
   volatile AtomicBoolean closed = new AtomicBoolean(true);
+  volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+  volatile AtomicInteger quiescedMetaServers = new AtomicInteger(0);
   volatile boolean fsOk;
   Path dir;
   HBaseConfiguration conf;
@@ -102,9 +103,9 @@
   int numRetries;
   long maxRegionOpenTime;
 
-  DelayQueue<ProcessServerShutdown> shutdownQueue =
-    new DelayQueue<ProcessServerShutdown>();
-  BlockingQueue<RegionServerOperation> msgQueue =
+  DelayQueue<RegionServerOperation> delayedToDoQueue =
+    new DelayQueue<RegionServerOperation>();
+  BlockingQueue<RegionServerOperation> toDoQueue =
     new LinkedBlockingQueue<RegionServerOperation>();
 
   int leaseTimeout;
@@ -424,8 +425,7 @@
           || killedRegions.contains(info.getRegionName()) // queued for offline
           || regionsToDelete.contains(info.getRegionName())) { // queued for delete
 
-        unassignedRegions.remove(info.getRegionName());
-        assignAttempts.remove(info.getRegionName());
+        unassignedRegions.remove(info);
         return;
       }
       HServerInfo storedInfo = null;
@@ -458,7 +458,7 @@
       if (!deadServer &&
           ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
               (storedInfo == null &&
-                  !unassignedRegions.containsKey(info.getRegionName()) &&
+                  !unassignedRegions.containsKey(info) &&
                   !pendingRegions.contains(info.getRegionName())
               )
           )
@@ -495,8 +495,7 @@
           }
         }
         // Now get the region assigned
-        unassignedRegions.put(info.getRegionName(), info);
-        assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(info, ZERO_L);
       }
     }
   }
@@ -818,8 +817,9 @@
     new ConcurrentHashMap<String, HServerLoad>();
 
   /**
-   * The 'unassignedRegions' table maps from a region name to a HRegionInfo 
-   * record, which includes the region's table, its id, and its start/end keys.
+   * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
+   * indicates the last time we *tried* to assign the region to a RegionServer.
+   * If the timestamp is out of date, then we can try to reassign it. 
    * 
    * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
    * set of all known valid regions.
@@ -827,15 +827,8 @@
    * <p>Items are removed from this list when a region server reports in that
    * the region has been deployed.
    */
-  final SortedMap<Text, HRegionInfo> unassignedRegions =
-    Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
-
-  /**
-   * The 'assignAttempts' table maps from regions to a timestamp that indicates
-   * the last time we *tried* to assign the region to a RegionServer. If the 
-   * timestamp is out of date, then we can try to reassign it.
-   */
-  final Map<Text, Long> assignAttempts = new ConcurrentHashMap<Text, Long>();
+  final SortedMap<HRegionInfo, Long> unassignedRegions =
+    Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
 
   /**
    * Regions that have been assigned, and the server has reported that it has
@@ -978,10 +971,7 @@
    */
   void unassignRootRegion() {
     this.rootRegionLocation.set(null);
-    this.unassignedRegions.put(HRegionInfo.rootRegionInfo.getRegionName(),
-        HRegionInfo.rootRegionInfo);
-    this.assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
-        Long.valueOf(0L));
+    this.unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
   }
 
   /**
@@ -1030,7 +1020,11 @@
    * @return Location of the <code>-ROOT-</code> region.
    */
   public HServerAddress getRootRegionLocation() {
-    return this.rootRegionLocation.get();
+    HServerAddress rootServer = null;
+    if (!shutdownRequested.get() && !closed.get()) {
+      rootServer = this.rootRegionLocation.get();
+    }
+    return rootServer;
   }
   
   /**
@@ -1054,11 +1048,11 @@
         if (rootRegionLocation.get() != null) {
           // We can't process server shutdowns unless the root region is online 
 
-          op = this.shutdownQueue.poll();
+          op = this.delayedToDoQueue.poll();
         }
         if (op == null ) {
           try {
-            op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+            op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
             // continue
           }
@@ -1077,7 +1071,7 @@
             // for the missing meta region(s) to come back online, but since it
             // is waiting, it cannot process the meta region online operation it
             // is waiting for. So put this operation back on the queue for now.
-            if (msgQueue.size() == 0) {
+            if (toDoQueue.size() == 0) {
               // The queue is currently empty so wait for a while to see if what
               // we need comes in first
               sleeper.sleep();
@@ -1086,9 +1080,10 @@
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Put " + op.toString() + " back on queue");
               }
-              msgQueue.put(op);
+              toDoQueue.put(op);
             } catch (InterruptedException e) {
-              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+              throw new RuntimeException(
+                  "Putting into toDoQueue was interrupted.", e);
             }
           }
         } catch (Exception ex) {
@@ -1106,9 +1101,10 @@
           }
           LOG.warn("Processing pending operations: " + op.toString(), ex);
           try {
-            msgQueue.put(op);
+            toDoQueue.put(op);
           } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            throw new RuntimeException(
+                "Putting into toDoQueue was interrupted.", e);
           }
         }
       }
@@ -1255,7 +1251,7 @@
       if (root != null && root.equals(storedInfo.getServerAddress())) {
         unassignRootRegion();
       }
-      shutdownQueue.put(new ProcessServerShutdown(storedInfo));
+      delayedToDoQueue.put(new ProcessServerShutdown(storedInfo));
     }
 
     // record new server
@@ -1302,48 +1298,70 @@
   throws IOException {
     String serverName = serverInfo.getServerAddress().toString().trim();
     long serverLabel = getServerLabel(serverName);
-    if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
-      synchronized (serversToServerInfo) {
-        try {
-          // HRegionServer is shutting down. Cancel the server's lease.
-          // Note that canceling the server's lease takes care of updating
-          // serversToServerInfo, etc.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Region server " + serverName +
-            ": MSG_REPORT_EXITING -- cancelling lease");
-          }
+//    if (LOG.isDebugEnabled()) {
+//      LOG.debug("received heartbeat from " + serverName);
+//    }
+    if (msgs.length > 0) {
+      if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
+        synchronized (serversToServerInfo) {
+          try {
+            // HRegionServer is shutting down. Cancel the server's lease.
+            // Note that canceling the server's lease takes care of updating
+            // serversToServerInfo, etc.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Region server " + serverName +
+              ": MSG_REPORT_EXITING -- cancelling lease");
+            }
 
-          if (cancelLease(serverName, serverLabel)) {
-            // Only process the exit message if the server still has a lease.
-            // Otherwise we could end up processing the server exit twice.
-            LOG.info("Region server " + serverName +
-            ": MSG_REPORT_EXITING -- lease cancelled");
-            // Get all the regions the server was serving reassigned
-            // (if we are not shutting down).
-            if (!closed.get()) {
-              for (int i = 1; i < msgs.length; i++) {
-                HRegionInfo info = msgs[i].getRegionInfo();
-                if (info.getTableDesc().getName().equals(ROOT_TABLE_NAME)) {
-                  rootRegionLocation.set(null);
-                } else if (info.getTableDesc().getName().equals(META_TABLE_NAME)) {
-                  onlineMetaRegions.remove(info.getStartKey());
-                }
+            if (cancelLease(serverName, serverLabel)) {
+              // Only process the exit message if the server still has a lease.
+              // Otherwise we could end up processing the server exit twice.
+              LOG.info("Region server " + serverName +
+              ": MSG_REPORT_EXITING -- lease cancelled");
+              // Get all the regions the server was serving reassigned
+              // (if we are not shutting down).
+              if (!closed.get()) {
+                for (int i = 1; i < msgs.length; i++) {
+                  HRegionInfo info = msgs[i].getRegionInfo();
+                  if (info.isRootRegion()) {
+                    rootRegionLocation.set(null);
+                  } else if (info.isMetaTable()) {
+                    onlineMetaRegions.remove(info.getStartKey());
+                  }
 
-                this.unassignedRegions.put(info.getRegionName(), info);
-                this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+                  this.unassignedRegions.put(info, ZERO_L);
+                }
               }
             }
-          }
 
-          // We don't need to return anything to the server because it isn't
-          // going to do any more work.
-          return new HMsg[0];
-        } finally {
-          serversToServerInfo.notifyAll();
+            // We don't need to return anything to the server because it isn't
+            // going to do any more work.
+            return new HMsg[0];
+          } finally {
+            serversToServerInfo.notifyAll();
+          }
+        }
+      } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+        LOG.info("Region server " + serverName + " quiesced");
+        if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) {
+          // If the only servers we know about are meta servers, then we can
+          // proceed with shutdown
+          LOG.info("All user tables quiesced. Proceeding with shutdown");
+          closed.set(true);
+          synchronized(toDoQueue) {
+            toDoQueue.clear();                         // Empty the queue
+            delayedToDoQueue.clear();                  // Empty shut down queue
+            toDoQueue.notifyAll();                     // Wake main thread
+          }
         }
       }
     }
 
+    if (shutdownRequested.get() && !closed.get()) {
+      // Tell the server to stop serving any user regions
+      return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
+    }
+
     if (closed.get()) {
       // Tell server to shut down if we are shutting down.  This should
       // happen after check of MSG_REPORT_EXITING above, since region server
@@ -1476,62 +1494,86 @@
       switch (incomingMsgs[i].getMsg()) {
 
       case HMsg.MSG_REPORT_PROCESS_OPEN:
-        synchronized (this.assignAttempts) {
+        synchronized (unassignedRegions) {
           // Region server has acknowledged request to open region.
-          // Extend region open time by 1/2 max region open time.
-          assignAttempts.put(region.getRegionName(), 
-              Long.valueOf(assignAttempts.get(
-                  region.getRegionName()).longValue() +
-                  (this.maxRegionOpenTime / 2)));
+          // Extend region open time by max region open time.
+          unassignedRegions.put(region,
+              System.currentTimeMillis() + this.maxRegionOpenTime);
         }
         break;
         
       case HMsg.MSG_REPORT_OPEN:
-        HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
-
-        if (regionInfo == null) {
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("region server " + info.getServerAddress().toString()
-                + " should not have opened region " + region.getRegionName());
+        boolean duplicateAssignment = false;
+        synchronized (unassignedRegions) {
+          if (unassignedRegions.remove(region) == null) {
+            if (region.getRegionName().compareTo(
+                HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+              // Root region
+              HServerAddress rootServer = rootRegionLocation.get();
+              if (rootServer != null) {
+                if (rootServer.toString().compareTo(serverName) == 0) {
+                  // A duplicate open report from the correct server
+                  break;
+                }
+                // We received an open report on the root region, but it is
+                // assigned to a different server
+                duplicateAssignment = true;
+              }
+            } else {
+              // Not root region. If it is not a pending region, then we are
+              // going to treat it as a duplicate assignment
+              if (pendingRegions.contains(region.getRegionName())) {
+                // A duplicate report from the correct server
+                break;
+              }
+              // Although we can't tell for certain if this is a duplicate
+              // report from the correct server, we are going to treat it
+              // as such
+              duplicateAssignment = true;
+            }
           }
+          if (duplicateAssignment) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("region server " + info.getServerAddress().toString()
+                  + " should not have opened region " + region.getRegionName());
+            }
 
-          // This Region should not have been opened.
-          // Ask the server to shut it down, but don't report it as closed.  
-          // Otherwise the HMaster will think the Region was closed on purpose, 
-          // and then try to reopen it elsewhere; that's not what we want.
-
-          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
+            // This Region should not have been opened.
+            // Ask the server to shut it down, but don't report it as closed.  
+            // Otherwise the HMaster will think the Region was closed on purpose, 
+            // and then try to reopen it elsewhere; that's not what we want.
 
-        } else {
-          LOG.info(info.getServerAddress().toString() + " serving " +
-              region.getRegionName());
+            returnMsgs.add(
+                new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
 
-          if (region.getRegionName().compareTo(
-              HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
-            // Store the Root Region location (in memory)
-            synchronized (rootRegionLocation) {
-              this.rootRegionLocation.set(
-                  new HServerAddress(info.getServerAddress()));
-              this.rootRegionLocation.notifyAll();
-            }
           } else {
-            // Note that the table has been assigned and is waiting for the meta
-            // table to be updated.
+            LOG.info(info.getServerAddress().toString() + " serving " +
+                region.getRegionName());
 
-            pendingRegions.add(region.getRegionName());
+            if (region.getRegionName().compareTo(
+                HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+              // Store the Root Region location (in memory)
+              synchronized (rootRegionLocation) {
+                this.rootRegionLocation.set(
+                    new HServerAddress(info.getServerAddress()));
+                this.rootRegionLocation.notifyAll();
+              }
+            } else {
+              // Note that the table has been assigned and is waiting for the
+              // meta table to be updated.
 
-            // Queue up an update to note the region location.
+              pendingRegions.add(region.getRegionName());
 
-            try {
-              msgQueue.put(new ProcessRegionOpen(info, region));
-            } catch (InterruptedException e) {
-              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
-            }
-          } 
-          // Remove from unassigned list so we don't assign it to someone else
-          this.unassignedRegions.remove(region.getRegionName());
-          this.assignAttempts.remove(region.getRegionName());
+              // Queue up an update to note the region location.
+
+              try {
+                toDoQueue.put(new ProcessRegionOpen(info, region));
+              } catch (InterruptedException e) {
+                throw new RuntimeException(
+                    "Putting into toDoQueue was interrupted.", e);
+              }
+            } 
+          }
         }
         break;
 
@@ -1559,19 +1601,24 @@
             deleteRegion = true;
           }
 
+          if (region.isMetaTable()) {
+            // Region is part of the meta table. Remove it from onlineMetaRegions
+            onlineMetaRegions.remove(region.getStartKey());
+          }
+
           // NOTE: we cannot put the region into unassignedRegions as that
           //       could create a race with the pending close if it gets 
           //       reassigned before the close is processed.
 
-          unassignedRegions.remove(region.getRegionName());
-          assignAttempts.remove(region.getRegionName());
+          unassignedRegions.remove(region);
 
           try {
-            msgQueue.put(new ProcessRegionClose(region, reassignRegion,
+            toDoQueue.put(new ProcessRegionClose(region, reassignRegion,
                 deleteRegion));
 
           } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            throw new RuntimeException(
+                "Putting into toDoQueue was interrupted.", e);
           }
         }
         break;
@@ -1580,12 +1627,10 @@
         // A region has split.
 
         HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
-        assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(newRegionA, ZERO_L);
 
         HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
-        assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(newRegionB, ZERO_L);
 
         LOG.info("region " + region.getRegionName() +
             " split. New regions are: " + newRegionA.getRegionName() + ", " +
@@ -1631,15 +1676,22 @@
   private void assignRegions(HServerInfo info, String serverName,
       ArrayList<HMsg> returnMsgs) {
     
-    synchronized (this.assignAttempts) {
+    synchronized (this.unassignedRegions) {
       
       // We need to hold a lock on assign attempts while we figure out what to
       // do so that multiple threads do not execute this method in parallel
       // resulting in assigning the same region to multiple servers.
       
       long now = System.currentTimeMillis();
-      Set<Text> regionsToAssign = new HashSet<Text>();
-      for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
+      Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
+      for (Map.Entry<HRegionInfo, Long> e: this.unassignedRegions.entrySet()) {
+        HRegionInfo i = e.getKey();
+        if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
+            !i.isMetaRegion()) {
+          // Can't assign user regions until all meta regions have been assigned
+          // and are on-line
+          continue;
+        }
         long diff = now - e.getValue().longValue();
         if (diff > this.maxRegionOpenTime) {
           regionsToAssign.add(e.getKey());
@@ -1720,11 +1772,10 @@
         }
 
         now = System.currentTimeMillis();
-        for (Text regionName: regionsToAssign) {
-          HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
-          LOG.info("assigning region " + regionName + " to server " +
-              serverName);
-          this.assignAttempts.put(regionName, Long.valueOf(now));
+        for (HRegionInfo regionInfo: regionsToAssign) {
+          LOG.info("assigning region " + regionInfo.getRegionName() +
+              " to server " + serverName);
+          this.unassignedRegions.put(regionInfo, Long.valueOf(now));
           returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
           if (--nregions <= 0) {
             break;
@@ -1773,14 +1824,13 @@
    * @param serverName
    * @param returnMsgs
    */
-  private void assignRegionsToOneServer(final Set<Text> regionsToAssign,
+  private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
       final String serverName, final ArrayList<HMsg> returnMsgs) {
     long now = System.currentTimeMillis();
-    for (Text regionName: regionsToAssign) {
-      HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
-      LOG.info("assigning region " + regionName + " to the only server " +
-          serverName);
-      this.assignAttempts.put(regionName, Long.valueOf(now));
+    for (HRegionInfo regionInfo: regionsToAssign) {
+      LOG.info("assigning region " + regionInfo.getRegionName() +
+          " to the only server " + serverName);
+      this.unassignedRegions.put(regionInfo, Long.valueOf(now));
       returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
     }
   }
@@ -1789,10 +1839,63 @@
    * Some internal classes to manage msg-passing and region server operations
    */
 
-  private abstract class RegionServerOperation {
-    RegionServerOperation() {}
+  private abstract class RegionServerOperation implements Delayed {
+    private long expire;
+
+    protected RegionServerOperation() {
+      // Set the future time at which we expect to be released from the
+      // DelayQueue we're inserted in on lease expiration.
+      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+    }
+    
+    /** {@inheritDoc} */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.expire - System.currentTimeMillis(),
+        TimeUnit.MILLISECONDS);
+    }
+    
+    /** {@inheritDoc} */
+    public int compareTo(Delayed o) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
+          - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+    
+    protected void requeue() {
+      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+      delayedToDoQueue.put(this);
+    }
+    
+    protected boolean rootAvailable() {
+      boolean available = true;
+      if (rootRegionLocation.get() == null) {
+        available = false;
+        requeue();
+      }
+      return available;
+    }
 
-    abstract boolean process() throws IOException;
+    protected boolean metaTableAvailable() {
+      boolean available = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
+            ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
+      }
+      if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+        // We can't proceed because not all of the meta regions are online.
+        // We can't block either because that would prevent the meta region
+        // online message from being processed. In order to prevent spinning
+        // in the run queue, put this request on the delay queue to give
+        // other threads the opportunity to get the meta regions on-line.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requeuing because not all meta regions are online");
+        }
+        available = false;
+        requeue();
+      }
+      return available;
+    }
+    
+    protected abstract boolean process() throws IOException;
   }
 
   /** 
@@ -1800,15 +1903,13 @@
    * The region server's log file needs to be split up for each region it was
    * serving, and the regions need to get reassigned.
    */
-  private class ProcessServerShutdown extends RegionServerOperation
-  implements Delayed {
-    private long expire;
+  private class ProcessServerShutdown extends RegionServerOperation {
     private HServerAddress deadServer;
     private String deadServerName;
     private Path oldLogDir;
-    private transient boolean logSplit;
-    private transient boolean rootChecked;
-    private transient boolean rootRescanned;
+    private boolean logSplit;
+    private boolean rootChecked;
+    private boolean rootRescanned;
 
     private class ToDoEntry {
       boolean deleteRegion;
@@ -1824,7 +1925,10 @@
       }
     }
 
-    ProcessServerShutdown(HServerInfo serverInfo) {
+    /**
+     * @param serverInfo
+     */
+    public ProcessServerShutdown(HServerInfo serverInfo) {
       super();
       this.deadServer = serverInfo.getServerAddress();
       this.deadServerName = this.deadServer.toString();
@@ -1838,24 +1942,9 @@
       dirName.append("_");
       dirName.append(deadServer.getPort());
       this.oldLogDir = new Path(dir, dirName.toString());
-      // Set the future time at which we expect to be released from the
-      // DelayQueue we're inserted in on lease expiration.
-      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
     }
 
     /** {@inheritDoc} */
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(this.expire - System.currentTimeMillis(),
-        TimeUnit.MILLISECONDS);
-    }
-    
-    /** {@inheritDoc} */
-    public int compareTo(Delayed o) {
-      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
-          - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
-    }
-    
-    /** {@inheritDoc} */
     @Override
     public String toString() {
       return "ProcessServerShutdown of " + this.deadServer.toString();
@@ -1866,7 +1955,7 @@
         Text regionName) throws IOException {
 
       ArrayList<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
-      TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
+      HashSet<HRegionInfo> regions = new HashSet<HRegionInfo>();
 
       try {
         while (true) {
@@ -1958,8 +2047,7 @@
             if (regionsToKill.containsKey(info.getRegionName())) {
               regionsToKill.remove(info.getRegionName());
               killList.put(deadServerName, regionsToKill);
-              unassignedRegions.remove(info.getRegionName());
-              assignAttempts.remove(info.getRegionName());
+              unassignedRegions.remove(info);
               synchronized (regionsToDelete) {
                 if (regionsToDelete.contains(info.getRegionName())) {
                   // Delete this region
@@ -1974,7 +2062,7 @@
 
           } else {
             // Get region reassigned
-            regions.put(info.getRegionName(), info);
+            regions.add(info);
 
             // If it was pending, remove.
             // Otherwise will obstruct its getting reassigned.
@@ -2008,16 +2096,13 @@
       }
 
       // Get regions reassigned
-      for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
-        Text region = e.getKey();
-        HRegionInfo regionInfo = e.getValue();
-        unassignedRegions.put(region, regionInfo);
-        assignAttempts.put(region, Long.valueOf(0L));
+      for (HRegionInfo info: regions) {
+        unassignedRegions.put(info, ZERO_L);
       }
     }
 
     @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
       LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
           this.logSplit + ", rootChecked: " + this.rootChecked +
           ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " +
@@ -2040,30 +2125,12 @@
       }
 
       if (!rootChecked) {
-        boolean rootRegionUnavailable = false;
-        if (rootRegionLocation.get() == null) {
-          rootRegionUnavailable = true;
-
-        } else if (deadServer.equals(rootRegionLocation.get())) {
-          // We should never get here because whenever an object of this type
-          // is created, a check is made to see if it is the root server.
-          // and unassignRootRegion() is called then. However, in the
-          // unlikely event that we do end up here, let's do the right thing.
-          unassignRootRegion();
-          rootRegionUnavailable = true;
-        }
-        if (rootRegionUnavailable) {
-          // We can't do anything until the root region is on-line, put
-          // us back on the delay queue. Reset the future time at which
-          // we expect to be released from the DelayQueue we're inserted
-          // in on lease expiration.
-          this.expire = System.currentTimeMillis() + leaseTimeout / 2;
-          shutdownQueue.put(this);
-          
-          // Return true so run() does not put us back on the msgQueue
+        if (!rootAvailable()) {
+          // Return true so that worker does not put this request back on the
+          // toDoQueue.
+          // rootAvailable() has already put it on the delayedToDoQueue
           return true;
         }
-        rootChecked = true;
       }
 
       if (!rootRescanned) {
@@ -2114,27 +2181,14 @@
         }
         rootRescanned = true;
       }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
-            ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
-      }
-      if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-        // We can't proceed because not all of the meta regions are online.
-        // We can't block either because that would prevent the meta region
-        // online message from being processed. In order to prevent spinning
-        // in the run queue, put this request on the delay queue to give
-        // other threads the opportunity to get the meta regions on-line.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Requeuing shutdown because not all meta regions are online");
-        }
-        this.expire = System.currentTimeMillis() + leaseTimeout / 2;
-        shutdownQueue.put(this);
-        
-        // Return true so run() does not put us back on the msgQueue
+      
+      if (!metaTableAvailable()) {
+        // We can't proceed because not all meta regions are online.
+        // metaAvailable() has put this request on the delayedToDoQueue
+        // Return true so that worker does not put this on the toDoQueue
         return true;
       }
+
       for (int tries = 0; tries < numRetries; tries++) {
         try {
           if (closed.get()) {
@@ -2181,33 +2235,95 @@
   }
 
   /**
+   * Abstract class that performs common operations for 
+   * @see #ProcessRegionClose and @see #ProcessRegionOpen
+   */
+  private abstract class ProcessRegionStatusChange
+    extends RegionServerOperation {
+
+    protected final boolean isMetaTable;
+    protected final HRegionInfo regionInfo;
+    private MetaRegion metaRegion;
+    protected Text metaRegionName;
+    
+    /**
+     * @param regionInfo
+     */
+    public ProcessRegionStatusChange(HRegionInfo regionInfo) {
+      super();
+      this.regionInfo = regionInfo;
+      this.isMetaTable = regionInfo.isMetaTable();
+      this.metaRegion = null;
+      this.metaRegionName = null;
+    }
+    
+    protected boolean metaRegionAvailable() {
+      boolean available = true;
+      if (isMetaTable) {
+        // This operation is for the meta table
+        if (!rootAvailable()) {
+          // But we can't proceed unless the root region is available
+          available = false;
+        }
+      } else {
+        if (!rootScanned || !metaTableAvailable()) {
+          // The root region has not been scanned or the meta table is not
+          // available so we can't proceed.
+          // Put the operation on the delayedToDoQueue
+          requeue();
+          available = false;
+        }
+      }
+      return available;
+    }
+    
+    protected HRegionInterface getMetaServer() throws IOException {
+      if (this.isMetaTable) {
+        this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
+      } else {
+        if (this.metaRegion == null) {
+          synchronized (onlineMetaRegions) {
+            metaRegion = onlineMetaRegions.size() == 1 ? 
+                onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
+                  onlineMetaRegions.containsKey(regionInfo.getRegionName()) ?
+                      onlineMetaRegions.get(regionInfo.getRegionName()) :
+                        onlineMetaRegions.get(onlineMetaRegions.headMap(
+                            regionInfo.getRegionName()).lastKey());
+          }
+          this.metaRegionName = metaRegion.getRegionName();
+        }
+      }
+
+      HServerAddress server = null;
+      if (isMetaTable) {
+        server = rootRegionLocation.get();
+        
+      } else {
+        server = metaRegion.getServer();
+      }
+      return connection.getHRegionConnection(server);
+    }
+    
+  }
+  /**
    * ProcessRegionClose is instantiated when a region server reports that it
    * has closed a region.
    */
-  private class ProcessRegionClose extends RegionServerOperation {
-    private HRegionInfo regionInfo;
+  private class ProcessRegionClose extends ProcessRegionStatusChange {
     private boolean reassignRegion;
     private boolean deleteRegion;
-    private boolean rootRegion;
 
-    ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
+    /**
+     * @param regionInfo
+     * @param reassignRegion
+     * @param deleteRegion
+     */
+    public ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
         boolean deleteRegion) {
 
-      super();
-
-      this.regionInfo = regionInfo;
+      super(regionInfo);
       this.reassignRegion = reassignRegion;
       this.deleteRegion = deleteRegion;
-
-      // If the region closing down is a meta region then we need to update
-      // the ROOT table
-
-      if (this.regionInfo.getTableDesc().getName().equals(META_TABLE_NAME)) {
-        this.rootRegion = true;
-
-      } else {
-        this.rootRegion = false;
-      }
     }
 
     /** {@inheritDoc} */
@@ -2217,7 +2333,7 @@
     }
 
     @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
       for (int tries = 0; tries < numRetries; tries++) {
         if (closed.get()) {
           return true;
@@ -2226,50 +2342,15 @@
 
         // Mark the Region as unavailable in the appropriate meta table
 
-        Text metaRegionName;
-        HRegionInterface server;
-        if (rootRegion) {
-          if (rootRegionLocation.get() == null || !rootScanned) {
-            // We can't proceed until the root region is online and has been
-            // scanned
-            return false;
-          }
-          metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
-          server = connection.getHRegionConnection(rootRegionLocation.get());
-          onlineMetaRegions.remove(regionInfo.getStartKey());
-
-        } else {
-          if (!rootScanned ||
-              numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-            
-            // We can't proceed because not all of the meta regions are online.
-            // We can't block either because that would prevent the meta region
-            // online message from being processed. So return false to have this
-            // operation requeued.
-            
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Requeuing close because rootScanned=" +
-                  rootScanned + ", numberOfMetaRegions=" +
-                  numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
-                  onlineMetaRegions.size());
-            }
-            return false;
-          }
-
-          MetaRegion r = null;
-          synchronized (onlineMetaRegions) {
-            if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
-              r = onlineMetaRegions.get(regionInfo.getRegionName());
-
-            } else {
-              r = onlineMetaRegions.get(onlineMetaRegions.headMap(
-                  regionInfo.getRegionName()).lastKey());
-            }
-          }
-          metaRegionName = r.getRegionName();
-          server = connection.getHRegionConnection(r.getServer());
+        if (!metaRegionAvailable()) {
+          // We can't proceed unless the meta region we are going to update
+          // is online. metaRegionAvailable() has put this operation on the
+          // delayedToDoQueue, so return true so the operation is not put 
+          // back on the toDoQueue
+          return true;
         }
 
+        HRegionInterface server = getMetaServer();
         try {
           BatchUpdate b = new BatchUpdate(rand.nextLong());
           long lockid = b.startUpdate(regionInfo.getRegionName());
@@ -2298,8 +2379,7 @@
       if (reassignRegion) {
         LOG.info("reassign region: " + regionInfo.getRegionName());
 
-        unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
-        assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(regionInfo, ZERO_L);
 
       } else if (deleteRegion) {
         try {
@@ -2320,19 +2400,18 @@
    * serving a region. This applies to all meta and user regions except the 
    * root region which is handled specially.
    */
-  private class ProcessRegionOpen extends RegionServerOperation {
-    private final boolean rootRegion;
-    private final HRegionInfo region;
+  private class ProcessRegionOpen extends ProcessRegionStatusChange {
     private final HServerAddress serverAddress;
     private final byte [] startCode;
 
-    ProcessRegionOpen(HServerInfo info, HRegionInfo region)
+    /**
+     * @param info
+     * @param regionInfo
+     * @throws IOException
+     */
+    public ProcessRegionOpen(HServerInfo info, HRegionInfo regionInfo)
     throws IOException {
-      // If true, the region which just came on-line is a META region.
-      // We need to look in the ROOT region for its information.  Otherwise,
-      // its just an ordinary region. Look for it in the META table.
-      this.rootRegion = region.getTableDesc().getName().equals(META_TABLE_NAME);
-      this.region = region;
+      super(regionInfo);
       this.serverAddress = info.getServerAddress();
       this.startCode = Writables.longToBytes(info.getStartCode());
     }
@@ -2344,72 +2423,40 @@
     }
 
     @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
       for (int tries = 0; tries < numRetries; tries++) {
         if (closed.get()) {
           return true;
         }
-        LOG.info(region.toString() + " open on " + 
+        LOG.info(regionInfo.toString() + " open on " + 
             this.serverAddress.toString());
 
-        // Register the newly-available Region's location.
-        Text metaRegionName;
-        HRegionInterface server;
-        if (this.rootRegion) {
-          if (rootRegionLocation.get() == null || !rootScanned) {
-            // We can't proceed until root region is online and scanned
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("root region: " + 
-                ((rootRegionLocation.get() != null)?
-                  rootRegionLocation.get().toString(): "null") +
-                ", rootScanned: " + rootScanned);
-            }
-            return false;
-          }
-          metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
-          server = connection.getHRegionConnection(rootRegionLocation.get());
-        } else {
-          if (!rootScanned ||
-              numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-            // We can't proceed because not all of the meta regions are online.
-            // We can't block either because that would prevent the meta region
-            // online message from being processed. So return false to have this
-            // operation requeued.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Requeuing open because rootScanned: " +
-                  rootScanned + ", numberOfMetaRegions: " +
-                  numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
-                  onlineMetaRegions.size());
-            }
-            return false;
-          }
-
-          MetaRegion r = null;
-          synchronized (onlineMetaRegions) {
-            r = onlineMetaRegions.containsKey(region.getRegionName()) ?
-                onlineMetaRegions.get(region.getRegionName()) :
-                  onlineMetaRegions.get(onlineMetaRegions.headMap(
-                      region.getRegionName()).lastKey());
-          }
-          metaRegionName = r.getRegionName();
-          server = connection.getHRegionConnection(r.getServer());
+        if (!metaRegionAvailable()) {
+          // We can't proceed unless the meta region we are going to update
+          // is online. metaRegionAvailable() has put this operation on the
+          // delayedToDoQueue, so return true so the operation is not put 
+          // back on the toDoQueue
+          return true;
         }
+
+        // Register the newly-available Region's location.
         
-        LOG.info("updating row " + region.getRegionName() + " in table " +
+        HRegionInterface server = getMetaServer();
+        LOG.info("updating row " + regionInfo.getRegionName() + " in table " +
           metaRegionName + " with startcode " +
           Writables.bytesToLong(this.startCode) + " and server "+
           serverAddress.toString());
         try {
           BatchUpdate b = new BatchUpdate(rand.nextLong());
-          long lockid = b.startUpdate(region.getRegionName());
+          long lockid = b.startUpdate(regionInfo.getRegionName());
           b.put(lockid, COL_SERVER,
             Writables.stringToBytes(serverAddress.toString()));
           b.put(lockid, COL_STARTCODE, startCode);
           server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
-          if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
+          if (isMetaTable) {
             // It's a meta region.
             MetaRegion m = new MetaRegion(this.serverAddress,
-              this.region.getRegionName(), this.region.getStartKey());
+              this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
             if (!initialMetaScanComplete) {
               // Put it on the queue to be scanned for the first time.
               try {
@@ -2422,11 +2469,11 @@
             } else {
               // Add it to the online meta regions
               LOG.debug("Adding to onlineMetaRegions: " + m.toString());
-              onlineMetaRegions.put(this.region.getStartKey(), m);
+              onlineMetaRegions.put(this.regionInfo.getStartKey(), m);
             }
           }
           // If updated successfully, remove from pending list.
-          pendingRegions.remove(region.getRegionName());
+          pendingRegions.remove(regionInfo.getRegionName());
           break;
         } catch (IOException e) {
           if (tries == numRetries - 1) {
@@ -2449,19 +2496,8 @@
 
   /** {@inheritDoc} */
   public void shutdown() {
-    TimerTask tt = new TimerTask() {
-      @Override
-      public void run() {
-        closed.set(true);
-        synchronized(msgQueue) {
-          msgQueue.clear();                         // Empty the queue
-          shutdownQueue.clear();                    // Empty shut down queue
-          msgQueue.notifyAll();                     // Wake main thread
-        }
-      }
-    };
-    Timer t = new Timer(getName() + "-Shutdown");
-    t.schedule(tt, 10);
+    LOG.info("Cluster shutdown requested. Starting to quiesce servers");
+    this.shutdownRequested.set(true);
   }
 
   /** {@inheritDoc} */
@@ -2563,8 +2599,7 @@
 
       // 5. Get it assigned to a server
 
-      this.unassignedRegions.put(regionName, info);
-      this.assignAttempts.put(regionName, Long.valueOf(0L));
+      this.unassignedRegions.put(info, ZERO_L);
 
     } finally {
       tableInCreation.remove(newRegion.getTableDesc().getName());
@@ -2838,14 +2873,12 @@
         }
 
         if (online) {                         // Bring offline regions on-line
-          if (!unassignedRegions.containsKey(i.getRegionName())) {
-            unassignedRegions.put(i.getRegionName(), i);
-            assignAttempts.put(i.getRegionName(), Long.valueOf(0L));
+          if (!unassignedRegions.containsKey(i)) {
+            unassignedRegions.put(i, ZERO_L);
           }
 
         } else {                              // Prevent region from getting assigned.
-          unassignedRegions.remove(i.getRegionName());
-          assignAttempts.remove(i.getRegionName());
+          unassignedRegions.remove(i);
         }
       }
 
@@ -3069,7 +3102,7 @@
       // here because the new server will start serving the root region before
       // the ProcessServerShutdown operation has a chance to split the log file.
       if (info != null) {
-        shutdownQueue.put(new ProcessServerShutdown(info));
+        delayedToDoQueue.put(new ProcessServerShutdown(info));
       }
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Fri Dec  7 13:23:54 2007
@@ -48,6 +48,9 @@
 
   /** Stop serving the specified region and don't report back that it's closed */
   public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
+  
+  /** Stop serving user regions */
+  public static final byte MSG_REGIONSERVER_QUIESCE = 7;
 
   // Messages sent from the region server to the master
   
@@ -72,9 +75,12 @@
    * region server is shutting down
    * 
    * note that this message is followed by MSG_REPORT_CLOSE messages for each
-   * region the region server was serving.
+   * region the region server was serving, unless it was told to quiesce.
    */
   public static final byte MSG_REPORT_EXITING = 104;
+  
+  /** region server has closed all user regions but is still serving meta regions */
+  public static final byte MSG_REPORT_QUIESCED = 105;
 
   byte msg;
   HRegionInfo info;
@@ -148,6 +154,10 @@
       message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
       break;
       
+    case MSG_REGIONSERVER_QUIESCE:
+      message.append("MSG_REGIONSERVER_QUIESCE : ");
+      break;
+      
     case MSG_REPORT_PROCESS_OPEN:
       message.append("MSG_REPORT_PROCESS_OPEN : ");
       break;
@@ -166,6 +176,10 @@
       
     case MSG_REPORT_EXITING:
       message.append("MSG_REPORT_EXITING : ");
+      break;
+      
+    case MSG_REPORT_QUIESCED:
+      message.append("MSG_REPORT_QUIESCED : ");
       break;
       
     default:

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Fri Dec  7 13:23:54 2007
@@ -193,6 +193,21 @@
     return tableDesc;
   }
   
+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return this.tableDesc.isRootRegion();
+  }
+  
+  /** @return true if this is the meta table */
+  public boolean isMetaTable() {
+    return this.tableDesc.isMetaTable();
+  }
+
+  /** @return true if this region is a meta region */
+  public boolean isMetaRegion() {
+    return this.tableDesc.isMetaRegion();
+  }
+  
   /**
    * @return True if has been split and has daughters.
    */

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Dec  7 13:23:54 2007
@@ -81,6 +81,8 @@
   // Chore threads need to know about the hosting class.
   protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
   
+  protected final AtomicBoolean quiesced = new AtomicBoolean(false);
+  
   // Go down hard.  Used if file system becomes unavailable and also in
   // debugging and unit tests.
   protected volatile boolean abortRequested;
@@ -652,6 +654,7 @@
    * load/unload instructions.
    */
   public void run() {
+    boolean quiesceRequested = false;
     try {
       init(reportForDuty());
       long lastMsg = 0;
@@ -682,6 +685,16 @@
               HMsg msgs[] =
                 this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
               lastMsg = System.currentTimeMillis();
+              
+              if (this.quiesced.get() && onlineRegions.size() == 0) {
+                // We've just told the master we're exiting because we aren't
+                // serving any regions. So set the stop bit and exit.
+                LOG.info("Server quiesced and not serving any regions. " +
+                    "Starting shutdown");
+                stopRequested.set(true);
+                continue;
+              }
+              
               // Queue up the HMaster's instruction stream for processing
               boolean restart = false;
               for(int i = 0; i < msgs.length && !stopRequested.get() &&
@@ -689,9 +702,7 @@
                 switch(msgs[i].getMsg()) {
                 
                 case HMsg.MSG_CALL_SERVER_STARTUP:
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got call server startup message");
-                  }
+                  LOG.info("Got call server startup message");
                   // We the MSG_CALL_SERVER_STARTUP on startup but we can also
                   // get it when the master is panicing because for instance
                   // the HDFS has been yanked out from under it.  Be wary of
@@ -725,11 +736,22 @@
                   break;
 
                 case HMsg.MSG_REGIONSERVER_STOP:
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got regionserver stop message");
-                  }
+                  LOG.info("Got regionserver stop message");
                   stopRequested.set(true);
                   break;
+                  
+                case HMsg.MSG_REGIONSERVER_QUIESCE:
+                  if (!quiesceRequested) {
+                    LOG.info("Got quiesce server message");
+                    try {
+                      toDo.put(new ToDoEntry(msgs[i]));
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException("Putting into msgQueue was " +
+                        "interrupted.", e);
+                    }
+                    quiesceRequested = true;
+                  }
+                  break;
 
                 default:
                   if (fsOk) {
@@ -1101,6 +1123,10 @@
           try {
             LOG.info(e.msg.toString());
             switch(e.msg.getMsg()) {
+            
+            case HMsg.MSG_REGIONSERVER_QUIESCE:
+              closeUserRegions();
+              break;
 
             case HMsg.MSG_REGION_OPEN:
               // Open a region
@@ -1149,12 +1175,19 @@
     }
   }
   
-  void openRegion(final HRegionInfo regionInfo) throws IOException {
+  void openRegion(final HRegionInfo regionInfo) {
     HRegion region = onlineRegions.get(regionInfo.getRegionName());
     if(region == null) {
-      region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
-        this.log, FileSystem.get(conf), conf, regionInfo, null,
-        this.cacheFlusher);
+      try {
+        region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+            this.log, FileSystem.get(conf), conf, regionInfo, null,
+            this.cacheFlusher);
+        
+      } catch (IOException e) {
+        LOG.error("error opening region " + regionInfo.getRegionName(), e);
+        reportClose(region);
+        return;
+      }
       this.lock.writeLock().lock();
       try {
         this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1206,6 +1239,45 @@
       }
     }
     return regionsToClose;
+  }
+
+  /** Called as the first stage of cluster shutdown. */
+  void closeUserRegions() {
+    ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
+    this.lock.writeLock().lock();
+    try {
+      synchronized (onlineRegions) {
+        for (Iterator<Map.Entry<Text, HRegion>> i =
+          onlineRegions.entrySet().iterator();
+        i.hasNext();) {
+          Map.Entry<Text, HRegion> e = i.next();
+          HRegion r = e.getValue();
+          if (!r.getRegionInfo().isMetaRegion()) {
+            regionsToClose.add(r);
+            i.remove();
+          }
+        }
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+    for(HRegion region: regionsToClose) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing region " + region.getRegionName());
+      }
+      try {
+        region.close(false);
+      } catch (IOException e) {
+        LOG.error("error closing region " + region.getRegionName(),
+          RemoteExceptionHandler.checkIOException(e));
+      }
+    }
+    this.quiesced.set(true);
+    if (onlineRegions.size() == 0) {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_EXITING));
+    } else {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_QUIESCED));
+    }
   }
 
   //

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Fri Dec  7 13:23:54 2007
@@ -52,7 +52,8 @@
             HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE,
             null));
   
-
+  private boolean rootregion;
+  private boolean metaregion;
   private Text name;
   // TODO: Does this need to be a treemap?  Can it be a HashMap?
   private final TreeMap<Text, HColumnDescriptor> families;
@@ -69,6 +70,8 @@
 
   /** Used to construct the table descriptors for root and meta tables */
   private HTableDescriptor(Text name, HColumnDescriptor family) {
+    rootregion = name.equals(HConstants.ROOT_TABLE_NAME);
+    this.metaregion = true;
     this.name = new Text(name);
     this.families = new TreeMap<Text, HColumnDescriptor>();
     families.put(family.getName(), family);
@@ -92,13 +95,30 @@
    * <code>[a-zA-Z_0-9]
    */
   public HTableDescriptor(String name) {
+    this();
     Matcher m = LEGAL_TABLE_NAME.matcher(name);
     if (m == null || !m.matches()) {
       throw new IllegalArgumentException(
           "Table names can only contain 'word characters': i.e. [a-zA-Z_0-9");
     }
-    this.name = new Text(name);
-    this.families = new TreeMap<Text, HColumnDescriptor>();
+    this.name.set(name);
+    this.rootregion = false;
+    this.metaregion = false;
+  }
+  
+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return rootregion;
+  }
+  
+  /** @return true if table is the meta table */
+  public boolean isMetaTable() {
+    return metaregion && !rootregion;
+  }
+  
+  /** @return true if this is a meta region (part of the root or meta tables) */
+  public boolean isMetaRegion() {
+    return metaregion;
   }
 
   /** @return name of table */
@@ -165,6 +185,8 @@
 
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
+    out.writeBoolean(rootregion);
+    out.writeBoolean(metaregion);
     name.write(out);
     out.writeInt(families.size());
     for(Iterator<HColumnDescriptor> it = families.values().iterator();
@@ -175,6 +197,8 @@
 
   /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
+    this.rootregion = in.readBoolean();
+    this.metaregion = in.readBoolean();
     this.name.readFields(in);
     int numCols = in.readInt();
     families.clear();