You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/05 06:22:54 UTC

svn commit: r1334314 [3/4] - in /hbase/trunk: conf/ security/src/main/java/org/apache/hadoop/hbase/security/ src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ ...

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1334314&r1=1334313&r2=1334314&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May  5 04:22:52 2012
@@ -43,9 +43,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,21 +70,19 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.AdminProtocol;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ClientProtocol;
@@ -91,10 +91,8 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -102,24 +100,78 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.ipc.Invocation;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
@@ -128,7 +180,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
@@ -136,6 +187,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
@@ -157,14 +209,11 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -172,19 +221,97 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.ipc.RegionServerStatusProtocol;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
 
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 @InterfaceAudience.Private
-public class HRegionServer extends RegionServer
-    implements HRegionInterface, HBaseRPCErrorHandler {
+@SuppressWarnings("deprecation")
+public class HRegionServer implements ClientProtocol,
+    AdminProtocol, Runnable, RegionServerServices, HBaseRPCErrorHandler {
 
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
 
+  private final Random rand = new Random();
+
+  /*
+   * Strings to be used in forming the exception message for
+   * RegionsAlreadyInTransitionException.
+   */
+  protected static final String OPEN = "OPEN";
+  protected static final String CLOSE = "CLOSE";
+
+  //RegionName vs current action in progress
+  //true - if open region action in progress
+  //false - if close region action in progress
+  protected final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
+    new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
+
+  protected long maxScannerResultSize;
+
+  // Cache flushing
+  protected MemStoreFlusher cacheFlusher;
+
+  // catalog tracker
+  protected CatalogTracker catalogTracker;
+
+  /**
+   * Go here to get table descriptors.
+   */
+  protected TableDescriptors tableDescriptors;
+
+  // Replication services. If no replication, this handler will be null.
+  protected ReplicationSourceService replicationSourceHandler;
+  protected ReplicationSinkService replicationSinkHandler;
+
+  // Compactions
+  public CompactSplitThread compactSplitThread;
+
+  final Map<String, RegionScanner> scanners =
+      new ConcurrentHashMap<String, RegionScanner>();
+
+  /**
+   * Map of regions currently being served by this region server. Key is the
+   * encoded region name.  All access should be synchronized.
+   */
+  protected final Map<String, HRegion> onlineRegions =
+    new ConcurrentHashMap<String, HRegion>();
+
+  // Leases
+  protected Leases leases;
+
+  // Instance of the hbase executor service.
+  protected ExecutorService service;
+
+  // Request counter.
+  // Do we need this?  Can't we just sum region counters?  St.Ack 20110412
+  protected AtomicInteger requestCount = new AtomicInteger();
+
+  // If false, the file system has become unavailable
+  protected volatile boolean fsOk;
+  protected HFileSystem fs;
+
+  protected static final int NORMAL_QOS = 0;
+  protected static final int QOS_THRESHOLD = 10;  // the line between low and high qos
+  protected static final int HIGH_QOS = 100;
+
+  // Set when a report to the master comes back with a message asking us to
+  // shutdown. Also set by call to stop when debugging or running unit tests
+  // of HRegionServer in isolation.
+  protected volatile boolean stopped = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  protected volatile boolean abortRequested;
+
+  // Port we put up the webui on.
+  protected int webuiport = -1;
+
+  Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
+
   // A state before we go into stopped state.  At this stage we're closing user
   // space regions.
   private boolean stopping = false;
@@ -231,7 +358,6 @@ public class HRegionServer extends Regio
 
   private RegionServerMetrics metrics;
 
-  @SuppressWarnings("unused")
   private RegionServerDynamicMetrics dynamicMetrics;
 
   /*
@@ -280,9 +406,6 @@ public class HRegionServer extends Regio
    */
   private ServerName serverNameFromMasterPOV;
 
-  // Port we put up the webui on.
-  private int webuiport = -1;
-
   /**
    * This servers startcode.
    */
@@ -349,7 +472,7 @@ public class HRegionServer extends Regio
     }
 
     this.rpcServer = HBaseRPC.getServer(this,
-      new Class<?>[]{HRegionInterface.class, ClientProtocol.class,
+      new Class<?>[]{ClientProtocol.class,
         AdminProtocol.class, HBaseRPCErrorHandler.class,
         OnlineRegions.class},
         initialIsa.getHostName(), // BindAddress is IP we got for this server.
@@ -582,7 +705,7 @@ public class HRegionServer extends Regio
 
     // Create the thread for the ThriftServer.
     if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
-      thriftServer = new HRegionThriftServer((RegionServer)this, conf);
+      thriftServer = new HRegionThriftServer(this, conf);
       thriftServer.start();
       LOG.info("Started Thrift API from Region Server.");
     }
@@ -591,7 +714,6 @@ public class HRegionServer extends Regio
   /**
    * The HRegionServer sticks in this loop until closed.
    */
-  @SuppressWarnings("deprecation")
   public void run() {
     try {
       // Do pre-registration initializations; zookeeper, lease threads, etc.
@@ -1834,1581 +1956,1841 @@ public class HRegionServer extends Regio
     }
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public HRegionInfo getRegionInfo(final byte[] regionName)
-  throws NotServingRegionException, IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    return getRegion(regionName).getRegionInfo();
-  }
-
-  public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
-      final byte[] family) throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      // locate the region we're operating on
-      HRegion region = getRegion(regionName);
-      // ask the region for all the data
-
-      Result r = region.getClosestRowBefore(row, family);
-      return r;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+  /** @return the info server */
+  public InfoServer getInfoServer() {
+    return infoServer;
   }
 
-  /** {@inheritDoc} */
-  public Result get(byte[] regionName, Get get) throws IOException {
-    checkOpen();
-    final long startTime = System.nanoTime();
-    requestCount.incrementAndGet();
-    try {
-      HRegion region = getRegion(regionName);
-      return region.get(get, getLockFromId(get.getLockId()));
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    } finally {
-      this.metrics.getLatencies.update(System.nanoTime() - startTime);
-    }
+  /**
+   * @return true if a stop has been requested.
+   */
+  public boolean isStopped() {
+    return this.stopped;
   }
 
-  public boolean exists(byte[] regionName, Get get) throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      HRegion region = getRegion(regionName);
-      Integer lock = getLockFromId(get.getLockId());
-      if (region.getCoprocessorHost() != null) {
-        Boolean result = region.getCoprocessorHost().preExists(get);
-        if (result != null) {
-          return result.booleanValue();
-        }
-      }
-      Result r = region.get(get, lock);
-      boolean result = r != null && !r.isEmpty();
-      if (region.getCoprocessorHost() != null) {
-        result = region.getCoprocessorHost().postExists(get, result);
-      }
-      return result;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+  @Override
+  public boolean isStopping() {
+    return this.stopping;
   }
 
-  public void put(final byte[] regionName, final Put put) throws IOException {
-    if (put.getRow() == null) {
-      throw new IllegalArgumentException("update has null row");
-    }
-
-    final long startTime = System.nanoTime();
-    checkOpen();
-    this.requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    try {
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      boolean writeToWAL = put.getWriteToWAL();
-      region.put(put, getLockFromId(put.getLockId()), writeToWAL);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    } finally {
-      this.metrics.putLatencies.update(System.nanoTime() - startTime);
-    }
+  /**
+   *
+   * @return the configuration
+   */
+  public Configuration getConfiguration() {
+    return conf;
   }
 
-  public int put(final byte[] regionName, final List<Put> puts)
-      throws IOException {
-    checkOpen();
-    HRegion region = null;
-    int i = 0;
-
-    final long startTime = System.nanoTime();
-    try {
-      region = getRegion(regionName);
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-
-      @SuppressWarnings("unchecked")
-      Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
-
-      for (Put p : puts) {
-        Integer lock = getLockFromId(p.getLockId());
-        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
-      }
-
-      this.requestCount.addAndGet(puts.size());
-      OperationStatus codes[] = region.put(putsWithLocks);
-      for (i = 0; i < codes.length; i++) {
-        if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
-          return i;
-        }
-      }
-      return -1;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    } finally {
-      // going to count this as puts.size() PUTs for latency calculations
-      final long totalTime = System.nanoTime() - startTime;
-      final long putCount = i;
-      final long perPutTime = totalTime / putCount;
-      for (int request = 0; request < putCount; request++) {
-        this.metrics.putLatencies.update(perPutTime);
-      }
-    }
+  /** @return the write lock for the server */
+  ReentrantReadWriteLock.WriteLock getWriteLock() {
+    return lock.writeLock();
   }
 
-  private boolean checkAndMutate(final byte[] regionName, final byte[] row,
-      final byte[] family, final byte[] qualifier, final CompareOp compareOp,
-      final WritableByteArrayComparable comparator, final Writable w,
-      Integer lock) throws IOException {
-    checkOpen();
-    this.requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    try {
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      return region.checkAndMutate(row, family, qualifier, compareOp,
-        comparator, w, lock, true);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+  public int getNumberOfOnlineRegions() {
+    return this.onlineRegions.size();
   }
 
-  /**
-   *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param value
-   *          the expected value
-   * @param put
-   * @throws IOException
-   * @return true if the new put was execute, false otherwise
-   */
-  public boolean checkAndPut(final byte[] regionName, final byte[] row,
-      final byte[] family, final byte[] qualifier, final byte[] value,
-      final Put put) throws IOException {
-    checkOpen();
-    if (regionName == null) {
-      throw new IOException("Invalid arguments to checkAndPut "
-          + "regionName is null");
-    }
-    HRegion region = getRegion(regionName);
-    Integer lock = getLockFromId(put.getLockId());
-    WritableByteArrayComparable comparator = new BinaryComparator(value);
-    if (region.getCoprocessorHost() != null) {
-      Boolean result = region.getCoprocessorHost()
-        .preCheckAndPut(row, family, qualifier, CompareOp.EQUAL, comparator,
-          put);
-      if (result != null) {
-        return result.booleanValue();
-      }
-    }
-    boolean result = checkAndMutate(regionName, row, family, qualifier,
-      CompareOp.EQUAL, new BinaryComparator(value), put,
-      lock);
-    if (region.getCoprocessorHost() != null) {
-      result = region.getCoprocessorHost().postCheckAndPut(row, family,
-        qualifier, CompareOp.EQUAL, comparator, put, result);
-    }
-    return result;
+  boolean isOnlineRegionsEmpty() {
+    return this.onlineRegions.isEmpty();
   }
 
   /**
-   *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param compareOp
-   * @param comparator
-   * @param put
+   * @param encodedRegionName
+   * @return JSON Map of labels to values for <code>encodedRegionName</code>, or null if not online
    * @throws IOException
-   * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndPut(final byte[] regionName, final byte[] row,
-      final byte[] family, final byte[] qualifier, final CompareOp compareOp,
-      final WritableByteArrayComparable comparator, final Put put)
-       throws IOException {
-    checkOpen();
-    if (regionName == null) {
-      throw new IOException("Invalid arguments to checkAndPut "
-          + "regionName is null");
+  public byte [] getRegionStats(final String encodedRegionName)
+  throws IOException {
+    HRegion r = null;
+    synchronized (this.onlineRegions) {
+      r = this.onlineRegions.get(encodedRegionName);
     }
-    HRegion region = getRegion(regionName);
-    Integer lock = getLockFromId(put.getLockId());
-    if (region.getCoprocessorHost() != null) {
-      Boolean result = region.getCoprocessorHost()
-        .preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
-      if (result != null) {
-        return result.booleanValue();
+    if (r == null) return null;
+    ObjectMapper mapper = new ObjectMapper();
+    int stores = 0;
+    int storefiles = 0;
+    int storefileSizeMB = 0;
+    int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
+    int storefileIndexSizeMB = 0;
+    synchronized (r.stores) {
+      stores += r.stores.size();
+      for (Store store : r.stores.values()) {
+        storefiles += store.getStorefilesCount();
+        storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
+        storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
       }
     }
-    boolean result = checkAndMutate(regionName, row, family, qualifier,
-      compareOp, comparator, put, lock);
-    if (region.getCoprocessorHost() != null) {
-      result = region.getCoprocessorHost().postCheckAndPut(row, family,
-        qualifier, compareOp, comparator, put, result);
-    }
-    return result;
+    Map<String, Integer> map = new TreeMap<String, Integer>();
+    map.put("stores", stores);
+    map.put("storefiles", storefiles);
+    map.put("storefileSizeMB", storefileSizeMB);
+    map.put("storefileIndexSizeMB", storefileIndexSizeMB);
+    map.put("memstoreSizeMB", memstoreSizeMB);
+    StringWriter w = new StringWriter();
+    mapper.writeValue(w, map);
+    w.close();
+    return Bytes.toBytes(w.toString());
   }
 
   /**
-   *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param value
-   *          the expected value
-   * @param delete
-   * @throws IOException
-   * @return true if the new put was execute, false otherwise
+   * For tests and web ui.
+   * This method will only work if HRegionServer is in the same JVM as client;
+   * HRegion cannot be serialized to cross an rpc.
+   * @see #getOnlineRegions()
    */
-  public boolean checkAndDelete(final byte[] regionName, final byte[] row,
-      final byte[] family, final byte[] qualifier, final byte[] value,
-      final Delete delete) throws IOException {
-    checkOpen();
-
-    if (regionName == null) {
-      throw new IOException("Invalid arguments to checkAndDelete "
-          + "regionName is null");
-    }
-    HRegion region = getRegion(regionName);
-    Integer lock = getLockFromId(delete.getLockId());
-    WritableByteArrayComparable comparator = new BinaryComparator(value);
-    if (region.getCoprocessorHost() != null) {
-      Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
-        family, qualifier, CompareOp.EQUAL, comparator, delete);
-      if (result != null) {
-        return result.booleanValue();
-      }
-    }
-    boolean result = checkAndMutate(regionName, row, family, qualifier,
-      CompareOp.EQUAL, comparator, delete, lock);
-    if (region.getCoprocessorHost() != null) {
-      result = region.getCoprocessorHost().postCheckAndDelete(row, family,
-        qualifier, CompareOp.EQUAL, comparator, delete, result);
-    }
-    return result;
+  public Collection<HRegion> getOnlineRegionsLocalContext() {
+    Collection<HRegion> regions = this.onlineRegions.values();
+    return Collections.unmodifiableCollection(regions);
   }
 
   @Override
-  public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
-    throws IllegalArgumentException {
-    return getStoreFileList(regionName, new byte[][]{columnFamily});
+  public void addToOnlineRegions(HRegion region) {
+    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
   }
 
   @Override
-  public List<String> getStoreFileList(byte[] regionName, byte[][] columnFamilies)
-    throws IllegalArgumentException {
-    HRegion region = getOnlineRegion(regionName);
-    if (region == null) {
-      throw new IllegalArgumentException("No region: " + new String(regionName)
-          + " available");
-    }
-    return region.getStoreFileList(columnFamilies);
+  public boolean removeFromOnlineRegions(final String encodedName) {
+    HRegion toReturn = null;
+    toReturn = this.onlineRegions.remove(encodedName);
+    
+    //Clear all of the dynamic metrics as they are now probably useless.
+    //This is a clear because dynamic metrics could include metrics per cf and
+    //per hfile.  Figuring out which cfs, hfiles, and regions are still relevant to
+    //this region server would be an onerous task.  Instead just clear everything
+    //and on the next tick of the metrics everything that is still relevant will be
+    //re-added.
+    this.dynamicMetrics.clear();
+    return toReturn != null;
   }
 
-  public List<String> getStoreFileList(byte[] regionName)
-    throws IllegalArgumentException {
-    HRegion region = getOnlineRegion(regionName);
-    if (region == null) {
-      throw new IllegalArgumentException("No region: " + new String(regionName)
-          + " available");
-    }
-    Set<byte[]> columnFamilies = region.getStores().keySet();
-    int nCF = columnFamilies.size();
-    return region.getStoreFileList(columnFamilies.toArray(new byte[nCF][]));
-  }
-  
- /**
-  * Flushes the given region
-  */
-  public void flushRegion(byte[] regionName)
-    throws IllegalArgumentException, IOException {
-    HRegion region = getOnlineRegion(regionName);
-    if (region == null) {
-      throw new IllegalArgumentException("No region : " + new String(regionName)
-      + " available");
+  /**
+   * @return A new Map of online regions sorted by memstore size with the first
+   *         entry being the biggest.
+   */
+  public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
+    // we'll sort the regions in reverse
+    SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
+        new Comparator<Long>() {
+          public int compare(Long a, Long b) {
+            return -1 * a.compareTo(b);
+          }
+        });
+    // Copy over all regions. Regions are sorted by size with biggest first.
+    for (HRegion region : this.onlineRegions.values()) {
+      sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
     }
-    region.flushcache();
+    return sortedRegions;
   }
 
- /**
-   * Flushes the given region if lastFlushTime < ifOlderThanTS
-   */
-   public void flushRegion(byte[] regionName, long ifOlderThanTS)
-     throws IllegalArgumentException, IOException {
-     HRegion region = getOnlineRegion(regionName);
-     if (region == null) {
-       throw new IllegalArgumentException("No region : " + new String(regionName)
-       + " available");
-     }
-     if (region.getLastFlushTime() < ifOlderThanTS) region.flushcache();
-   }
+  /** @return the request count */
+  public AtomicInteger getRequestCount() {
+    return this.requestCount;
+  }
 
   /**
-   * Gets last flush time for the given region
-   * @return the last flush time for a region
+   * @return time stamp in millis of when this region server was started
    */
-  public long getLastFlushTime(byte[] regionName) {
-    HRegion region = getOnlineRegion(regionName);
-    if (region == null) {
-      throw new IllegalArgumentException("No region : " + new String(regionName)
-      + " available");
-    }
-    return region.getLastFlushTime();
+  public long getStartcode() {
+    return this.startcode;
   }
- 
-  /**
-   *
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param compareOp
-   * @param comparator
-   * @param delete
-   * @throws IOException
-   * @return true if the new put was execute, false otherwise
-   */
-  public boolean checkAndDelete(final byte[] regionName, final byte[] row,
-      final byte[] family, final byte[] qualifier, final CompareOp compareOp,
-      final WritableByteArrayComparable comparator, final Delete delete)
-      throws IOException {
-    checkOpen();
-
-    if (regionName == null) {
-      throw new IOException("Invalid arguments to checkAndDelete "
-        + "regionName is null");
-    }
-    HRegion region = getRegion(regionName);
-    Integer lock = getLockFromId(delete.getLockId());
-    if (region.getCoprocessorHost() != null) {
-      Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
-        family, qualifier, compareOp, comparator, delete);
-     if (result != null) {
-       return result.booleanValue();
-     }
-    }
-    boolean result = checkAndMutate(regionName, row, family, qualifier,
-      compareOp, comparator, delete, lock);
-   if (region.getCoprocessorHost() != null) {
-     result = region.getCoprocessorHost().postCheckAndDelete(row, family,
-       qualifier, compareOp, comparator, delete, result);
-   }
-   return result;
- }
 
-  //
-  // remote scanner interface
-  //
+  /** @return reference to FlushRequester */
+  public FlushRequester getFlushRequester() {
+    return this.cacheFlusher;
+  }
 
-  public long openScanner(byte[] regionName, Scan scan) throws IOException {
-    checkOpen();
-    NullPointerException npe = null;
-    if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (scan == null) {
-      npe = new NullPointerException("scan is null");
-    }
-    if (npe != null) {
-      throw new IOException("Invalid arguments to openScanner", npe);
-    }
-    requestCount.incrementAndGet();
-    try {
-      HRegion r = getRegion(regionName);
-      r.checkRow(scan.getStartRow(), "Scan");
-      r.prepareScanner(scan);
-      RegionScanner s = null;
-      if (r.getCoprocessorHost() != null) {
-        s = r.getCoprocessorHost().preScannerOpen(scan);
-      }
-      if (s == null) {
-        s = r.getScanner(scan);
-      }
-      if (r.getCoprocessorHost() != null) {
-        RegionScanner savedScanner = r.getCoprocessorHost().postScannerOpen(
-            scan, s);
-        if (savedScanner == null) {
-          LOG.warn("PostScannerOpen impl returning null. "
-              + "Check the RegionObserver implementation.");
-        } else {
-          s = savedScanner;
-        }
+  /**
+   * Get the top N most loaded regions this server is serving so we can tell the
+   * master which regions it can reallocate if we're overloaded. TODO: actually
+   * calculate which regions are most loaded. (Right now, we're just grabbing
+   * the first N regions being served regardless of load.)
+   */
+  protected HRegionInfo[] getMostLoadedRegions() {
+    ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    for (HRegion r : onlineRegions.values()) {
+      if (!r.isAvailable()) {
+        continue;
+      }
+      if (regions.size() < numRegionsToReport) {
+        regions.add(r.getRegionInfo());
+      } else {
+        break;
       }
-      return addScanner(s);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t, "Failed openScanner"));
     }
+    return regions.toArray(new HRegionInfo[regions.size()]);
   }
 
-  public Result next(final long scannerId) throws IOException {
-    Result[] res = next(scannerId, 1);
-    if (res == null || res.length == 0) {
-      return null;
+  @Override
+  @QosPriority(priority=HIGH_QOS)
+  public ProtocolSignature getProtocolSignature(
+      String protocol, long version, int clientMethodsHashCode)
+  throws IOException {
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return new ProtocolSignature(ClientProtocol.VERSION, null);
+    } else if (protocol.equals(AdminProtocol.class.getName())) {
+      return new ProtocolSignature(AdminProtocol.VERSION, null);
     }
-    return res[0];
+    throw new IOException("Unknown protocol: " + protocol);
   }
 
-  public Result[] next(final long scannerId, int nbRows) throws IOException {
-    String scannerName = String.valueOf(scannerId);
-    RegionScanner s = this.scanners.get(scannerName);
-    if (s == null) throw new UnknownScannerException("Name: " + scannerName);
-    try {
-      checkOpen();
-    } catch (IOException e) {
-      // If checkOpen failed, server not running or filesystem gone,
-      // cancel this lease; filesystem is gone or we're closing or something.
-      try {
-        this.leases.cancelLease(scannerName);
-      } catch (LeaseException le) {
-        LOG.info("Server shutting down and client tried to access missing scanner " +
-          scannerName);
-      }
-      throw e;
+  @Override
+  @QosPriority(priority=HIGH_QOS)
+  public long getProtocolVersion(final String protocol, final long clientVersion)
+  throws IOException {
+    if (protocol.equals(ClientProtocol.class.getName())) {
+      return ClientProtocol.VERSION;
+    } else if (protocol.equals(AdminProtocol.class.getName())) {
+      return AdminProtocol.VERSION;
     }
-    Leases.Lease lease = null;
-    try {
-      // Remove lease while its being processed in server; protects against case
-      // where processing of request takes > lease expiration time.
-      lease = this.leases.removeLease(scannerName);
-      List<Result> results = new ArrayList<Result>(nbRows);
-      long currentScanResultSize = 0;
-      List<KeyValue> values = new ArrayList<KeyValue>();
-
-      // Call coprocessor. Get region info from scanner.
-      HRegion region = getRegion(s.getRegionInfo().getRegionName());
-      if (region != null && region.getCoprocessorHost() != null) {
-        Boolean bypass = region.getCoprocessorHost().preScannerNext(s,
-            results, nbRows);
-        if (!results.isEmpty()) {
-          for (Result r : results) {
-            for (KeyValue kv : r.raw()) {
-              currentScanResultSize += kv.heapSize();
-            }
-          }
-        }
-        if (bypass != null) {
-          return s.isFilterDone() && results.isEmpty() ? null
-              : results.toArray(new Result[0]);
-        }
-      }
-      long maxResultSize;
-      if (s.getMaxResultSize() > 0) {
-        maxResultSize = s.getMaxResultSize();
-      } else {
-        maxResultSize = maxScannerResultSize;
-      }
-      for (int i = 0; i < nbRows && currentScanResultSize < maxResultSize; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
-        if (!values.isEmpty()) {
-          for (KeyValue kv : values) {
-            currentScanResultSize += kv.heapSize();
-          }
-          results.add(new Result(values));
-        }
-        if (!moreRows) {
-          break;
-        }
-        values.clear();
-      }
+    throw new IOException("Unknown protocol: " + protocol);
+  }
 
-      // coprocessor postNext hook
-      if (region != null && region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
-      }
+  /**
+   * @return Return the leases.
+   */
+  protected Leases getLeases() {
+    return leases;
+  }
 
-      // If the scanner's filter - if any - is done with the scan
-      // and wants to tell the client to stop the scan. This is done by passing
-      // a null result.
-      return s.isFilterDone() && results.isEmpty() ? null
-          : results.toArray(new Result[0]);
-    } catch (Throwable t) {
-      if (t instanceof NotServingRegionException) {
-        this.scanners.remove(scannerName);
-      }
-      throw convertThrowableToIOE(cleanup(t));
-    } finally {
-      // We're done. On way out readd the above removed lease.  Adding resets
-      // expiration time on lease.
-      if (this.scanners.containsKey(scannerName)) {
-        if (lease != null) this.leases.addLease(lease);
-      }
-    }
+  /**
+   * @return Return the rootDir.
+   */
+  protected Path getRootDir() {
+    return rootDir;
   }
 
-  public void close(final long scannerId) throws IOException {
-    try {
-      checkOpen();
-      requestCount.incrementAndGet();
-      String scannerName = String.valueOf(scannerId);
-      RegionScanner s = scanners.get(scannerName);
+  /**
+   * @return Return the fs.
+   */
+  public FileSystem getFileSystem() {
+    return fs;
+  }
 
-      HRegion region = null;
-      if (s != null) {
-        // call coprocessor.
-        region = getRegion(s.getRegionInfo().getRegionName());
-        if (region != null && region.getCoprocessorHost() != null) {
-          if (region.getCoprocessorHost().preScannerClose(s)) {
-            return; // bypass
-          }
-        }
-      }
+  /** @return the string form of {@link #getServerName()} */
+  @Override
+  public String toString() {
+    return getServerName().toString();
+  }
 
-      s = scanners.remove(scannerName);
-      if (s != null) {
-        s.close();
-        this.leases.cancelLease(scannerName);
+  /**
+   * Interval at which threads should run
+   *
+   * @return the interval
+   */
+  public int getThreadWakeFrequency() {
+    return threadWakeFrequency;
+  }
 
-        if (region != null && region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postScannerClose(s);
-        }
-      }
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+  @Override
+  public ZooKeeperWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    // Our servername could change after we talk to the master.
+    return this.serverNameFromMasterPOV == null?
+      new ServerName(this.isa.getHostName(), this.isa.getPort(), this.startcode):
+        this.serverNameFromMasterPOV;
+  }
+
+  @Override
+  public CompactionRequestor getCompactionRequester() {
+    return this.compactSplitThread;
+  }
+
+  public ZooKeeperWatcher getZooKeeperWatcher() {
+    return this.zooKeeper;
+  }
+
+
+  public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
+    return this.regionsInTransitionInRS;
+  }
+
+  public ExecutorService getExecutorService() {
+    return service;
   }
 
   //
-  // Methods that do the actual work for the remote API
+  // Main program and support routines
   //
-  public void delete(final byte[] regionName, final Delete delete)
-      throws IOException {
-    checkOpen();
-    final long startTime = System.nanoTime();
-    try {
-      boolean writeToWAL = delete.getWriteToWAL();
-      this.requestCount.incrementAndGet();
-      HRegion region = getRegion(regionName);
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      Integer lid = getLockFromId(delete.getLockId());
-      region.delete(delete, lid, writeToWAL);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    } finally {
-      this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
+
+  /**
+   * Load the replication service objects, if any
+   */
+  static private void createNewReplicationInstance(Configuration conf,
+    HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
+
+    // If replication is not enabled, then return immediately.
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      return;
     }
-  }
 
-  public int delete(final byte[] regionName, final List<Delete> deletes)
-      throws IOException {
-    checkOpen();
-    // Count of Deletes processed.
-    int i = 0;
-    HRegion region = null;
-    try {
-      region = getRegion(regionName);
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      int size = deletes.size();
-      Integer[] locks = new Integer[size];
-      for (Delete delete : deletes) {
-        final long startTime = System.nanoTime();
-        this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(delete.getLockId());
-        region.delete(delete, locks[i], delete.getWriteToWAL());
-        i++;
-        this.metrics.deleteLatencies.update(System.nanoTime() - startTime);
-      }
-    } catch (WrongRegionException ex) {
-      LOG.debug("Batch deletes: " + i, ex);
-      return i;
-    } catch (NotServingRegionException ex) {
-      return i;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
+    // read in the name of the source replication class from the config file.
+    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
+                               HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+                             HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    // If both the sink and the source class names are the same, then instantiate
+    // only one object.
+    if (sourceClassname.equals(sinkClassname)) {
+      server.replicationSourceHandler = (ReplicationSourceService)
+                                         newReplicationInstance(sourceClassname,
+                                         conf, server, fs, logDir, oldLogDir);
+      server.replicationSinkHandler = (ReplicationSinkService)
+                                         server.replicationSourceHandler;
+    }
+    else {
+      server.replicationSourceHandler = (ReplicationSourceService)
+                                         newReplicationInstance(sourceClassname,
+                                         conf, server, fs, logDir, oldLogDir);
+      server.replicationSinkHandler = (ReplicationSinkService)
+                                         newReplicationInstance(sinkClassname,
+                                         conf, server, fs, logDir, oldLogDir);
     }
-    return -1;
   }
 
-  public long lockRow(byte[] regionName, byte[] row) throws IOException {
-    checkOpen();
-    NullPointerException npe = null;
-    if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (row == null) {
-      npe = new NullPointerException("row to lock is null");
-    }
-    if (npe != null) {
-      IOException io = new IOException("Invalid arguments to lockRow");
-      io.initCause(npe);
-      throw io;
-    }
-    requestCount.incrementAndGet();
+  /**
+   * Loads the named replication service class through the current thread's
+   * context class loader, instantiates it with <code>conf</code> and
+   * initializes it.
+   *
+   * @param classname fully qualified name of the class to load
+   * @param conf configuration handed to the new instance
+   * @param server passed through to <code>initialize</code>
+   * @param fs passed through to <code>initialize</code>
+   * @param logDir passed through to <code>initialize</code>
+   * @param oldLogDir passed through to <code>initialize</code>
+   * @return the initialized replication service
+   * @throws IOException if <code>classname</code> cannot be found; the
+   *           original ClassNotFoundException is attached as the cause
+   */
+  static private ReplicationService newReplicationInstance(String classname,
+    Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
+    Path oldLogDir) throws IOException{
+
+    Class<?> clazz = null;
     try {
-      HRegion region = getRegion(regionName);
-      Integer r = region.obtainRowLock(row);
-      long lockId = addRowLock(r, region);
-      LOG.debug("Row lock " + lockId + " explicitly acquired by client");
-      return lockId;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
-          + this.fsOk + ")"));
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      // Keep the cause so the class-loading failure stays visible in the trace.
+      throw new IOException("Could not find class for " + classname, nfe);
     }
+
+    // create an instance of the replication object.
+    ReplicationService service = (ReplicationService)
+                              ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, fs, logDir, oldLogDir);
+    return service;
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public void unlockRow(byte[] regionName, long lockId) throws IOException {
-    checkOpen();
-    NullPointerException npe = null;
-    if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (lockId == -1L) {
-      npe = new NullPointerException("lockId is null");
-    }
-    if (npe != null) {
-      IOException io = new IOException("Invalid arguments to unlockRow");
-      io.initCause(npe);
-      throw io;
-    }
-    requestCount.incrementAndGet();
+  /**
+   * @param hrs
+   * @return Thread the RegionServer is running in correctly named.
+   * @throws IOException
+   */
+  public static Thread startRegionServer(final HRegionServer hrs)
+      throws IOException {
+    return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
+  }
+
+  /**
+   * @param hrs
+   * @param name
+   * @return Thread the RegionServer is running in correctly named.
+   * @throws IOException
+   */
+  public static Thread startRegionServer(final HRegionServer hrs,
+      final String name) throws IOException {
+    Thread t = new Thread(hrs);
+    t.setName(name);
+    t.start();
+    // Install shutdown hook that will catch signals and run an orderly shutdown
+    // of the hrs.
+    ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
+        .getConfiguration()), hrs, t);
+    return t;
+  }
+
+  /**
+   * Utility for constructing an instance of the passed HRegionServer class.
+   *
+   * @param regionServerClass
+   * @param conf2
+   * @return HRegionServer instance.
+   */
+  public static HRegionServer constructRegionServer(
+      Class<? extends HRegionServer> regionServerClass,
+      final Configuration conf2) {
     try {
-      HRegion region = getRegion(regionName);
-      String lockName = String.valueOf(lockId);
-      Integer r = rowlocks.remove(lockName);
-      if (r == null) {
-        throw new UnknownRowLockException(lockName);
-      }
-      region.releaseRowLock(r);
-      this.leases.cancelLease(lockName);
-      LOG.debug("Row lock " + lockId
-          + " has been explicitly released by client");
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
+      Constructor<? extends HRegionServer> c = regionServerClass
+          .getConstructor(Configuration.class);
+      return c.newInstance(conf2);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " + "Regionserver: "
+          + regionServerClass.toString(), e);
     }
   }
 
   /**
-   * Atomically bulk load several HFiles into an open region
-   * @return true if successful, false is failed but recoverably (no action)
-   * @throws IOException if failed unrecoverably
+   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
    */
-  @Override
-  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
-      byte[] regionName) throws IOException {
-    checkOpen();
-    HRegion region = getRegion(regionName);
-    return region.bulkLoadHFiles(familyPaths);
+  public static void main(String[] args) throws Exception {
+	VersionInfo.logVersion();
+    Configuration conf = HBaseConfiguration.create();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+
+    new HRegionServerCommandLine(regionServerClass).doMain(args);
   }
 
-  // Region open/close direct RPCs
+  /**
+   * Gets the online regions of the specified table.
+   * This method looks at the in-memory onlineRegions.  It does not go to <code>.META.</code>.
+   * Only returns <em>online</em> regions.  If a region on this table has been
+   * closed during a disable, etc., it will not be included in the returned list.
+   * So, the returned list may not necessarily be ALL regions in this table; it
+   * is all the ONLINE regions in the table.
+   * @param tableName name of the table whose online regions are wanted
+   * @return A new list of the online regions from <code>tableName</code>; empty if there are none
+   */
+   public List<HRegion> getOnlineRegions(byte[] tableName) {
+     List<HRegion> tableRegions = new ArrayList<HRegion>();
+     synchronized (this.onlineRegions) {
+       for (HRegion region: this.onlineRegions.values()) {
+         HRegionInfo regionInfo = region.getRegionInfo();
+         if(Bytes.equals(regionInfo.getTableName(), tableName)) {
+           tableRegions.add(region);
+         }
+       }
+     }
+     return tableRegions;
+   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public RegionOpeningState openRegion(HRegionInfo region)
-  throws IOException {
-    return openRegion(region, -1);
-  }
-  @Override
-  @QosPriority(priority = HIGH_QOS)
-  public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
-      throws IOException {
-    checkOpen();
-    checkIfRegionInTransition(region, OPEN);
-    HRegion onlineRegion = this.getFromOnlineRegions(region.getEncodedName());
-    if (null != onlineRegion) {
-      // See HBASE-5094. Cross check with META if still this RS is owning the
-      // region.
-      Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
-          this.catalogTracker, region.getRegionName());
-      if (this.getServerName().equals(p.getSecond())) {
-        LOG.warn("Attempted open of " + region.getEncodedName()
-            + " but already online on this server");
-        return RegionOpeningState.ALREADY_OPENED;
-      } else {
-        LOG.warn("The region " + region.getEncodedName()
-            + " is online on this server but META does not have this server.");
-        this.removeFromOnlineRegions(region.getEncodedName());
-      }
-    }
-    LOG.info("Received request to open region: " +
-      region.getRegionNameAsString());
-    HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
-    this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(),
-        true);
-    // Need to pass the expected version in the constructor.
-    if (region.isRootRegion()) {
-      this.service.submit(new OpenRootHandler(this, this, region, htd,
-          versionOfOfflineNode));
-    } else if (region.isMetaRegion()) {
-      this.service.submit(new OpenMetaHandler(this, this, region, htd,
-          versionOfOfflineNode));
-    } else {
-      this.service.submit(new OpenRegionHandler(this, this, region, htd,
-          versionOfOfflineNode));
-    }
-    return RegionOpeningState.OPENED;
+  // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
+  public String[] getCoprocessors() {
+    HBaseProtos.ServerLoad sl = buildServerLoad();
+    return sl == null? null:
+      ServerLoad.getRegionServerCoprocessors(new ServerLoad(sl));
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public void openRegions(List<HRegionInfo> regions)
-  throws IOException {
-    checkOpen();
-    LOG.info("Received request to open " + regions.size() + " region(s)");
-    for (HRegionInfo region: regions) openRegion(region);
+  /**
+   * Register bean with platform management server
+   */
+  void registerMBean() {
+    MXBeanImpl mxBeanInfo = MXBeanImpl.init(this);
+    mxBean = MBeanUtil.registerMBean("RegionServer", "RegionServer",
+        mxBeanInfo);
+    LOG.info("Registered RegionServer MXBean");
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public boolean closeRegion(HRegionInfo region)
-  throws IOException {
-    return closeRegion(region, true, -1);
+  /**
+   * Instantiated as a row lock lease. If the lease times out, the row lock is
+   * released
+   */
+  private class RowLockListener implements LeaseListener {
+    private final String lockName;
+    private final HRegion region;
+
+    RowLockListener(final String lockName, final HRegion region) {
+      this.lockName = lockName;
+      this.region = region;
+    }
+
+    public void leaseExpired() {
+      LOG.info("Row Lock " + this.lockName + " lease expired");
+      Integer r = rowlocks.remove(this.lockName);
+      if (r != null) {
+        region.releaseRowLock(r);
+      }
+    }
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public boolean closeRegion(final HRegionInfo region,
-    final int versionOfClosingNode)
-  throws IOException {
-    return closeRegion(region, true, versionOfClosingNode);
+  /**
+   * Instantiated as a scanner lease. If the lease times out, the scanner is
+   * removed from <code>scanners</code> and closed, with the region's
+   * coprocessor close hooks run around the close when the region is still
+   * online.
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+
+    public void leaseExpired() {
+      RegionScanner s = scanners.remove(this.scannerName);
+      if (s != null) {
+        LOG.info("Scanner " + this.scannerName + " lease expired on region "
+            + s.getRegionInfo().getRegionNameAsString());
+        try {
+          // Use the null-returning lookup: getRegion() throws when the region
+          // is no longer online, which would skip s.close() below and leave
+          // the already-unregistered scanner open.
+          HRegion region = getOnlineRegion(s.getRegionInfo().getRegionName());
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().preScannerClose(s);
+          }
+
+          s.close();
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerClose(s);
+          }
+        } catch (IOException e) {
+          LOG.error("Closing scanner for "
+              + s.getRegionInfo().getRegionNameAsString(), e);
+        }
+      } else {
+        LOG.info("Scanner " + this.scannerName + " lease expired");
+      }
+    }
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public boolean closeRegion(HRegionInfo region, final boolean zk)
-  throws IOException {
-    return closeRegion(region, zk, -1);
+  /**
+   * Method to get the Integer lock identifier used internally from the long
+   * lock identifier used by the client.
+   *
+   * @param lockId
+   *          long row lock identifier from client
+   * @return intId Integer row lock used internally in HRegion
+   * @throws IOException
+   *           Thrown if this is not a valid client lock id.
+   */
+  Integer getLockFromId(long lockId) throws IOException {
+    if (lockId == -1L) {
+      return null;
+    }
+    String lockName = String.valueOf(lockId);
+    Integer rl = rowlocks.get(lockName);
+    if (rl == null) {
+      throw new UnknownRowLockException("Invalid row lock");
+    }
+    this.leases.renewLease(lockName);
+    return rl;
   }
 
-  @QosPriority(priority=HIGH_QOS)
-  protected boolean closeRegion(HRegionInfo region, final boolean zk,
-    final int versionOfClosingNode)
-  throws IOException {
-    checkOpen();
-    LOG.info("Received close region: " + region.getRegionNameAsString() +
-      ". Version of ZK closing node:" + versionOfClosingNode);
-    boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
-    if (!hasit) {
-      LOG.warn("Received close for region we are not serving; " +
-        region.getEncodedName());
-      throw new NotServingRegionException("Received close for "
-        + region.getRegionNameAsString() + " but we are not serving it");
+  /**
+   * Called to verify that this server is up and running.
+   *
+   * @throws IOException
+   */
+  protected void checkOpen() throws IOException {
+    if (this.stopped || this.abortRequested) {
+      throw new RegionServerStoppedException("Server " + getServerName() +
+        " not running" + (this.abortRequested ? ", aborting" : ""));
+    }
+    if (!fsOk) {
+      throw new RegionServerStoppedException("File system not available");
     }
-    checkIfRegionInTransition(region, CLOSE);
-    return closeRegion(region, false, zk, versionOfClosingNode);
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public boolean closeRegion(byte[] encodedRegionName, boolean zk)
-    throws IOException {
-    return closeRegion(encodedRegionName, false, zk);
+  /**
+   * Rejects a request for a region that this server already has recorded in
+   * <code>regionsInTransitionInRS</code>.
+   *
+   * @param region region the request is for
+   * @param currentAction the action being requested; used only in the message
+   * @throws RegionAlreadyInTransitionException if the region already has an
+   *           entry in <code>regionsInTransitionInRS</code>
+   */
+  protected void checkIfRegionInTransition(HRegionInfo region,
+      String currentAction) throws RegionAlreadyInTransitionException {
+    byte[] encodedName = region.getEncodedNameAsBytes();
+    // One lookup instead of containsKey() followed by get(): the entry can be
+    // removed in between, and unboxing the resulting null would throw an NPE.
+    Boolean openAction = this.regionsInTransitionInRS.get(encodedName);
+    if (openAction != null) {
+      // The below exception message will be used in master.
+      throw new RegionAlreadyInTransitionException("Received:" + currentAction +
+        " for the region:" + region.getRegionNameAsString() +
+        " ,which we are already trying to " +
+        (openAction.booleanValue() ? OPEN : CLOSE)+ ".");
+    }
   }
 
   /**
-   * @param encodedRegionName
-   *          encodedregionName to close
-   * @param abort
-   *          True if we are aborting
-   * @param zk
-   *          True if we are to update zk about the region close; if the close
-   *          was orchestrated by master, then update zk. If the close is being
-   *          run by the regionserver because its going down, don't update zk.
+   * @param region Region to close
+   * @param abort True if we are aborting
+   * @param zk True if we are to update zk about the region close; if the close
+   * was orchestrated by master, then update zk.  If the close is being run by
+   * the regionserver because its going down, don't update zk.
    * @return True if closed a region.
    */
-  protected boolean closeRegion(byte[] encodedRegionName, final boolean abort,
-      final boolean zk) throws IOException {
-    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
-    HRegion region = this.getFromOnlineRegions(encodedRegionNameStr);
-    if (null != region) {
-      return closeRegion(region.getRegionInfo(), abort, zk);
-    }
-    LOG.error("The specified region name" + encodedRegionNameStr
-        + " does not exist to close the region.");
-    return false;
+  protected boolean closeRegion(HRegionInfo region, final boolean abort,
+      final boolean zk) {
+    return closeRegion(region, abort, zk, -1);
   }
 
-  // Manual remote region administration RPCs
-
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public void flushRegion(HRegionInfo regionInfo)
-      throws NotServingRegionException, IOException {
-    checkOpen();
-    LOG.info("Flushing " + regionInfo.getRegionNameAsString());
-    HRegion region = getRegion(regionInfo.getRegionName());
-    region.flushcache();
+  /**
+   * Marks the region as closing in <code>regionsInTransitionInRS</code> and
+   * submits the matching close handler (root, meta or plain region) to the
+   * executor service.
+   *
+   * @param region Region to close
+   * @param abort True if we are aborting
+   * @param zk True if we are to update zk about the region close; if the close
+   * was orchestrated by master, then update zk.  If the close is being run by
+   * the regionserver because it is going down, don't update zk.
+   * @param versionOfClosingNode
+   *   the version of znode to compare when RS transitions the znode from
+   *   CLOSING state.
+   * @return True if a close handler was submitted; false if the region was
+   *   already recorded as opening or closing.
+   */
+  protected boolean closeRegion(HRegionInfo region, final boolean abort,
+      final boolean zk, final int versionOfClosingNode) {
+    // Claim the region atomically: a separate containsKey() check followed by
+    // putIfAbsent() lets two concurrent callers both pass the check and both
+    // submit a close handler.
+    Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
+        region.getEncodedNameAsBytes(), false);
+    if (previous != null) {
+      LOG.warn("Received close for region we are already opening or closing; " +
+        region.getEncodedName());
+      return false;
+    }
+    CloseRegionHandler crh = null;
+    if (region.isRootRegion()) {
+      crh = new CloseRootHandler(this, this, region, abort, zk,
+        versionOfClosingNode);
+    } else if (region.isMetaRegion()) {
+      crh = new CloseMetaHandler(this, this, region, abort, zk,
+        versionOfClosingNode);
+    } else {
+      crh = new CloseRegionHandler(this, this, region, abort, zk,
+        versionOfClosingNode);
+    }
+    this.service.submit(crh);
+    return true;
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public void splitRegion(HRegionInfo regionInfo)
-      throws NotServingRegionException, IOException {
-    splitRegion(regionInfo, null);
+  /**
+   * Looks up an online region by its binary name. The name is passed through
+   * <code>HRegionInfo.encodeRegionName</code> and the result is used as the
+   * key into <code>onlineRegions</code>.
+   *
+   * @param regionName binary region name
+   * @return HRegion for the passed binary <code>regionName</code> or null if
+   *         named region is not member of the online regions.
+   */
+  public HRegion getOnlineRegion(final byte[] regionName) {
+    String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+    return this.onlineRegions.get(encodedRegionName);
   }
 
   @Override
-  public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
-      throws NotServingRegionException, IOException {
-    checkOpen();
-    HRegion region = getRegion(regionInfo.getRegionName());
-    region.flushcache();
-    region.forceSplit(splitPoint);
-    compactSplitThread.requestSplit(region, region.checkSplit());
+  // Returns whatever onlineRegions holds under the already-encoded region name;
+  // unlike getOnlineRegion(byte[]) no name encoding is done here.
+  public HRegion getFromOnlineRegions(final String encodedRegionName) {
+    return this.onlineRegions.get(encodedRegionName);
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public void compactRegion(HRegionInfo regionInfo, boolean major)
-      throws NotServingRegionException, IOException {
-    checkOpen();
-    HRegion region = getRegion(regionInfo.getRegionName());
-    if (major) {
-      region.triggerMajorCompaction();
-    }
-    compactSplitThread.requestCompaction(region, "User-triggered "
-        + (major ? "major " : "") + "compaction",
-        CompactSplitThread.PRIORITY_USER);
+  /**
+   * Protected utility method for safely obtaining an HRegion handle.
+   * Delegates to <code>getOnlineRegion</code> and turns a null answer into an
+   * exception, so the value returned here is never null.
+   *
+   * @param regionName
+   *          Name of online {@link HRegion} to return
+   * @return {@link HRegion} for <code>regionName</code>
+   * @throws NotServingRegionException
+   *           if <code>getOnlineRegion</code> returns null for
+   *           <code>regionName</code>
+   */
+  protected HRegion getRegion(final byte[] regionName)
+      throws NotServingRegionException {
+    HRegion region = null;
+    region = getOnlineRegion(regionName);
+    if (region == null) {
+      throw new NotServingRegionException("Region is not online: " +
+        Bytes.toStringBinary(regionName));
+    }
+    return region;
   }
 
-  /** @return the info server */
-  public InfoServer getInfoServer() {
-    return infoServer;
+  /**
+   * Cleanup after Throwable caught invoking method. Convenience overload that
+   * calls <code>cleanup(t, null)</code>, i.e. with no log message.
+   *
+   * @param t Throwable
+   *
+   * @return whatever <code>cleanup(t, null)</code> returns.
+   */
+  protected Throwable cleanup(final Throwable t) {
+    return cleanup(t, null);
   }
 
-  /**
-   * @return true if a stop has been requested.
+  /**
+   * Cleanup after Throwable caught invoking method. Logs <code>t</code> and,
+   * unless <code>checkOOME</code> reports an out-of-memory condition, runs
+   * <code>checkFileSystem</code>. A NotServingRegionException is only logged at
+   * debug level and skips both checks.
+   * <p>
+   * The Throwable is NOT converted here; callers that need an IOException
+   * pass the result on to <code>convertThrowableToIOE</code>.
+   *
+   * @param t Throwable
+   *
+   * @param msg Message to log in error. Can be null.
+   *
+   * @return the same <code>t</code> that was passed in.
    */
-  public boolean isStopped() {
-    return this.stopped;
+  protected Throwable cleanup(final Throwable t, final String msg) {
+    // Don't log as error if NSRE; NSRE is 'normal' operation.
+    if (t instanceof NotServingRegionException) {
+      LOG.debug("NotServingRegionException; " +  t.getMessage());
+      return t;
+    }
+    if (msg == null) {
+      LOG.error("", RemoteExceptionHandler.checkThrowable(t));
+    } else {
+      LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
+    }
+    if (!checkOOME(t)) {
+      checkFileSystem();
+    }
+    return t;
   }
 
-  @Override
-  public boolean isStopping() {
-    return this.stopping;
+  /**
+   * Convenience overload that calls <code>convertThrowableToIOE(t, null)</code>.
+   *
+   * @param t Throwable to convert
+   *
+   * @return Make <code>t</code> an IOE if it isn't already.
+   */
+  protected IOException convertThrowableToIOE(final Throwable t) {
+    return convertThrowableToIOE(t, null);
   }
 
-  /**
+  /**
+   * @param t Throwable to convert
    *
-   * @return the configuration
+   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE;
+   * when null or empty the new IOE is created from <code>t</code> alone.
+   *
+   * @return <code>t</code> itself (cast) when it already is an IOException,
+   * otherwise a new IOException with <code>t</code> as its cause.
    */
-  public Configuration getConfiguration() {
-    return conf;
+  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
+    return (t instanceof IOException ? (IOException) t : msg == null
+        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
   }
 
-  /** @return the write lock for the server */
-  ReentrantReadWriteLock.WriteLock getWriteLock() {
-    return lock.writeLock();
+  /**
+   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
+   * <p>
+   * <code>e</code> counts as an OOME when it is an OutOfMemoryError, when its
+   * direct cause is one, or when its message contains the text
+   * "java.lang.OutOfMemoryError". On a match the condition is logged and the
+   * JVM is halted with status 1.
+   *
+   * @param e Throwable to inspect; it is dereferenced without a null check.
+   *
+   * @return True if we OOME'd and are aborting. Since the halt is issued before
+   * the return statement, callers should in practice only ever see false.
+   */
+  public boolean checkOOME(final Throwable e) {
+    boolean stop = false;
+    try {
+      if (e instanceof OutOfMemoryError
+          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+          || (e.getMessage() != null && e.getMessage().contains(
+              "java.lang.OutOfMemoryError"))) {
+        stop = true;
+        LOG.fatal(
+          "Run out of memory; HRegionServer will abort itself immediately", e);
+      }
+    } finally {
+      // In a finally block so the halt still happens if the logging above throws.
+      if (stop) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return stop;
   }
 
-  @Override
-  @QosPriority(priority=HIGH_QOS)
-  public List<HRegionInfo> getOnlineRegions() throws IOException {
-    checkOpen();
-    List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
-    for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
-      list.add(e.getValue().getRegionInfo());
+  /**
+   * Checks to see if the file system is still accessible. If the check throws
+   * an IOException, calls <code>abort</code> with that exception and clears
+   * <code>fsOk</code>.
+   * <p>
+   * The check is skipped when <code>fsOk</code> is already false or
+   * <code>fs</code> is null, so a failure is reported only once.
+   *
+   * @return the current value of <code>fsOk</code>; false if file system is
+   * not available
+   */
+  public boolean checkFileSystem() {
+    if (this.fsOk && this.fs != null) {
+      try {
+        FSUtils.checkFileSystemAvailable(this.fs);
+      } catch (IOException e) {
+        abort("File System not available", e);
+        this.fsOk = false;
+      }
     }
-    Collections.sort(list);
-    return list;
+    return this.fsOk;
   }
 
-  public int getNumberOfOnlineRegions() {
-    return this.onlineRegions.size();
+  /**
+   * Registers a row lock under a freshly generated id. The value is stored in
+   * <code>rowlocks</code> keyed by the id's decimal string, and a lease with
+   * the same name is created with a <code>RowLockListener</code>.
+   *
+   * @param r value to store for the lock (NOTE(review): presumably the handle
+   *   returned by <code>HRegion.obtainRowLock</code> - confirm against callers)
+   * @param region region handed to the <code>RowLockListener</code>
+   * @return the generated lock id
+   * @throws LeaseStillHeldException propagated from
+   *   <code>leases.createLease</code>; the <code>rowlocks</code> entry has
+   *   already been added by then
+   */
+  protected long addRowLock(Integer r, HRegion region)
+      throws LeaseStillHeldException {
+    long lockId = nextLong();
+    String lockName = String.valueOf(lockId);
+    rowlocks.put(lockName, r);
+    this.leases.createLease(lockName, new RowLockListener(lockName, region));
+    return lockId;
   }
 
-  boolean isOnlineRegionsEmpty() {
-    return this.onlineRegions.isEmpty();
+  /**
+   * Registers a scanner under a freshly generated id. The scanner is stored in
+   * <code>scanners</code> keyed by the id's decimal string, and a lease with
+   * the same name is created with a <code>ScannerListener</code>.
+   *
+   * @param s scanner to register
+   * @return the generated scanner id
+   * @throws LeaseStillHeldException propagated from
+   *   <code>leases.createLease</code>; the <code>scanners</code> entry has
+   *   already been added by then
+   */
+  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+    long scannerId = nextLong();
+    String scannerName = String.valueOf(scannerId);
+    scanners.put(scannerName, s);
+    this.leases.createLease(scannerName, new ScannerListener(scannerName));
+    return scannerId;
   }
 
   /**
-   * @param encodedRegionName
-   * @return JSON Map of labels to values for passed in <code>encodedRegionName</code>
-   * @throws IOException
+   * Generate a random positive long number.
+   * <p>
+   * Draws from <code>rand</code> and draws again on the two values that cannot
+   * be turned into a positive number by negation: 0 and Long.MIN_VALUE.
+   *
+   * @return a random long in the range 1..Long.MAX_VALUE
    */
-  public byte [] getRegionStats(final String encodedRegionName)
-  throws IOException {
-    HRegion r = null;
-    synchronized (this.onlineRegions) {
-      r = this.onlineRegions.get(encodedRegionName);
+  protected long nextLong() {
+    long n = rand.nextLong();
+    // 0 is not positive, and -Long.MIN_VALUE overflows back to Long.MIN_VALUE,
+    // so negating it below would still hand out a negative id.
+    if (n == 0 || n == Long.MIN_VALUE) {
+      return nextLong();
     }
-    if (r == null) return null;
-    ObjectMapper mapper = new ObjectMapper();
-    int stores = 0;
-    int storefiles = 0;
-    int storefileSizeMB = 0;
-    int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
-    int storefileIndexSizeMB = 0;
-    long totalCompactingKVs = 0;
-    long currentCompactedKVs = 0;
-    synchronized (r.stores) {
-      stores += r.stores.size();
-      for (Store store : r.stores.values()) {
-        storefiles += store.getStorefilesCount();
-        storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
-        storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
-      }
+    if (n < 0) {
+      n = -n;
     }
-    Map<String, Integer> map = new TreeMap<String, Integer>();
-    map.put("stores", stores);
-    map.put("storefiles", storefiles);
-    map.put("storefileSizeMB", storefileIndexSizeMB);
-    map.put("memstoreSizeMB", memstoreSizeMB);
-    StringWriter w = new StringWriter();
-    mapper.writeValue(w, map);
-    w.close();
-    return Bytes.toBytes(w.toString());
+    return n;
   }
 
+  // Start Client methods
+
   /**
-   * For tests and web ui.
-   * This method will only work if HRegionServer is in the same JVM as client;
-   * HRegion cannot be serialized to cross an rpc.
-   * @see #getOnlineRegions()
+   * Get data from a table.
+   * <p>
+   * Serves three kinds of request: a closest-row-before lookup, an
+   * existence-only check, and a plain get.
+   *
+   * @param controller the RPC controller
+   * @param request the get request
+   * @return response carrying <code>exists</code> when an existence value was
+   *   computed, otherwise <code>result</code> when the read returned non-null
+   * @throws ServiceException wrapping any IOException raised while serving
+   *   the request
    */
-  public Collection<HRegion> getOnlineRegionsLocalContext() {
-    Collection<HRegion> regions = this.onlineRegions.values();
-    return Collections.unmodifiableCollection(regions);
-  }
-
   @Override
-  public void addToOnlineRegions(HRegion region) {
-    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
+  public GetResponse get(final RpcController controller,
+      final GetRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      HRegion region = getRegion(request.getRegion());
+      GetResponse.Builder builder = GetResponse.newBuilder();
+      ClientProtos.Get get = request.getGet();
+      Boolean existence = null;
+      Result r = null;
+      if (request.getClosestRowBefore()) {
+        // Only the first column entry is read below, so anything other than
+        // exactly one entry is rejected.
+        if (get.getColumnCount() != 1) {
+          throw new DoNotRetryIOException(
+            "get ClosestRowBefore supports one and only one family now, not "
+              + get.getColumnCount() + " families");
+        }
+        byte[] row = get.getRow().toByteArray();
+        byte[] family = get.getColumn(0).getFamily().toByteArray();
+        r = region.getClosestRowBefore(row, family);
+      } else {
+        Get clientGet = ProtobufUtil.toGet(get);
+        // A non-null answer from preExists is used as-is and the region read
+        // below is skipped.
+        if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
+          existence = region.getCoprocessorHost().preExists(clientGet);
+        }
+        if (existence == null) {
+          Integer lock = getLockFromId(clientGet.getLockId());
+          r = region.get(clientGet, lock);
+          if (request.getExistenceOnly()) {
+            boolean exists = r != null && !r.isEmpty();
+            if (region.getCoprocessorHost() != null) {
+              exists = region.getCoprocessorHost().postExists(clientGet, exists);
+            }
+            existence = Boolean.valueOf(exists);
+          }
+        }
+      }
+      if (existence != null) {
+        builder.setExists(existence.booleanValue());
+      } else if (r != null) {
+        builder.setResult(ProtobufUtil.toResult(r));
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
   }
 
+  /**
+   * Mutate data in a table.
+   * <p>
+   * Dispatches on the mutate type. APPEND and INCREMENT produce a result; PUT
+   * and DELETE produce a <code>processed</code> flag. A PUT or DELETE that
+   * carries a condition is run through <code>checkAndMutate</code>, with the
+   * region's coprocessor pre/post hooks around it when a coprocessor host is
+   * present.
+   *
+   * @param controller the RPC controller
+   * @param request the mutate request
+   * @return response carrying <code>processed</code> when that flag was set,
+   *   otherwise <code>result</code> when the operation returned non-null
+   * @throws ServiceException wrapping any IOException raised while serving
+   *   the request; <code>checkFileSystem</code> is run before it is thrown
+   */
   @Override
-  public boolean removeFromOnlineRegions(final String encodedName) {
-    HRegion toReturn = null;
-    toReturn = this.onlineRegions.remove(encodedName);
-    
-    //Clear all of the dynamic metrics as they are now probably useless.
-    //This is a clear because dynamic metrics could include metrics per cf and
-    //per hfile.  Figuring out which cfs, hfiles, and regions are still relevant to
-    //this region server would be an onerous task.  Instead just clear everything
-    //and on the next tick of the metrics everything that is still relevant will be
-    //re-added.
-    this.dynamicMetrics.clear();
-    return toReturn != null;
+  public MutateResponse mutate(final RpcController controller,
+      final MutateRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      HRegion region = getRegion(request.getRegion());
+      MutateResponse.Builder builder = MutateResponse.newBuilder();
+      Mutate mutate = request.getMutate();
+      if (!region.getRegionInfo().isMetaTable()) {
+        cacheFlusher.reclaimMemStoreMemory();
+      }
+      Integer lock = null;
+      Result r = null;
+      Boolean processed = null;
+      MutateType type = mutate.getMutateType();
+      switch (type) {
+      case APPEND:
+        r = append(region, mutate);
+        break;
+      case INCREMENT:
+        r = increment(region, mutate);
+        break;
+      case PUT:
+        Put put = ProtobufUtil.toPut(mutate);
+        lock = getLockFromId(put.getLockId());
+        if (request.hasCondition()) {
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          WritableByteArrayComparable comparator =
+            (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndPut(
+              row, family, qualifier, compareOp, comparator, put);
+          }
+          // Still null here means no coprocessor host, or the pre hook gave no
+          // answer: run the check-and-put on the region itself.
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, put, lock, true);
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndPut(row, family,
+                qualifier, compareOp, comparator, put, result);
+            }
+            processed = Boolean.valueOf(result);
+          }
+        } else {
+          region.put(put, lock);
+          processed = Boolean.TRUE;
+        }
+        break;
+      case DELETE:
+        Delete delete = ProtobufUtil.toDelete(mutate);
+        lock = getLockFromId(delete.getLockId());
+        if (request.hasCondition()) {
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          WritableByteArrayComparable comparator =
+            (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndDelete(
+              row, family, qualifier, compareOp, comparator, delete);
+          }
+          // Same pattern as the PUT case: fall through to the region when the
+          // pre hook left processed unset.
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, delete, lock, true);
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+                qualifier, compareOp, comparator, delete, result);
+            }
+            processed = Boolean.valueOf(result);
+          }
+        } else {
+          region.delete(delete, lock, delete.getWriteToWAL());
+          processed = Boolean.TRUE;
+        }
+        break;
+        default:
+          throw new DoNotRetryIOException(
+            "Unsupported mutate type: " + type.name());
+      }
+      if (processed != null) {
+        builder.setProcessed(processed.booleanValue());
+      } else if (r != null) {
+        builder.setResult(ProtobufUtil.toResult(r));
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      checkFileSystem();
+      throw new ServiceException(ie);
+    }
   }
 
+  //
+  // remote scanner interface
+  //
+
   /**
-   * @return A new Map of online regions sorted by region size with the first
-   *         entry being the biggest.
+   * Scan data in a table.
+   * <p>
+   * A request that carries a scanner id continues the scanner registered under
+   * that id; otherwise a new scanner is opened from the request's scan and
+   * registered under a fresh id. Up to <code>numberOfRows</code> rows (1 when
+   * the request does not say) are fetched per call, also bounded by the
+   * maximum result size. The scanner is closed when the request asks for it or
+   * when there are no more results.
+   *
+   * @param controller the RPC controller
+   * @param request the scan request; must carry a scanner id or a scan
+   * @return response with the fetched results, the scanner id, the
+   *   <code>moreResults</code> flag and, when positive, the lease ttl. When the
+   *   coprocessor's <code>preScannerClose</code> asks for a bypass, the response
+   *   is returned without the id and flag being set.
+   * @throws ServiceException wrapping the IOException that any failure is
+   *   converted to
    */
-  public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
-    // we'll sort the regions in reverse
-    SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
-        new Comparator<Long>() {
-          public int compare(Long a, Long b) {
-            return -1 * a.compareTo(b);
+  @Override
+  public ScanResponse scan(final RpcController controller,
+      final ScanRequest request) throws ServiceException {
+    Leases.Lease lease = null;
+    String scannerName = null;
+    try {
+      if (!request.hasScannerId() && !request.hasScan()) {
+        throw new DoNotRetryIOException(
+          "Missing required input: scannerId or scan");
+      }
+      long scannerId = -1;
+      if (request.hasScannerId()) {
+        scannerId = request.getScannerId();
+        scannerName = String.valueOf(scannerId);
+      }
+      try {
+        checkOpen();
+      } catch (IOException e) {
+        // If checkOpen failed, server not running or filesystem gone,
+        // cancel this lease; filesystem is gone or we're closing or something.
+        if (scannerName != null) {
+          try {
+            leases.cancelLease(scannerName);
+          } catch (LeaseException le) {
+            LOG.info("Server shutting down and client tried to access missing scanner " +
+              scannerName);
           }
-        });
-    // Copy over all regions. Regions are sorted by size with biggest first.
-    for (HRegion region : this.onlineRegions.values()) {
-      sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
-    }
-    return sortedRegions;
-  }
+        }
+        throw e;
+      }
+      requestCount.incrementAndGet();
+
+      try {
+        int ttl = 0;
+        HRegion region = null;
+        RegionScanner scanner = null;
+        boolean moreResults = true;
+        boolean closeScanner = false;
+        ScanResponse.Builder builder = ScanResponse.newBuilder();
+        if (request.hasCloseScanner()) {
+          closeScanner = request.getCloseScanner();
+        }
+        int rows = 1;
+        if (request.hasNumberOfRows()) {
+          rows = request.getNumberOfRows();
+        }
+        if (request.hasScannerId()) {
+          scanner = scanners.get(scannerName);
+          if (scanner == null) {
+            throw new UnknownScannerException(
+              "Name: " + scannerName + ", already closed?");
+          }
+          region = getRegion(scanner.getRegionInfo().getRegionName());
+        } else {
+          region = getRegion(request.getRegion());
+          ClientProtos.Scan protoScan = request.getScan();
+          Scan scan = ProtobufUtil.toScan(protoScan);
+          region.prepareScanner(scan);
+          if (region.getCoprocessorHost() != null) {
+            scanner = region.getCoprocessorHost().preScannerOpen(scan);
+          }
+          if (scanner == null) {
+            scanner = region.getScanner(scan);
+          }
+          if (region.getCoprocessorHost() != null) {
+            scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+          }
+          scannerId = addScanner(scanner);
+          scannerName = String.valueOf(scannerId);
+          ttl = leases.leasePeriod;
+        }
 
-  /** @return the request count */
-  public AtomicInteger getRequestCount() {
-    return this.requestCount;
-  }
+        if (rows > 0) {
+          try {
+            // Remove lease while it's being processed in server; protects against case
+            // where processing of request takes > lease expiration time.
+            lease = leases.removeLease(scannerName);
+            List<Result> results = new ArrayList<Result>(rows);
+            long currentScanResultSize = 0;
+
+            boolean done = false;
+            // Call coprocessor. Get region info from scanner.
+            if (region != null && region.getCoprocessorHost() != null) {
+              Boolean bypass = region.getCoprocessorHost().preScannerNext(
+                scanner, results, rows);
+              if (!results.isEmpty()) {
+                for (Result r : results) {
+                  for (KeyValue kv : r.raw()) {
+                    currentScanResultSize += kv.heapSize();
+                  }
+                }
+              }
+              if (bypass != null && bypass.booleanValue()) {
+                done = true;
+              }
+            }
 
-  /**
-   * @return time stamp in millis of when this region server was started
-   */
-  public long getStartcode() {
-    return this.startcode;
-  }
+            if (!done) {
+              long maxResultSize = scanner.getMaxResultSize();
+              if (maxResultSize <= 0) {
+                maxResultSize = maxScannerResultSize;
+              }
+              List<KeyValue> values = new ArrayList<KeyValue>();
+              for (int i = 0; i < rows
+                  && currentScanResultSize < maxResultSize; i++) {
+                // Collect values to be returned here
+                boolean moreRows = scanner.next(values, SchemaMetrics.METRIC_NEXTSIZE);
+                if (!values.isEmpty()) {
+                  for (KeyValue kv : values) {
+                    currentScanResultSize += kv.heapSize();
+                  }
+                  results.add(new Result(values));
+                }
+                if (!moreRows) {
+                  break;
+                }
+                values.clear();
+              }
 
-  /** @return reference to FlushRequester */
-  public FlushRequester getFlushRequester() {
-    return this.cacheFlusher;
+              // coprocessor postNext hook
+              if (region != null && region.getCoprocessorHost() != null) {
+                region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+              }
+            }
+
+            // If the scanner's filter - if any - is done with the scan
+            // and wants to tell the client to stop the scan. This is done by passing
+            // a null result, and setting moreResults to false.
+            if (scanner.isFilterDone() && results.isEmpty()) {
+              moreResults = false;
+              results = null;
+            } else {
+              for (Result result: results) {
+                if (result != null) {
+                  builder.addResult(ProtobufUtil.toResult(result));
+                }
+              }
+            }
+          } finally {
+            // We're done. On way out re-add the above removed lease.
+            // Adding resets expiration time on lease.
+            if (scanners.containsKey(scannerName)) {
+              if (lease != null) leases.addLease(lease);
+              ttl = leases.leasePeriod;
+            }
+          }
+        }
+
+        if (!moreResults || closeScanner) {
+          ttl = 0;
+          moreResults = false;
+          if (region != null && region.getCoprocessorHost() != null) {
+            if (region.getCoprocessorHost().preScannerClose(scanner)) {
+              return builder.build(); // bypass
+            }
+          }
+          scanner = scanners.remove(scannerName);
+          if (scanner != null) {
+            scanner.close();
+            leases.cancelLease(scannerName);
+            if (region != null && region.getCoprocessorHost() != null) {
+              region.getCoprocessorHost().postScannerClose(scanner);
+            }
+          }
+        }
+
+        if (ttl > 0) {
+          builder.setTtl(ttl);
+        }
+        builder.setScannerId(scannerId);
+        builder.setMoreResults(moreResults);
+        return builder.build();
+      } catch (Throwable t) {
+        // On NotServingRegionException the scanner is only dropped from the
+        // map; it is not closed here.
+        if (scannerName != null &&
+            t instanceof NotServingRegionException) {
+          scanners.remove(scannerName);
+        }
+        throw convertThrowableToIOE(cleanup(t));
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
   }
 
   /**
-   * Get the top N most loaded regions this server is serving so we can tell the
-   * master which regions it can reallocate if we're overloaded. TODO: actually
-   * calculate which regions are most loaded. (Right now, we're just grabbing
-   * the first N regions being served regardless of load.)
+   * Lock a row in a table.
+   * <p>
+   * The request must name exactly one row. The lock obtained from the region
+   * is registered through <code>addRowLock</code>, and the id that call
+   * returns is sent back to the client.
+   *
+   * @param controller the RPC controller
+   * @param request the lock row request
+   * @return response carrying the id of the new row lock
+   * @throws ServiceException wrapping any IOException, including the
+   *   DoNotRetryIOException raised when the row count is not one
    */
-  protected HRegionInfo[] getMostLoadedRegions() {
-    ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
-    for (HRegion r : onlineRegions.values()) {
-      if (!r.isAvailable()) {
-        continue;
+  @Override
+  public LockRowResponse lockRow(final RpcController controller,
+      final LockRowRequest request) throws ServiceException {
+    try {
+      if (request.getRowCount() != 1) {
+        throw new DoNotRetryIOException(
+          "lockRow supports only one row now, not " + request.getRowCount() + " rows");
       }
-      if (regions.size() < numRegionsToReport) {
-        regions.add(r.getRegionInfo());
-      } else {
-        break;
+      requestCount.incrementAndGet();
+      HRegion region = getRegion(request.getRegion());
+      byte[] row = request.getRow(0).toByteArray();
+      try {
+        Integer r = region.obtainRowLock(row);
+        // NOTE(review): if addRowLock throws, the lock obtained above is not
+        // released in this method - confirm whether that is handled elsewhere.
+        long lockId = addRowLock(r, region);
+        LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+        LockRowResponse.Builder builder = LockRowResponse.newBuilder();
+        builder.setLockId(lockId);
+        return builder.build();
+      } catch (Throwable t) {
+        throw convertThrowableToIOE(cleanup(t,
+          "Error obtaining row lock (fsOk: " + this.fsOk + ")"));
       }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
     }
-    return regions.toArray(new HRegionInfo[regions.size()]);
   }
 
+  /**
+   * Unlock a locked row in a table.
+   *
+   * @param controller the RPC controller
+   * @param request the unlock row request
+   * @throws ServiceException
+   */
   @Override
   @QosPriority(priority=HIGH_QOS)
-  public ProtocolSignature getProtocolSignature(
-      String protocol, long version, int clientMethodsHashCode)
-  throws IOException {
-    if (protocol.equals(HRegionInterface.class.getName())) {
-      return new ProtocolSignature(HRegionInterface.VERSION, null);
-    } else if (protocol.equals(ClientProtocol.class.getName())) {

[... 1144 lines stripped ...]