You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ji...@apache.org on 2008/08/08 21:16:32 UTC

svn commit: r684060 [2/2] - in /hadoop/hbase/branches/0.2: CHANGES.txt src/java/org/apache/hadoop/hbase/client/HTable.java src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Modified: hadoop/hbase/branches/0.2/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.2/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=684060&r1=684059&r2=684060&view=diff
==============================================================================
--- hadoop/hbase/branches/0.2/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.2/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug  8 12:16:30 2008
@@ -1,1654 +1,1640 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LeaseListener;
-import org.apache.hadoop.hbase.Leases;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RegionHistorian;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.InfoServer;
-import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * HRegionServer makes a set of HRegions available to clients.  It checks in with
- * the HMaster. There are many HRegionServers in a single HBase deployment.
- */
-public class HRegionServer implements HConstants, HRegionInterface, Runnable {
-  static final Log LOG = LogFactory.getLog(HRegionServer.class);
-  
-  // Set when a report to the master comes back with a message asking us to
-  // shutdown.  Also set by call to stop when debugging or running unit tests
-  // of HRegionServer in isolation. We use AtomicBoolean rather than
-  // plain boolean so we can pass a reference to Chore threads.  Otherwise,
-  // Chore threads need to know about the hosting class.
-  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
-  
-  protected final AtomicBoolean quiesced = new AtomicBoolean(false);
-  
-  // Go down hard.  Used if file system becomes unavailable and also in
-  // debugging and unit tests.
-  protected volatile boolean abortRequested;
-  
-  // If false, the file system has become unavailable
-  protected volatile boolean fsOk;
-  
-  protected final HServerInfo serverInfo;
-  protected final HBaseConfiguration conf;
-  private FileSystem fs;
-  private Path rootDir;
-  private final Random rand = new Random();
-
-  // Key is Bytes.hashCode of region name byte array and the value is HRegion
-  // in both of the maps below.  Use Bytes.mapKey(byte []) generating key for
-  // below maps.
-  protected final Map<Integer, HRegion> onlineRegions =
-    new ConcurrentHashMap<Integer, HRegion>();
-
-  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final List<HMsg> outboundMsgs =
-    Collections.synchronizedList(new ArrayList<HMsg>());
-
-  final int numRetries;
-  protected final int threadWakeFrequency;
-  private final int msgInterval;
-  private final int serverLeaseTimeout;
-
-  protected final int numRegionsToReport;
-  
-  // Remote HMaster
-  private HMasterRegionInterface hbaseMaster;
-
-  // Server to handle client requests.  Default access so can be accessed by
-  // unit tests.
-  final Server server;
-  
-  // Leases
-  private final Leases leases;
-  
-  // Request counter
-  private volatile AtomicInteger requestCount = new AtomicInteger();
-
-  // Info server.  Default access so can be used by unit tests.  REGIONSERVER
-  // is name of the webapp and the attribute name used stuffing this instance
-  // into web context.
-  InfoServer infoServer;
-  
-  /** region server process name */
-  public static final String REGIONSERVER = "regionserver";
-  
-  /**
-   * Space is reserved in HRS constructor and then released when aborting
-   * to recover from an OOME. See HBASE-706.
-   */
-  private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();
-  
-  /**
-   * Thread to shutdown the region server in an orderly manner.  This thread
-   * is registered as a shutdown hook in the HRegionServer constructor and is
-   * only called when the HRegionServer receives a kill signal.
-   */
-  class ShutdownThread extends Thread {
-    private final HRegionServer instance;
-    
-    /**
-     * @param instance
-     */
-    public ShutdownThread(HRegionServer instance) {
-      this.instance = instance;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void run() {
-      LOG.info("Starting shutdown thread.");
-      
-      // tell the region server to stop and wait for it to complete
-      instance.stop();
-      instance.join();
-      LOG.info("Shutdown thread complete");
-    }    
-  }
-
-  // Compactions
-  final CompactSplitThread compactSplitThread;
-
-  // Cache flushing  
-  final Flusher cacheFlusher;
-  
-  // HLog and HLog roller.  log is protected rather than private to avoid
-  // eclipse warning when accessed by inner classes
-  protected HLog log;
-  final LogRoller logRoller;
-  
-  // flag set after we're done setting up server threads (used for testing)
-  protected volatile boolean isOnline;
-    
-  /**
-   * Starts a HRegionServer at the default location
-   * @param conf
-   * @throws IOException
-   */
-  public HRegionServer(HBaseConfiguration conf) throws IOException {
-    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
-        DEFAULT_REGIONSERVER_ADDRESS)), conf);
-  }
-  
-  /**
-   * Starts a HRegionServer at the specified location
-   * @param address
-   * @param conf
-   * @throws IOException
-   */
-  public HRegionServer(HServerAddress address, HBaseConfiguration conf)
-  throws IOException {  
-    this.abortRequested = false;
-    this.fsOk = true;
-    this.conf = conf;
-
-    this.isOnline = false;
-
-    // Config'ed params
-    this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
-    this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
-    this.serverLeaseTimeout =
-      conf.getInt("hbase.master.lease.period", 120 * 1000);
-
-    // Cache flushing thread.
-    this.cacheFlusher = new Flusher(conf, this);
-    
-    // Compaction thread
-    this.compactSplitThread = new CompactSplitThread(this);
-    
-    // Log rolling thread
-    this.logRoller = new LogRoller(this);
-
-    // Task thread to process requests from Master
-    this.worker = new Worker();
-    this.workerThread = new Thread(worker);
-
-    // Server to handle client requests
-    this.server = HbaseRPC.getServer(this, address.getBindAddress(), 
-      address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-      false, conf);
-    // Address is givin a default IP for the moment. Will be changed after
-    // calling the master.
-    this.serverInfo = new HServerInfo(new HServerAddress(
-      new InetSocketAddress(DEFAULT_HOST,
-      this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
-      this.conf.getInt("hbase.regionserver.info.port", 60030));
-    this.numRegionsToReport =                                        
-      conf.getInt("hbase.regionserver.numregionstoreport", 10);      
-      
-    this.leases = new Leases(
-      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
-      this.threadWakeFrequency);
-    
-    int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
-    for(int i = 0; i < nbBlocks; i++)  {
-      reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
-    }
-    
-    // Register shutdown hook for HRegionServer, runs an orderly shutdown
-    // when a kill signal is recieved
-    Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
-  }
-
-  /**
-   * The HRegionServer sticks in this loop until closed. It repeatedly checks
-   * in with the HMaster, sending heartbeats & reports, and receiving HRegion 
-   * load/unload instructions.
-   */
-  public void run() {
-    boolean quiesceRequested = false;
-    // A sleeper that sleeps for msgInterval.
-    Sleeper sleeper =
-      new Sleeper(this.msgInterval, this.stopRequested);
-    try {
-      init(reportForDuty(sleeper));
-      long lastMsg = 0;
-      // Now ask master what it wants us to do and tell it what we have done
-      for (int tries = 0; !stopRequested.get() && isHealthy();) {
-        long now = System.currentTimeMillis();
-        if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
-          // It has been way too long since we last reported to the master.
-          // Commit suicide.
-          LOG.fatal("unable to report to master for " + (now - lastMsg) +
-            " milliseconds - aborting server");
-          abort();
-          break;
-        }
-        if ((now - lastMsg) >= msgInterval) {
-          HMsg outboundArray[] = null;
-          synchronized(this.outboundMsgs) {
-            outboundArray =
-              this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
-            this.outboundMsgs.clear();
-          }
-          try {
-            this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
-                onlineRegions.size()));
-            this.requestCount.set(0);
-            HMsg msgs[] = hbaseMaster.regionServerReport(
-              serverInfo, outboundArray, getMostLoadedRegions());
-            lastMsg = System.currentTimeMillis();
-            if (this.quiesced.get() && onlineRegions.size() == 0) {
-              // We've just told the master we're exiting because we aren't
-              // serving any regions. So set the stop bit and exit.
-              LOG.info("Server quiesced and not serving any regions. " +
-                "Starting shutdown");
-              stopRequested.set(true);
-              this.outboundMsgs.clear();
-              continue;
-            }
-
-            // Queue up the HMaster's instruction stream for processing
-            boolean restart = false;
-            for(int i = 0;
-                !restart && !stopRequested.get() && i < msgs.length;
-                i++) {
-              LOG.info(msgs[i].toString());
-              switch(msgs[i].getType()) {
-              case MSG_CALL_SERVER_STARTUP:
-                // We the MSG_CALL_SERVER_STARTUP on startup but we can also
-                // get it when the master is panicing because for instance
-                // the HDFS has been yanked out from under it.  Be wary of
-                // this message.
-                if (checkFileSystem()) {
-                  closeAllRegions();
-                  try {
-                    log.closeAndDelete();
-                  } catch (Exception e) {
-                    LOG.error("error closing and deleting HLog", e);
-                  }
-                  try {
-                    serverInfo.setStartCode(System.currentTimeMillis());
-                    log = setupHLog();
-                  } catch (IOException e) {
-                    this.abortRequested = true;
-                    this.stopRequested.set(true);
-                    e = RemoteExceptionHandler.checkIOException(e); 
-                    LOG.fatal("error restarting server", e);
-                    break;
-                  }
-                  reportForDuty(sleeper);
-                  restart = true;
-                } else {
-                  LOG.fatal("file system available check failed. " +
-                  "Shutting down server.");
-                }
-                break;
-
-              case MSG_REGIONSERVER_STOP:
-                stopRequested.set(true);
-                break;
-
-              case MSG_REGIONSERVER_QUIESCE:
-                if (!quiesceRequested) {
-                  try {
-                    toDo.put(new ToDoEntry(msgs[i]));
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException("Putting into msgQueue was " +
-                        "interrupted.", e);
-                  }
-                  quiesceRequested = true;
-                }
-                break;
-
-              default:
-                if (fsOk) {
-                  try {
-                    toDo.put(new ToDoEntry(msgs[i]));
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException("Putting into msgQueue was " +
-                        "interrupted.", e);
-                  }
-                }
-              }
-            }
-            // Reset tries count if we had a successful transaction.
-            tries = 0;
-
-            if (restart || this.stopRequested.get()) {
-              toDo.clear();
-              continue;
-            }
-          } catch (Exception e) {
-            if (e instanceof IOException) {
-              e = RemoteExceptionHandler.checkIOException((IOException) e);
-            }
-            if (tries < this.numRetries) {
-              LOG.warn("Processing message (Retry: " + tries + ")", e);
-              tries++;
-            } else {
-              LOG.fatal("Exceeded max retries: " + this.numRetries, e);
-              if (!checkFileSystem()) {
-                continue;
-              }
-              // Something seriously wrong. Shutdown.
-              stop();
-            }
-          }
-        }
-        // Do some housekeeping before going to sleep
-        housekeeping();
-        sleeper.sleep(lastMsg);
-      } // for
-    } catch (OutOfMemoryError error) {
-      abort();
-      LOG.fatal("Ran out of memory", error);
-    } catch (Throwable t) {
-      LOG.fatal("Unhandled exception. Aborting...", t);
-      abort();
-    }
-    RegionHistorian.getInstance().offline();
-    this.leases.closeAfterLeasesExpire();
-    this.worker.stop();
-    this.server.stop();
-    if (this.infoServer != null) {
-      LOG.info("Stopping infoServer");
-      try {
-        this.infoServer.stop();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-
-    // Send interrupts to wake up threads if sleeping so they notice shutdown.
-    // TODO: Should we check they are alive?  If OOME could have exited already
-    cacheFlusher.interruptIfNecessary();
-    compactSplitThread.interruptIfNecessary();
-    this.logRoller.interruptIfNecessary();
-
-    if (abortRequested) {
-      if (this.fsOk) {
-        // Only try to clean up if the file system is available
-        try {
-          this.log.close();
-          LOG.info("On abort, closed hlog");
-        } catch (IOException e) {
-          LOG.error("Unable to close log in abort",
-              RemoteExceptionHandler.checkIOException(e));
-        }
-        closeAllRegions(); // Don't leave any open file handles
-      }
-      LOG.info("aborting server at: " +
-        serverInfo.getServerAddress().toString());
-    } else {
-      ArrayList<HRegion> closedRegions = closeAllRegions();
-      try {
-        log.closeAndDelete();
-      } catch (IOException e) {
-        LOG.error("Close and delete failed",
-            RemoteExceptionHandler.checkIOException(e));
-      }
-      try {
-        HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
-        exitMsg[0] = HMsg.REPORT_EXITING;
-        // Tell the master what regions we are/were serving
-        int i = 1;
-        for (HRegion region: closedRegions) {
-          exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
-              region.getRegionInfo());
-        }
-
-        LOG.info("telling master that region server is shutting down at: " +
-            serverInfo.getServerAddress().toString());
-        hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
-      } catch (IOException e) {
-        LOG.warn("Failed to send exiting message to master: ",
-            RemoteExceptionHandler.checkIOException(e));
-      }
-      LOG.info("stopping server at: " +
-        serverInfo.getServerAddress().toString());
-    }
-    join();
-    LOG.info(Thread.currentThread().getName() + " exiting");
-  }
-  
-  /*
-   * Run init. Sets up hlog and starts up all server threads.
-   * @param c Extra configuration.
-   */
-  protected void init(final MapWritable c) throws IOException {
-    try {
-      for (Map.Entry<Writable, Writable> e: c.entrySet()) {
-        String key = e.getKey().toString();
-        String value = e.getValue().toString();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Config from master: " + key + "=" + value);
-        }
-        this.conf.set(key, value);
-      }
-      // Master may have sent us a new address with the other configs.
-      // Update our address in this case. See HBASE-719
-      if(conf.get("hbase.regionserver.address") != null)
-        serverInfo.setServerAddress(new HServerAddress
-            (conf.get("hbase.regionserver.address"), 
-            serverInfo.getServerAddress().getPort()));
-      // Master sent us hbase.rootdir to use. Should be fully qualified
-      // path with file system specification included.  Set 'fs.default.name'
-      // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
-      // accessors will be going against wrong filesystem (unless all is set
-      // to defaults).
-      this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
-      this.fs = FileSystem.get(this.conf);
-      this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
-      this.log = setupHLog();
-      startServiceThreads();
-      isOnline = true;
-    } catch (IOException e) {
-      this.stopRequested.set(true);
-      isOnline = false;
-      e = RemoteExceptionHandler.checkIOException(e); 
-      LOG.fatal("Failed init", e);
-      IOException ex = new IOException("region server startup failed");
-      ex.initCause(e);
-      throw ex;
-    }
-  }
-
-  /**
-   * Report the status of the server. A server is online once all the startup 
-   * is completed (setting up filesystem, starting service threads, etc.). This
-   * method is designed mostly to be useful in tests.
-   * @return true if online, false if not.
-   */
-  public boolean isOnline() {
-    return isOnline;
-  }
-    
-  private HLog setupHLog() throws RegionServerRunningException,
-    IOException {
-    
-    Path logdir = new Path(rootDir, "log" + "_" + 
-        serverInfo.getServerAddress().getBindAddress() + "_" +
-        this.serverInfo.getStartCode() + "_" + 
-        this.serverInfo.getServerAddress().getPort());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Log dir " + logdir);
-    }
-    if (fs.exists(logdir)) {
-      throw new RegionServerRunningException("region server already " +
-        "running at " + this.serverInfo.getServerAddress().toString() +
-        " because logdir " + logdir.toString() + " exists");
-    }
-    return new HLog(fs, logdir, conf, logRoller);
-  }
-  
-  /*
-   * Start Chore Threads, Server, Worker and lease checker threads. Install an
-   * UncaughtExceptionHandler that calls abort of RegionServer if we get
-   * an unhandled exception.  We cannot set the handler on all threads.
-   * Server's internal Listener thread is off limits.  For Server, if an OOME,
-   * it waits a while then retries.  Meantime, a flush or a compaction that
-   * tries to run should trigger same critical condition and the shutdown will
-   * run.  On its way out, this server will shut down Server.  Leases are sort
-   * of inbetween. It has an internal thread that while it inherits from
-   * Chore, it keeps its own internal stop mechanism so needs to be stopped
-   * by this hosting server.  Worker logs the exception and exits.
-   */
-  private void startServiceThreads() throws IOException {
-    String n = Thread.currentThread().getName();
-    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
-      public void uncaughtException(Thread t, Throwable e) {
-        abort();
-        LOG.fatal("Set stop flag in " + t.getName(), e);
-      }
-    };
-    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
-        handler);
-    Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
-      handler);
-    Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
-        handler);
-    Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
-    // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
-    // an unhandled exception, it will just exit.
-    this.leases.setName(n + ".leaseChecker");
-    this.leases.start();
-    // Put up info server.
-    int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
-    if (port >= 0) {
-      String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
-      this.infoServer = new InfoServer("regionserver", a, port, false);
-      this.infoServer.setAttribute("regionserver", this);
-      this.infoServer.start();
-    }
-    // Start Server.  This service is like leases in that it internally runs
-    // a thread.
-    this.server.start();
-    LOG.info("HRegionServer started at: " +
-        serverInfo.getServerAddress().toString());
-  }
-
-  /*
-   * Verify that server is healthy
-   */
-  private boolean isHealthy() {
-    if (!fsOk) {
-      // File system problem
-      return false;
-    }
-    // Verify that all threads are alive
-    if (!(leases.isAlive() && compactSplitThread.isAlive() &&
-        cacheFlusher.isAlive() && logRoller.isAlive() &&
-        workerThread.isAlive())) {
-      // One or more threads are no longer alive - shut down
-      stop();
-      return false;
-    }
-    return true;
-  }
-  /*
-   * Run some housekeeping tasks before we go into 'hibernation' sleeping at
-   * the end of the main HRegionServer run loop.
-   */
-  private void housekeeping() {
-    // If the todo list has > 0 messages, iterate looking for open region
-    // messages. Send the master a message that we're working on its
-    // processing so it doesn't assign the region elsewhere.
-    if (this.toDo.size() <= 0) {
-      return;
-    }
-    // This iterator is 'safe'.  We are guaranteed a view on state of the
-    // queue at time iterator was taken out.  Apparently goes from oldest.
-    for (ToDoEntry e: this.toDo) {
-      if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) {
-        addProcessingMessage(e.msg.getRegionInfo());
-      }
-    }
-  }
-
-  /** @return the HLog */
-  HLog getLog() {
-    return this.log;
-  }
-
-  /**
-   * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion.  Used by unit tests.
-   */
-  public void stop() {
-    this.stopRequested.set(true);
-    synchronized(this) {
-      notifyAll(); // Wakes run() if it is sleeping
-    }
-  }
-  
-  /**
-   * Cause the server to exit without closing the regions it is serving, the
-   * log it is using and without notifying the master.
-   * Used unit testing and on catastrophic events such as HDFS is yanked out
-   * from under hbase or we OOME.
-   */
-  public void abort() {
-    reservedSpace.clear();
-    this.abortRequested = true;
-    stop();
-  }
-
-  /** 
-   * Wait on all threads to finish.
-   * Presumption is that all closes and stops have already been called.
-   */
-  void join() {
-    join(this.workerThread);
-    join(this.cacheFlusher);
-    join(this.compactSplitThread);
-    join(this.logRoller);
-  }
-
-  private void join(final Thread t) {
-    while (t.isAlive()) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-  }
-  
-  /*
-   * Let the master know we're here
-   * Run initialization using parameters passed us by the master.
-   */
-  private MapWritable reportForDuty(final Sleeper sleeper)
-  throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Telling master at " +
-        conf.get(MASTER_ADDRESS) + " that we are up");
-    }
-    // Do initial RPC setup.  The final argument indicates that the RPC should retry indefinitely.
-    this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy(
-      HMasterRegionInterface.class, HMasterRegionInterface.versionID,
-      new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
-      this.conf, -1);
-    MapWritable result = null;
-    long lastMsg = 0;
-    while(!stopRequested.get()) {
-      try {
-        this.requestCount.set(0);
-        this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
-        lastMsg = System.currentTimeMillis();
-        result = this.hbaseMaster.regionServerStartup(serverInfo);
-        break;
-      } catch (Leases.LeaseStillHeldException e) {
-        LOG.info("Lease " + e.getName() + " already held on master. Check " +
-          "DNS configuration so that all region servers are" +
-          "reporting their true IPs and not 127.0.0.1. Otherwise, this" +
-          "problem should resolve itself after the lease period of " +
-          this.conf.get("hbase.master.lease.period")
-          + " seconds expires over on the master");
-      } catch (IOException e) {
-        LOG.warn("error telling master we are up", e);
-      }
-      sleeper.sleep(lastMsg);
-    }
-    return result;
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportOpen(HRegionInfo region) {
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportClose(HRegionInfo region) {
-    reportClose(region, null);
-  }
-
-  /* Add to the outbound message buffer */
-  private void reportClose(final HRegionInfo region, final Text message) {
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
-  }
-
-  
-  /**
-   * Add to the outbound message buffer
-   * 
-   * When a region splits, we need to tell the master that there are two new 
-   * regions that need to be assigned.
-   * 
-   * We do not need to inform the master about the old region, because we've
-   * updated the meta or root regions, and the master will pick that up on its
-   * next rescan of the root or meta tables.
-   */
-  void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
-      HRegionInfo newRegionB) {
-
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion,
-      new Text(oldRegion.getRegionNameAsString() + " split; daughters: " +
-        newRegionA.getRegionNameAsString() + ", " +
-        newRegionB.getRegionNameAsString())));
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA));
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB));
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // HMaster-given operations
-  //////////////////////////////////////////////////////////////////////////////
-
-  /*
-   * Data structure to hold a HMsg and retries count.
-   */
-  private static class ToDoEntry {
-    private int tries;
-    private final HMsg msg;
-    ToDoEntry(HMsg msg) {
-      this.tries = 0;
-      this.msg = msg;
-    }
-  }
-  
-  final BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
-  private Worker worker;
-  private Thread workerThread;
-  
-  /** Thread that performs long running requests from the master */
-  class Worker implements Runnable {
-    void stop() {
-      synchronized(toDo) {
-        toDo.notifyAll();
-      }
-    }
-    
-    /** {@inheritDoc} */
-    public void run() {
-      try {
-        while(!stopRequested.get()) {
-          ToDoEntry e = null;
-          try {
-            e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-            if(e == null || stopRequested.get()) {
-              continue;
-            }
-            LOG.info(e.msg);
-            switch(e.msg.getType()) {
-
-            case MSG_REGIONSERVER_QUIESCE:
-              closeUserRegions();
-              break;
-
-            case MSG_REGION_OPEN:
-              // Open a region
-              openRegion(e.msg.getRegionInfo());
-              break;
-
-            case MSG_REGION_CLOSE:
-              // Close a region
-              closeRegion(e.msg.getRegionInfo(), true);
-              break;
-
-            case MSG_REGION_CLOSE_WITHOUT_REPORT:
-              // Close a region, don't reply
-              closeRegion(e.msg.getRegionInfo(), false);
-              break;
-
-            default:
-              throw new AssertionError(
-                  "Impossible state during msg processing.  Instruction: "
-                  + e.msg.toString());
-            }
-          } catch (InterruptedException ex) {
-            // continue
-          } catch (Exception ex) {
-            if (ex instanceof IOException) {
-              ex = RemoteExceptionHandler.checkIOException((IOException) ex);
-            }
-            if(e != null && e.tries < numRetries) {
-              LOG.warn(ex);
-              e.tries++;
-              try {
-                toDo.put(e);
-              } catch (InterruptedException ie) {
-                throw new RuntimeException("Putting into msgQueue was " +
-                    "interrupted.", ex);
-              }
-            } else {
-              LOG.error("unable to process message" +
-                  (e != null ? (": " + e.msg.toString()) : ""), ex);
-              if (!checkFileSystem()) {
-                break;
-              }
-            }
-          }
-        }
-      } catch(Throwable t) {
-        LOG.fatal("Unhandled exception", t);
-      } finally {
-        LOG.info("worker thread exiting");
-      }
-    }
-  }
-  
-  void openRegion(final HRegionInfo regionInfo) {
-    // If historian is not online and this is not a meta region, online it.
-    if (!regionInfo.isMetaRegion() &&
-        !RegionHistorian.getInstance().isOnline()) {
-      RegionHistorian.getInstance().online(this.conf);
-    }
-    Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
-    HRegion region = this.onlineRegions.get(mapKey);
-    if (region == null) {
-      try {
-        region = instantiateRegion(regionInfo);
-        // Startup a compaction early if one is needed.
-        this.compactSplitThread.compactionRequested(region);
-      } catch (IOException e) {
-        LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e);
-
-        // TODO: add an extra field in HRegionInfo to indicate that there is
-        // an error. We can't do that now because that would be an incompatible
-        // change that would require a migration
-        reportClose(regionInfo, new Text(StringUtils.stringifyException(e)));
-        return;
-      }
-      this.lock.writeLock().lock();
-      try {
-        this.log.setSequenceNumber(region.getMinSequenceId());
-        this.onlineRegions.put(mapKey, region);
-      } finally {
-        this.lock.writeLock().unlock();
-      }
-    }
-    reportOpen(regionInfo); 
-  }
-  
-  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
-      throws IOException {
-    return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
-        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
-        this.cacheFlusher, new Progressable() {
-          public void progress() {
-            addProcessingMessage(regionInfo);
-          }
-        });
-  }
-  
-  /*
-   * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
-   * This method is called while region is in the queue of regions to process
-   * and then while the region is being opened, it is called from the Worker
-   * thread that is running the region open.
-   * @param hri Region to add the message for
-   */
-  protected void addProcessingMessage(final HRegionInfo hri) {
-    getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
-  }
-
-  void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
-  throws IOException {
-	HRegion region = this.removeFromOnlineRegions(hri);
-    if (region != null) {
-      region.close();
-      if(reportWhenCompleted) {
-        reportClose(hri);
-      }
-    }
-  }
-
-  /** Called either when the master tells us to restart or from stop() */
-  ArrayList<HRegion> closeAllRegions() {
-    ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
-    this.lock.writeLock().lock();
-    try {
-      regionsToClose.addAll(onlineRegions.values());
-      onlineRegions.clear();
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-    for(HRegion region: regionsToClose) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
-      }
-      try {
-        region.close(abortRequested);
-      } catch (IOException e) {
-        LOG.error("error closing region " +
-            Bytes.toString(region.getRegionName()),
-          RemoteExceptionHandler.checkIOException(e));
-      }
-    }
-    return regionsToClose;
-  }
-
-  /** Called as the first stage of cluster shutdown. */
-  void closeUserRegions() {
-    ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
-    this.lock.writeLock().lock();
-    try {
-      synchronized (onlineRegions) {
-        for (Iterator<Map.Entry<Integer, HRegion>> i =
-            onlineRegions.entrySet().iterator(); i.hasNext();) {
-          Map.Entry<Integer, HRegion> e = i.next();
-          HRegion r = e.getValue();
-          if (!r.getRegionInfo().isMetaRegion()) {
-            regionsToClose.add(r);
-            i.remove();
-          }
-        }
-      }
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-    for(HRegion region: regionsToClose) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
-      }
-      try {
-        region.close();
-      } catch (IOException e) {
-        LOG.error("error closing region " + region.getRegionName(),
-          RemoteExceptionHandler.checkIOException(e));
-      }
-    }
-    this.quiesced.set(true);
-    if (onlineRegions.size() == 0) {
-      outboundMsgs.add(HMsg.REPORT_EXITING);
-    } else {
-      outboundMsgs.add(HMsg.REPORT_QUIESCED);
-    }
-  }
-
-  //
-  // HRegionInterface
-  //
-
-  /** {@inheritDoc} */
-  public HRegionInfo getRegionInfo(final byte [] regionName)
-  throws NotServingRegionException {
-    requestCount.incrementAndGet();
-    return getRegion(regionName).getRegionInfo();
-  }
-
-  /** {@inheritDoc} */
-  public Cell get(final byte [] regionName, final byte [] row,
-    final byte [] column) 
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      return getRegion(regionName).get(row, column);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public Cell[] get(final byte [] regionName, final byte [] row,
-    final byte [] column, final int numVersions) 
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      return getRegion(regionName).get(row, column, numVersions);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public Cell[] get(final byte [] regionName, final byte [] row,
-    final byte [] column, final long timestamp, final int numVersions) 
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      return getRegion(regionName).get(row, column, timestamp, numVersions);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public RowResult getRow(final byte [] regionName, final byte [] row,
-    final long ts)
-  throws IOException {
-    return getRow(regionName, row, null, ts);
-  }
-
-  /** {@inheritDoc} */
-  public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte [][] columns)
-  throws IOException {
-    return getRow(regionName, row, columns, HConstants.LATEST_TIMESTAMP);
-  }
-
-  /** {@inheritDoc} */
-  public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte [][] columns, final long ts)
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      // convert the columns array into a set so it's easy to check later.
-      Set<byte []> columnSet = null;
-      if (columns != null) {
-        columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        columnSet.addAll(Arrays.asList(columns));
-      }
-      
-      HRegion region = getRegion(regionName);
-      Map<byte [], Cell> map = region.getFull(row, columnSet, ts);
-      HbaseMapWritable<byte [], Cell> result =
-        new HbaseMapWritable<byte [], Cell>();
-      result.putAll(map);
-      return new RowResult(row, result);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public RowResult getClosestRowBefore(final byte [] regionName, 
-    final byte [] row)
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      // locate the region we're operating on
-      HRegion region = getRegion(regionName);
-      // ask the region for all the data 
-      RowResult rr = region.getClosestRowBefore(row);
-      return rr;
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public RowResult next(final long scannerId) throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      String scannerName = String.valueOf(scannerId);
-      InternalScanner s = scanners.get(scannerName);
-      if (s == null) {
-        throw new UnknownScannerException("Name: " + scannerName);
-      }
-      this.leases.renewLease(scannerName);
-
-      // Collect values to be returned here
-      HbaseMapWritable<byte [], Cell> values
-        = new HbaseMapWritable<byte [], Cell>();
-      HStoreKey key = new HStoreKey();
-      TreeMap<byte [], Cell> results =
-        new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
-      while (s.next(key, results)) {
-        values.putAll(results);
-        if (values.size() > 0) {
-          // Row has something in it. Return the value.
-          break;
-        }
-
-        // No data for this row, go get another.
-        results.clear();
-      }
-      return values.size() == 0 ? null : new RowResult(key.getRow(), values);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  /** {@inheritDoc} */
-  public void batchUpdate(final byte [] regionName, BatchUpdate b)
-  throws IOException {
-    checkOpen();
-    this.requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    validateValuesLength(b, region);
-    try {
-      cacheFlusher.reclaimMemcacheMemory();
-      region.batchUpdate(b);
-    } catch (OutOfMemoryError error) {
-      abort();
-      LOG.fatal("Ran out of memory", error);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-  
-  /**
-   * Utility method to verify values length
-   * @param batchUpdate The update to verify
-   * @throws IOException Thrown if a value is too long
-   */
-  private void validateValuesLength(BatchUpdate batchUpdate, 
-      HRegion region) throws IOException {
-    HTableDescriptor desc = region.getTableDesc();
-    for (Iterator<BatchOperation> iter = 
-      batchUpdate.iterator(); iter.hasNext();) {
-      
-      BatchOperation operation = iter.next();
-      int maxLength = 
-        desc.getFamily(HStoreKey.getFamily(operation.getColumn())).
-          getMaxValueLength();
-      if(operation.getValue() != null)
-        if(operation.getValue().length > maxLength) {
-          throw new IOException("Value in column " + 
-              Bytes.toString(operation.getColumn()) + " is too long. " + 
-              operation.getValue().length + " instead of " + maxLength);
-        }
-    }
-  }
-  
-  //
-  // remote scanner interface
-  //
-
-  /** {@inheritDoc} */
-  public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow,
-    final long timestamp, final RowFilterInterface filter)
-  throws IOException {
-    checkOpen();
-    NullPointerException npe = null;
-    if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (cols == null) {
-      npe = new NullPointerException("columns to scan is null");
-    } else if (firstRow == null) {
-      npe = new NullPointerException("firstRow for scanner is null");
-    }
-    if (npe != null) {
-      IOException io = new IOException("Invalid arguments to openScanner");
-      io.initCause(npe);
-      throw io;
-    }
-    requestCount.incrementAndGet();
-    try {
-      HRegion r = getRegion(regionName);
-      InternalScanner s =
-        r.getScanner(cols, firstRow, timestamp, filter);
-      long scannerId = addScanner(s);
-      return scannerId;
-    } catch (IOException e) {
-      LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
-          RemoteExceptionHandler.checkIOException(e));
-      checkFileSystem();
-      throw e;
-    }
-  }
-  
-  protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
-    long scannerId = -1L;
-    scannerId = rand.nextLong();
-    String scannerName = String.valueOf(scannerId);
-    synchronized(scanners) {
-      scanners.put(scannerName, s);
-    }
-    this.leases.
-      createLease(scannerName, new ScannerListener(scannerName));
-    return scannerId;
-  }
-  
-  /** {@inheritDoc} */
-  public void close(final long scannerId) throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      String scannerName = String.valueOf(scannerId);
-      InternalScanner s = null;
-      synchronized(scanners) {
-        s = scanners.remove(scannerName);
-      }
-      if(s == null) {
-        throw new UnknownScannerException(scannerName);
-      }
-      s.close();
-      this.leases.cancelLease(scannerName);
-    } catch (IOException e) {
-      checkFileSystem();
-      throw e;
-    }
-  }
-
-  Map<String, InternalScanner> scanners =
-    Collections.synchronizedMap(new HashMap<String, InternalScanner>());
-
-  /** 
-   * Instantiated as a scanner lease.
-   * If the lease times out, the scanner is closed
-   */
-  private class ScannerListener implements LeaseListener {
-    private final String scannerName;
-    
-    ScannerListener(final String n) {
-      this.scannerName = n;
-    }
-    
-    /** {@inheritDoc} */
-    public void leaseExpired() {
-      LOG.info("Scanner " + this.scannerName + " lease expired");
-      InternalScanner s = null;
-      synchronized(scanners) {
-        s = scanners.remove(this.scannerName);
-      }
-      if (s != null) {
-        try {
-          s.close();
-        } catch (IOException e) {
-          LOG.error("Closing scanner", e);
-        }
-      }
-    }
-  }
-  
-  //
-  // Methods that do the actual work for the remote API
-  //
-  
-  /** {@inheritDoc} */
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final byte [] column, final long timestamp) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, column, timestamp);
-  }
-
-  /** {@inheritDoc} */
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final long timestamp) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, timestamp);
-  }
-
-  /** {@inheritDoc} */
-  public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
-    long timestamp) throws IOException{
-    getRegion(regionName).deleteFamily(row, family, timestamp);
-  }
-
-
-  /**
-   * @return Info on this server.
-   */
-  public HServerInfo getServerInfo() {
-    return this.serverInfo;
-  }
-
-  /** @return the info server */
-  public InfoServer getInfoServer() {
-    return infoServer;
-  }
-  
-  /**
-   * @return true if a stop has been requested.
-   */
-  public boolean isStopRequested() {
-    return stopRequested.get();
-  }
-  
-  /**
-   * 
-   * @return the configuration
-   */
-  public HBaseConfiguration getConfiguration() {
-    return conf;
-  }
-
-  /** @return the write lock for the server */
-  ReentrantReadWriteLock.WriteLock getWriteLock() {
-    return lock.writeLock();
-  }
-
-  /**
-   * @return Immutable list of this servers regions.
-   */
-  public Collection<HRegion> getOnlineRegions() {
-    return Collections.unmodifiableCollection(onlineRegions.values());
-  }
-
-  /**
-   * @return The HRegionInfos from online regions sorted
-   */
-  public SortedSet<HRegionInfo> getSortedOnlineRegionInfos() {
-    SortedSet<HRegionInfo> result = new TreeSet<HRegionInfo>();
-    synchronized(this.onlineRegions) {
-      for (HRegion r: this.onlineRegions.values()) {
-        result.add(r.getRegionInfo());
-      }
-    }
-    return result;
-  }
-  
-  /**
-   * This method removes HRegion corresponding to hri from the Map of onlineRegions.  
-   * 
-   * @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
-   * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
-   */
-  HRegion removeFromOnlineRegions(HRegionInfo hri) {
-    this.lock.writeLock().lock();
-    HRegion toReturn = null;
-    try {
-      toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName()));
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-    return toReturn;
-  }
-
-  /**
-   * @return A new Map of online regions sorted by region size with the first
-   * entry being the biggest.
-   */
-  public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
-    // we'll sort the regions in reverse
-    SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
-        new Comparator<Long>() {
-          public int compare(Long a, Long b) {
-            return -1 * a.compareTo(b);
-          }
-        });
-    // Copy over all regions. Regions are sorted by size with biggest first.
-    synchronized (this.onlineRegions) {
-      for (HRegion region : this.onlineRegions.values()) {
-        sortedRegions.put(region.memcacheSize.get(), region);
-      }
-    }
-    return sortedRegions;
-  }
-  
-  /**
-   * @param regionName
-   * @return HRegion for the passed <code>regionName</code> or null if named
-   * region is not member of the online regions.
-   */
-  public HRegion getOnlineRegion(final byte [] regionName) {
-    return onlineRegions.get(Bytes.mapKey(regionName));
-  }
-
-  /** @return the request count */
-  public AtomicInteger getRequestCount() {
-    return this.requestCount;
-  }
-
-  /** @return reference to FlushRequester */
-  public FlushRequester getFlushRequester() {
-    return this.cacheFlusher;
-  }
-  
-  /** 
-   * Protected utility method for safely obtaining an HRegion handle.
-   * @param regionName Name of online {@link HRegion} to return
-   * @return {@link HRegion} for <code>regionName</code>
-   * @throws NotServingRegionException
-   */
-  protected HRegion getRegion(final byte [] regionName)
-  throws NotServingRegionException {
-    HRegion region = null;
-    this.lock.readLock().lock();
-    try {
-      Integer key = Integer.valueOf(Bytes.hashCode(regionName));
-      region = onlineRegions.get(key);
-      if (region == null) {
-        throw new NotServingRegionException(regionName);
-      }
-      return region;
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Get the top N most loaded regions this server is serving so we can
-   * tell the master which regions it can reallocate if we're overloaded.
-   * TODO: actually calculate which regions are most loaded. (Right now, we're
-   * just grabbing the first N regions being served regardless of load.)
-   */
-  protected HRegionInfo[] getMostLoadedRegions() {
-    ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
-    synchronized (onlineRegions) {
-      for (HRegion r : onlineRegions.values()) {
-        if (regions.size() < numRegionsToReport) {
-          regions.add(r.getRegionInfo());
-        } else {
-          break;
-        }
-      }
-    }
-    return regions.toArray(new HRegionInfo[regions.size()]);
-  }
-  
-  /**  
-   * Called to verify that this server is up and running.
-   * 
-   * @throws IOException
-   */
-  protected void checkOpen() throws IOException {
-    if (this.stopRequested.get() || this.abortRequested) {
-      throw new IOException("Server not running");
-    }
-    if (!fsOk) {
-      throw new IOException("File system not available");
-    }
-  }
-  
-  /**
-   * Checks to see if the file system is still accessible.
-   * If not, sets abortRequested and stopRequested
-   * 
-   * @return false if file system is not available
-   */
-  protected boolean checkFileSystem() {
-    if (this.fsOk && fs != null) {
-      try {
-        FSUtils.checkFileSystemAvailable(fs);
-      } catch (IOException e) {
-        LOG.fatal("Shutting down HRegionServer: file system not available", e);
-        abort();
-        fsOk = false;
-      }
-    }
-    return this.fsOk;
-  }
- 
-  /**
-   * @return Returns list of non-closed regions hosted on this server.  If no
-   * regions to check, returns an empty list.
-   */
-  protected Set<HRegion> getRegionsToCheck() {
-    HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
-    //TODO: is this locking necessary? 
-    lock.readLock().lock();
-    try {
-      regionsToCheck.addAll(this.onlineRegions.values());
-    } finally {
-      lock.readLock().unlock();
-    }
-    // Purge closed regions.
-    for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
-      HRegion r = i.next();
-      if (r.isClosed()) {
-        i.remove();
-      }
-    }
-    return regionsToCheck;
-  }
-
-  /** {@inheritDoc} */
-  public long getProtocolVersion(final String protocol, 
-      @SuppressWarnings("unused") final long clientVersion)
-  throws IOException {  
-    if (protocol.equals(HRegionInterface.class.getName())) {
-      return HRegionInterface.versionID;
-    }
-    throw new IOException("Unknown protocol to name node: " + protocol);
-  }
-  
-  /**
-   * @return Queue to which you can add outbound messages.
-   */
-  protected List<HMsg> getOutboundMsgs() {
-    return this.outboundMsgs;
-  }
-
-  /**
-   * Return the total size of all memcaches in every region.
-   * @return memcache size in bytes
-   */
-  public long getGlobalMemcacheSize() {
-    long total = 0;
-    synchronized (onlineRegions) {
-      for (HRegion region : onlineRegions.values()) {
-        total += region.memcacheSize.get();
-      }
-    }
-    return total;
-  }
-
-  /**
-   * @return Return the leases.
-   */
-  protected Leases getLeases() {
-    return leases;
-  }
-
-  /**
-   * @return Return the rootDir.
-   */
-  protected Path getRootDir() {
-    return rootDir;
-  }
-
-  /**
-   * @return Return the fs.
-   */
-  protected FileSystem getFileSystem() {
-    return fs;
-  }
-
-  //
-  // Main program and support routines
-  //
-
-  private static void printUsageAndExit() {
-    printUsageAndExit(null);
-  }
-  
-  private static void printUsageAndExit(final String message) {
-    if (message != null) {
-      System.err.println(message);
-    }
-    System.err.println("Usage: java " +
-        "org.apache.hbase.HRegionServer [--bind=hostname:port] start");
-    System.exit(0);
-  }
-  
-  /**
-   * Do class main.
-   * @param args
-   * @param regionServerClass HRegionServer to instantiate.
-   */
-  protected static void doMain(final String [] args,
-      final Class<? extends HRegionServer> regionServerClass) {
-    if (args.length < 1) {
-      printUsageAndExit();
-    }
-    Configuration conf = new HBaseConfiguration();
-    
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
-    final String addressArgKey = "--bind=";
-    for (String cmd: args) {
-      if (cmd.startsWith(addressArgKey)) {
-        conf.set(REGIONSERVER_ADDRESS, cmd.substring(addressArgKey.length()));
-        continue;
-      }
-      
-      if (cmd.equals("start")) {
-        try {
-          // If 'local', don't start a region server here.  Defer to
-          // LocalHBaseCluster.  It manages 'local' clusters.
-          if (LocalHBaseCluster.isLocal(conf)) {
-            LOG.warn("Not starting a distinct region server because " +
-              "hbase.master is set to 'local' mode");
-          } else {
-            Constructor<? extends HRegionServer> c =
-              regionServerClass.getConstructor(HBaseConfiguration.class);
-            HRegionServer hrs = c.newInstance(conf);
-            Thread t = new Thread(hrs);
-            t.setName("regionserver" + hrs.server.getListenerAddress());
-            t.start();
-          }
-        } catch (Throwable t) {
-          LOG.error( "Can not start region server because "+
-              StringUtils.stringifyException(t) );
-          System.exit(-1);
-        }
-        break;
-      }
-      
-      if (cmd.equals("stop")) {
-        printUsageAndExit("To shutdown the regionserver run " +
-        		"bin/hbase-daemon.sh stop regionserver or send a kill signal to" +
-        		"the regionserver pid");
-      }
-      
-      // Print out usage if we get to here.
-      printUsageAndExit();
-    }
-  }
-  
-  /**
-   * @param args
-   */
-  public static void main(String [] args) {
-    Configuration conf = new HBaseConfiguration();
-    @SuppressWarnings("unchecked")
-    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
-        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
-    doMain(args, regionServerClass);
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionHistorian;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.HbaseRPC;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * HRegionServer makes a set of HRegions available to clients.  It checks in with
+ * the HMaster. There are many HRegionServers in a single HBase deployment.
+ */
+public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+  static final Log LOG = LogFactory.getLog(HRegionServer.class);
+  
+  // Set when a report to the master comes back with a message asking us to
+  // shutdown.  Also set by call to stop when debugging or running unit tests
+  // of HRegionServer in isolation. We use AtomicBoolean rather than
+  // plain boolean so we can pass a reference to Chore threads.  Otherwise,
+  // Chore threads need to know about the hosting class.
+  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
+  
+  protected final AtomicBoolean quiesced = new AtomicBoolean(false);
+  
+  // Go down hard.  Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  protected volatile boolean abortRequested;
+  
+  // If false, the file system has become unavailable
+  protected volatile boolean fsOk;
+  
+  protected final HServerInfo serverInfo;
+  protected final HBaseConfiguration conf;
+  private FileSystem fs;
+  private Path rootDir;
+  private final Random rand = new Random();
+
+  // Key is Bytes.hashCode of region name byte array and the value is HRegion
+  // in both of the maps below.  Use Bytes.mapKey(byte []) generating key for
+  // below maps.
+  protected final Map<Integer, HRegion> onlineRegions =
+    new ConcurrentHashMap<Integer, HRegion>();
+
+  protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final List<HMsg> outboundMsgs =
+    Collections.synchronizedList(new ArrayList<HMsg>());
+
+  final int numRetries;
+  protected final int threadWakeFrequency;
+  private final int msgInterval;
+  private final int serverLeaseTimeout;
+
+  protected final int numRegionsToReport;
+  
+  // Remote HMaster
+  private HMasterRegionInterface hbaseMaster;
+
+  // Server to handle client requests.  Default access so can be accessed by
+  // unit tests.
+  final Server server;
+  
+  // Leases
+  private final Leases leases;
+  
+  // Request counter
+  private volatile AtomicInteger requestCount = new AtomicInteger();
+
+  // Info server.  Default access so can be used by unit tests.  REGIONSERVER
+  // is name of the webapp and the attribute name used stuffing this instance
+  // into web context.
+  InfoServer infoServer;
+  
+  /** region server process name */
+  public static final String REGIONSERVER = "regionserver";
+  
+  /**
+   * Space is reserved in HRS constructor and then released when aborting
+   * to recover from an OOME. See HBASE-706.
+   */
+  private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();
+  
+  /**
+   * Thread to shutdown the region server in an orderly manner.  This thread
+   * is registered as a shutdown hook in the HRegionServer constructor and is
+   * only called when the HRegionServer receives a kill signal.
+   */
+  class ShutdownThread extends Thread {
+    private final HRegionServer instance;
+    
+    /**
+     * @param instance
+     */
+    public ShutdownThread(HRegionServer instance) {
+      this.instance = instance;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+      LOG.info("Starting shutdown thread.");
+      
+      // tell the region server to stop and wait for it to complete
+      instance.stop();
+      instance.join();
+      LOG.info("Shutdown thread complete");
+    }    
+  }
+
+  // Compactions
+  final CompactSplitThread compactSplitThread;
+
+  // Cache flushing  
+  final Flusher cacheFlusher;
+  
+  // HLog and HLog roller.  log is protected rather than private to avoid
+  // eclipse warning when accessed by inner classes
+  protected HLog log;
+  final LogRoller logRoller;
+  
+  // flag set after we're done setting up server threads (used for testing)
+  protected volatile boolean isOnline;
+    
+  /**
+   * Starts a HRegionServer at the default location
+   * @param conf
+   * @throws IOException
+   */
+  public HRegionServer(HBaseConfiguration conf) throws IOException {
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+        DEFAULT_REGIONSERVER_ADDRESS)), conf);
+  }
+  
+  /**
+   * Starts a HRegionServer at the specified location
+   * @param address
+   * @param conf
+   * @throws IOException
+   */
+  public HRegionServer(HServerAddress address, HBaseConfiguration conf)
+  throws IOException {  
+    this.abortRequested = false;
+    this.fsOk = true;
+    this.conf = conf;
+
+    this.isOnline = false;
+
+    // Config'ed params
+    this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
+    this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+    this.serverLeaseTimeout =
+      conf.getInt("hbase.master.lease.period", 120 * 1000);
+
+    // Cache flushing thread.
+    this.cacheFlusher = new Flusher(conf, this);
+    
+    // Compaction thread
+    this.compactSplitThread = new CompactSplitThread(this);
+    
+    // Log rolling thread
+    this.logRoller = new LogRoller(this);
+
+    // Task thread to process requests from Master
+    this.worker = new Worker();
+    this.workerThread = new Thread(worker);
+
+    // Server to handle client requests
+    this.server = HbaseRPC.getServer(this, address.getBindAddress(), 
+      address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+      false, conf);
+    // Address is givin a default IP for the moment. Will be changed after
+    // calling the master.
+    this.serverInfo = new HServerInfo(new HServerAddress(
+      new InetSocketAddress(DEFAULT_HOST,
+      this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
+      this.conf.getInt("hbase.regionserver.info.port", 60030));
+    this.numRegionsToReport =                                        
+      conf.getInt("hbase.regionserver.numregionstoreport", 10);      
+      
+    this.leases = new Leases(
+      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+      this.threadWakeFrequency);
+    
+    int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
+    for(int i = 0; i < nbBlocks; i++)  {
+      reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
+    }
+    
+    // Register shutdown hook for HRegionServer, runs an orderly shutdown
+    // when a kill signal is recieved
+    Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
+  }
+
+  /**
+   * The HRegionServer sticks in this loop until closed. It repeatedly checks
+   * in with the HMaster, sending heartbeats & reports, and receiving HRegion 
+   * load/unload instructions.
+   */
+  public void run() {
+    boolean quiesceRequested = false;
+    // A sleeper that sleeps for msgInterval.
+    Sleeper sleeper =
+      new Sleeper(this.msgInterval, this.stopRequested);
+    try {
+      init(reportForDuty(sleeper));
+      long lastMsg = 0;
+      // Now ask master what it wants us to do and tell it what we have done
+      for (int tries = 0; !stopRequested.get() && isHealthy();) {
+        long now = System.currentTimeMillis();
+        if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) {
+          // It has been way too long since we last reported to the master.
+          // Commit suicide.
+          LOG.fatal("unable to report to master for " + (now - lastMsg) +
+            " milliseconds - aborting server");
+          abort();
+          break;
+        }
+        if ((now - lastMsg) >= msgInterval) {
+          HMsg outboundArray[] = null;
+          synchronized(this.outboundMsgs) {
+            outboundArray =
+              this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+            this.outboundMsgs.clear();
+          }
+          try {
+            this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
+                onlineRegions.size()));
+            this.requestCount.set(0);
+            HMsg msgs[] = hbaseMaster.regionServerReport(
+              serverInfo, outboundArray, getMostLoadedRegions());
+            lastMsg = System.currentTimeMillis();
+            if (this.quiesced.get() && onlineRegions.size() == 0) {
+              // We've just told the master we're exiting because we aren't
+              // serving any regions. So set the stop bit and exit.
+              LOG.info("Server quiesced and not serving any regions. " +
+                "Starting shutdown");
+              stopRequested.set(true);
+              this.outboundMsgs.clear();
+              continue;
+            }
+
+            // Queue up the HMaster's instruction stream for processing
+            boolean restart = false;
+            for(int i = 0;
+                !restart && !stopRequested.get() && i < msgs.length;
+                i++) {
+              LOG.info(msgs[i].toString());
+              switch(msgs[i].getType()) {
+              case MSG_CALL_SERVER_STARTUP:
+                // We the MSG_CALL_SERVER_STARTUP on startup but we can also
+                // get it when the master is panicing because for instance
+                // the HDFS has been yanked out from under it.  Be wary of
+                // this message.
+                if (checkFileSystem()) {
+                  closeAllRegions();
+                  try {
+                    log.closeAndDelete();
+                  } catch (Exception e) {
+                    LOG.error("error closing and deleting HLog", e);
+                  }
+                  try {
+                    serverInfo.setStartCode(System.currentTimeMillis());
+                    log = setupHLog();
+                  } catch (IOException e) {
+                    this.abortRequested = true;
+                    this.stopRequested.set(true);
+                    e = RemoteExceptionHandler.checkIOException(e); 
+                    LOG.fatal("error restarting server", e);
+                    break;
+                  }
+                  reportForDuty(sleeper);
+                  restart = true;
+                } else {
+                  LOG.fatal("file system available check failed. " +
+                  "Shutting down server.");
+                }
+                break;
+
+              case MSG_REGIONSERVER_STOP:
+                stopRequested.set(true);
+                break;
+
+              case MSG_REGIONSERVER_QUIESCE:
+                if (!quiesceRequested) {
+                  try {
+                    toDo.put(new ToDoEntry(msgs[i]));
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException("Putting into msgQueue was " +
+                        "interrupted.", e);
+                  }
+                  quiesceRequested = true;
+                }
+                break;
+
+              default:
+                if (fsOk) {
+                  try {
+                    toDo.put(new ToDoEntry(msgs[i]));
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException("Putting into msgQueue was " +
+                        "interrupted.", e);
+                  }
+                }
+              }
+            }
+            // Reset tries count if we had a successful transaction.
+            tries = 0;
+
+            if (restart || this.stopRequested.get()) {
+              toDo.clear();
+              continue;
+            }
+          } catch (Exception e) {
+            if (e instanceof IOException) {
+              e = RemoteExceptionHandler.checkIOException((IOException) e);
+            }
+            if (tries < this.numRetries) {
+              LOG.warn("Processing message (Retry: " + tries + ")", e);
+              tries++;
+            } else {
+              LOG.fatal("Exceeded max retries: " + this.numRetries, e);
+              if (!checkFileSystem()) {
+                continue;
+              }
+              // Something seriously wrong. Shutdown.
+              stop();
+            }
+          }
+        }
+        // Do some housekeeping before going to sleep
+        housekeeping();
+        sleeper.sleep(lastMsg);
+      } // for
+    } catch (OutOfMemoryError error) {
+      abort();
+      LOG.fatal("Ran out of memory", error);
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception. Aborting...", t);
+      abort();
+    }
+    RegionHistorian.getInstance().offline();
+    this.leases.closeAfterLeasesExpire();
+    this.worker.stop();
+    this.server.stop();
+    if (this.infoServer != null) {
+      LOG.info("Stopping infoServer");
+      try {
+        this.infoServer.stop();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+
+    // Send interrupts to wake up threads if sleeping so they notice shutdown.
+    // TODO: Should we check they are alive?  If OOME could have exited already
+    cacheFlusher.interruptIfNecessary();
+    compactSplitThread.interruptIfNecessary();
+    this.logRoller.interruptIfNecessary();
+
+    if (abortRequested) {
+      if (this.fsOk) {
+        // Only try to clean up if the file system is available
+        try {
+          this.log.close();
+          LOG.info("On abort, closed hlog");
+        } catch (IOException e) {
+          LOG.error("Unable to close log in abort",
+              RemoteExceptionHandler.checkIOException(e));
+        }
+        closeAllRegions(); // Don't leave any open file handles
+      }
+      LOG.info("aborting server at: " +
+        serverInfo.getServerAddress().toString());
+    } else {
+      ArrayList<HRegion> closedRegions = closeAllRegions();
+      try {
+        log.closeAndDelete();
+      } catch (IOException e) {
+        LOG.error("Close and delete failed",
+            RemoteExceptionHandler.checkIOException(e));
+      }
+      try {
+        HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
+        exitMsg[0] = HMsg.REPORT_EXITING;
+        // Tell the master what regions we are/were serving
+        int i = 1;
+        for (HRegion region: closedRegions) {
+          exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
+              region.getRegionInfo());
+        }
+
+        LOG.info("telling master that region server is shutting down at: " +
+            serverInfo.getServerAddress().toString());
+        hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
+      } catch (IOException e) {
+        LOG.warn("Failed to send exiting message to master: ",
+            RemoteExceptionHandler.checkIOException(e));
+      }
+      LOG.info("stopping server at: " +
+        serverInfo.getServerAddress().toString());
+    }
+    join();
+    LOG.info(Thread.currentThread().getName() + " exiting");
+  }
+  
+  /*
+   * Run init. Sets up hlog and starts up all server threads.
+   * @param c Extra configuration.
+   */
+  protected void init(final MapWritable c) throws IOException {
+    try {
+      for (Map.Entry<Writable, Writable> e: c.entrySet()) {
+        String key = e.getKey().toString();
+        String value = e.getValue().toString();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Config from master: " + key + "=" + value);
+        }
+        this.conf.set(key, value);
+      }
+      // Master may have sent us a new address with the other configs.
+      // Update our address in this case. See HBASE-719
+      if(conf.get("hbase.regionserver.address") != null)
+        serverInfo.setServerAddress(new HServerAddress
+            (conf.get("hbase.regionserver.address"), 
+            serverInfo.getServerAddress().getPort()));
+      // Master sent us hbase.rootdir to use. Should be fully qualified
+      // path with file system specification included.  Set 'fs.default.name'
+      // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
+      // accessors will be going against wrong filesystem (unless all is set
+      // to defaults).
+      this.conf.set("fs.default.name", this.conf.get("hbase.rootdir"));
+      this.fs = FileSystem.get(this.conf);
+      this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
+      this.log = setupHLog();
+      startServiceThreads();
+      isOnline = true;
+    } catch (IOException e) {
+      this.stopRequested.set(true);
+      isOnline = false;
+      e = RemoteExceptionHandler.checkIOException(e); 
+      LOG.fatal("Failed init", e);
+      IOException ex = new IOException("region server startup failed");
+      ex.initCause(e);
+      throw ex;
+    }
+  }
+
+  /**
+   * Report the status of the server. A server is online once all the startup 
+   * is completed (setting up filesystem, starting service threads, etc.). This
+   * method is designed mostly to be useful in tests.
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return isOnline;
+  }
+    
+  private HLog setupHLog() throws RegionServerRunningException,
+    IOException {
+    
+    Path logdir = new Path(rootDir, "log" + "_" + 
+        serverInfo.getServerAddress().getBindAddress() + "_" +
+        this.serverInfo.getStartCode() + "_" + 
+        this.serverInfo.getServerAddress().getPort());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Log dir " + logdir);
+    }
+    if (fs.exists(logdir)) {
+      throw new RegionServerRunningException("region server already " +
+        "running at " + this.serverInfo.getServerAddress().toString() +
+        " because logdir " + logdir.toString() + " exists");
+    }
+    return new HLog(fs, logdir, conf, logRoller);
+  }
+  
+  /*
+   * Start Chore Threads, Server, Worker and lease checker threads. Install an
+   * UncaughtExceptionHandler that calls abort of RegionServer if we get
+   * an unhandled exception.  We cannot set the handler on all threads.
+   * Server's internal Listener thread is off limits.  For Server, if an OOME,
+   * it waits a while then retries.  Meantime, a flush or a compaction that
+   * tries to run should trigger same critical condition and the shutdown will
+   * run.  On its way out, this server will shut down Server.  Leases are sort
+   * of inbetween. It has an internal thread that while it inherits from
+   * Chore, it keeps its own internal stop mechanism so needs to be stopped
+   * by this hosting server.  Worker logs the exception and exits.
+   */
+  private void startServiceThreads() throws IOException {
+    String n = Thread.currentThread().getName();
+    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+      public void uncaughtException(Thread t, Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+        handler);
+    Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
+      handler);
+    Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
+        handler);
+    Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
+    // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
+    // an unhandled exception, it will just exit.
+    this.leases.setName(n + ".leaseChecker");
+    this.leases.start();
+    // Put up info server.
+    int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
+    if (port >= 0) {
+      String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
+      this.infoServer = new InfoServer("regionserver", a, port, false);
+      this.infoServer.setAttribute("regionserver", this);
+      this.infoServer.start();
+    }
+    // Start Server.  This service is like leases in that it internally runs
+    // a thread.
+    this.server.start();
+    LOG.info("HRegionServer started at: " +
+        serverInfo.getServerAddress().toString());
+  }
+
+  /*
+   * Verify that server is healthy
+   */
+  private boolean isHealthy() {
+    if (!fsOk) {
+      // File system problem
+      return false;
+    }
+    // Verify that all threads are alive
+    if (!(leases.isAlive() && compactSplitThread.isAlive() &&
+        cacheFlusher.isAlive() && logRoller.isAlive() &&
+        workerThread.isAlive())) {
+      // One or more threads are no longer alive - shut down
+      stop();
+      return false;
+    }
+    return true;
+  }
+  /*
+   * Run some housekeeping tasks before we go into 'hibernation' sleeping at
+   * the end of the main HRegionServer run loop.
+   */
+  private void housekeeping() {
+    // If the todo list has > 0 messages, iterate looking for open region
+    // messages. Send the master a message that we're working on its
+    // processing so it doesn't assign the region elsewhere.
+    if (this.toDo.size() <= 0) {
+      return;
+    }
+    // This iterator is 'safe'.  We are guaranteed a view on state of the
+    // queue at time iterator was taken out.  Apparently goes from oldest.
+    for (ToDoEntry e: this.toDo) {
+      if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) {
+        addProcessingMessage(e.msg.getRegionInfo());
+      }
+    }
+  }
+
+  /** @return the HLog */
+  HLog getLog() {
+    return this.log;
+  }
+
+  /**
+   * Sets a flag that will cause all the HRegionServer threads to shut down
+   * in an orderly fashion.  Used by unit tests.
+   */
+  public void stop() {
+    this.stopRequested.set(true);
+    synchronized(this) {
+      notifyAll(); // Wakes run() if it is sleeping
+    }
+  }
+  
+  /**
+   * Cause the server to exit without closing the regions it is serving, the
+   * log it is using and without notifying the master.
+   * Used unit testing and on catastrophic events such as HDFS is yanked out
+   * from under hbase or we OOME.
+   */
+  public void abort() {
+    reservedSpace.clear();
+    this.abortRequested = true;
+    stop();
+  }
+
+  /** 
+   * Wait on all threads to finish.
+   * Presumption is that all closes and stops have already been called.
+   */
+  void join() {
+    join(this.workerThread);
+    join(this.cacheFlusher);
+    join(this.compactSplitThread);
+    join(this.logRoller);
+  }
+
+  private void join(final Thread t) {
+    while (t.isAlive()) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+  }
+  
+  /*
+   * Let the master know we're here
+   * Run initialization using parameters passed us by the master.
+   */
+  private MapWritable reportForDuty(final Sleeper sleeper)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Telling master at " +
+        conf.get(MASTER_ADDRESS) + " that we are up");
+    }
+    // Do initial RPC setup.  The final argument indicates that the RPC should retry indefinitely.
+    this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy(
+      HMasterRegionInterface.class, HMasterRegionInterface.versionID,
+      new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+      this.conf, -1);
+    MapWritable result = null;
+    long lastMsg = 0;
+    while(!stopRequested.get()) {
+      try {
+        this.requestCount.set(0);
+        this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+        lastMsg = System.currentTimeMillis();
+        result = this.hbaseMaster.regionServerStartup(serverInfo);
+        break;
+      } catch (Leases.LeaseStillHeldException e) {
+        LOG.info("Lease " + e.getName() + " already held on master. Check " +
+          "DNS configuration so that all region servers are" +
+          "reporting their true IPs and not 127.0.0.1. Otherwise, this" +
+          "problem should resolve itself after the lease period of " +
+          this.conf.get("hbase.master.lease.period")
+          + " seconds expires over on the master");
+      } catch (IOException e) {
+        LOG.warn("error telling master we are up", e);
+      }
+      sleeper.sleep(lastMsg);
+    }
+    return result;
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportOpen(HRegionInfo region) {
+    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportClose(HRegionInfo region) {
+    reportClose(region, null);
+  }
+
+  /* Add to the outbound message buffer */
+  private void reportClose(final HRegionInfo region, final Text message) {
+    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
+  }
+
+  
+  /**
+   * Add to the outbound message buffer
+   * 
+   * When a region splits, we need to tell the master that there are two new 
+   * regions that need to be assigned.
+   * 
+   * We do not need to inform the master about the old region, because we've
+   * updated the meta or root regions, and the master will pick that up on its
+   * next rescan of the root or meta tables.
+   */
+  void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
+      HRegionInfo newRegionB) {
+
+    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion,
+      new Text(oldRegion.getRegionNameAsString() + " split; daughters: " +
+        newRegionA.getRegionNameAsString() + ", " +
+        newRegionB.getRegionNameAsString())));
+    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA));
+    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB));
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HMaster-given operations
+  //////////////////////////////////////////////////////////////////////////////
+
+  /*
+   * Data structure to hold a HMsg and retries count.
+   */
+  private static class ToDoEntry {
+    private int tries;
+    private final HMsg msg;
+    ToDoEntry(HMsg msg) {
+      this.tries = 0;
+      this.msg = msg;
+    }
+  }
+  
+  final BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
+  private Worker worker;
+  private Thread workerThread;
+  
+  /** Thread that performs long running requests from the master */
+  class Worker implements Runnable {
+    void stop() {
+      synchronized(toDo) {
+        toDo.notifyAll();
+      }
+    }
+    
+    /** {@inheritDoc} */
+    public void run() {
+      try {
+        while(!stopRequested.get()) {
+          ToDoEntry e = null;
+          try {
+            e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+            if(e == null || stopRequested.get()) {
+              continue;
+            }
+            LOG.info(e.msg);
+            switch(e.msg.getType()) {
+
+            case MSG_REGIONSERVER_QUIESCE:
+              closeUserRegions();
+              break;
+
+            case MSG_REGION_OPEN:
+              // Open a region
+              openRegion(e.msg.getRegionInfo());
+              break;
+
+            case MSG_REGION_CLOSE:
+              // Close a region
+              closeRegion(e.msg.getRegionInfo(), true);
+              break;
+
+            case MSG_REGION_CLOSE_WITHOUT_REPORT:
+              // Close a region, don't reply
+              closeRegion(e.msg.getRegionInfo(), false);
+              break;
+
+            default:
+              throw new AssertionError(
+                  "Impossible state during msg processing.  Instruction: "
+                  + e.msg.toString());
+            }
+          } catch (InterruptedException ex) {
+            // continue
+          } catch (Exception ex) {
+            if (ex instanceof IOException) {
+              ex = RemoteExceptionHandler.checkIOException((IOException) ex);
+            }
+            if(e != null && e.tries < numRetries) {
+              LOG.warn(ex);
+              e.tries++;
+              try {
+                toDo.put(e);
+              } catch (InterruptedException ie) {
+                throw new RuntimeException("Putting into msgQueue was " +
+                    "interrupted.", ex);
+              }
+            } else {
+              LOG.error("unable to process message" +
+                  (e != null ? (": " + e.msg.toString()) : ""), ex);
+              if (!checkFileSystem()) {
+                break;
+              }
+            }
+          }
+        }

[... 783 lines stripped ...]