Posted to commits@hbase.apache.org by jx...@apache.org on 2014/03/25 20:34:55 UTC

svn commit: r1581479 [6/9] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-it/src/test/java/org/apache/hadoop/hbase/ hbase-it/src/test/java/org/apache/hadoo...

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerCommandLine.java Tue Mar 25 19:34:52 2014
@@ -59,9 +59,8 @@ public class HRegionServerCommandLine ex
       } else {
         logProcessInfo(getConf());
         HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
-        Thread rsThread = HRegionServer.startRegionServer(hrs);
-
-        rsThread.join();
+        hrs.start();
+        hrs.join();
         if (hrs.isAborted()) {
           throw new RuntimeException("HRegionServer Aborted");
         }

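For context, the command line now drives the server's own HasThread lifecycle
directly, instead of going through the removed HRegionServer.startRegionServer()
helper that returned a Thread. A minimal sketch of the resulting pattern, for
illustration only (the wrapper class below is hypothetical; constructRegionServer,
start, join and isAborted are the calls visible in the hunk above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;

    // Hypothetical driver, not part of this commit.
    public class RunRegionServer {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HRegionServer hrs =
            HRegionServer.constructRegionServer(HRegionServer.class, conf);
        hrs.start();   // HasThread: runs the server on its own thread
        hrs.join();    // block until the server thread exits
        if (hrs.isAborted()) {
          throw new RuntimeException("HRegionServer Aborted");
        }
      }
    }
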
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Tue Mar 25 19:34:52 2014
@@ -148,19 +148,6 @@ class LogRoller extends HasThread implem
     }
   }
 
-  /**
-   * Called by region server to wake up this thread if it is sleeping.
-   * It is sleeping if rollLock is not held.
-   */
-  public void interruptIfNecessary() {
-    try {
-      rollLock.lock();
-      this.interrupt();
-    } finally {
-      rollLock.unlock();
-    }
-  }
-
   protected HLog getWAL() throws IOException {
     return this.services.getWAL(null);
   }

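The removed interruptIfNecessary() relied on rollLock to guarantee the roller is
only interrupted while parked: the lock is held for the duration of an actual log
roll, so acquiring it first proves the thread is idle. The general shape of that
idiom, as a standalone sketch (the Worker class is hypothetical; note the
canonical form takes the lock before entering the try block, unlike the removed
code, so a failed lock() cannot reach the unlock() in the finally):

    import java.util.concurrent.locks.ReentrantLock;

    class Worker extends Thread {
      private final ReentrantLock workLock = new ReentrantLock();
      // run() would hold workLock around each unit of work, then sleep/wait.

      /** Interrupt the worker only when it is idle (not holding workLock). */
      void interruptIfIdle() {
        workLock.lock();     // blocks while a unit of work is in flight
        try {
          this.interrupt();  // safe: the worker can only be sleeping here
        } finally {
          workLock.unlock();
        }
      }
    }
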
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java Tue Mar 25 19:34:52 2014
@@ -118,7 +118,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public String getZookeeperQuorum() {
-    ZooKeeperWatcher zk = regionServer.getZooKeeperWatcher();
+    ZooKeeperWatcher zk = regionServer.getZooKeeper();
     if (zk == null) {
       return "";
     }
@@ -127,7 +127,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public String getCoprocessors() {
-    String[] coprocessors = regionServer.getCoprocessors();
+    String[] coprocessors = regionServer.getRegionServerCoprocessors();
     if (coprocessors == null || coprocessors.length == 0) {
       return "";
     }
@@ -154,7 +154,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getTotalRequestCount() {
-    return regionServer.requestCount.get();
+    return regionServer.rpcServices.requestCount.get();
   }
 
   @Override

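The wrapper now reads through the renamed accessors (getZooKeeper(),
getRegionServerCoprocessors()) and the request counter that moved onto
rpcServices. The method bodies elided by the diff context share a null-guard
shape; a sketch of the quorum getter, where the final return line is an
assumption (ZooKeeperWatcher.getQuorum() is the likely source, but it is not
shown in this hunk):

    @Override
    public String getZookeeperQuorum() {
      ZooKeeperWatcher zk = regionServer.getZooKeeper();
      if (zk == null) {
        return "";            // server not yet wired to a ZK watcher
      }
      return zk.getQuorum();  // assumption: quorum string from the watcher
    }
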
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java?rev=1581479&r1=1581478&r2=1581479&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java Tue Mar 25 19:34:52 2014
@@ -45,10 +45,6 @@ public class RSDumpServlet extends State
     HRegionServer hrs = (HRegionServer)getServletContext().getAttribute(
         HRegionServer.REGIONSERVER);
     assert hrs != null : "No RS in context!";
-    
-    Configuration hrsconf = (Configuration)getServletContext().getAttribute(
-        HRegionServer.REGIONSERVER_CONF);
-    assert hrsconf != null : "No RS conf in context";
 
     response.setContentType("text/plain");
  
@@ -61,7 +57,7 @@ public class RSDumpServlet extends State
     OutputStream os = response.getOutputStream();
     PrintWriter out = new PrintWriter(os);
     
-    out.println("Master status for " + hrs.getServerName()
+    out.println("RegionServer status for " + hrs.getServerName()
         + " as of " + new Date());
     
     out.println("\n\nVersion Info:");
@@ -94,18 +90,14 @@ public class RSDumpServlet extends State
     
     out.println("\n\nRS Queue:");
     out.println(LINE);
-    if(isShowQueueDump(hrsconf)) {
+    if (isShowQueueDump(conf)) {
       dumpQueue(hrs, out);
     } 
     
     out.flush();
   }
-  
-  private boolean isShowQueueDump(Configuration conf){
-    return conf.getBoolean("hbase.regionserver.servlet.show.queuedump", true);
-  }
-    
-  private void dumpQueue(HRegionServer hrs, PrintWriter out)
+
+  public static void dumpQueue(HRegionServer hrs, PrintWriter out)
       throws IOException {
     // 1. Print out Compaction/Split Queue
     out.println("Compaction/Split Queue summary: " 
@@ -117,5 +109,4 @@ public class RSDumpServlet extends State
         + hrs.cacheFlusher.toString());
     out.println(hrs.cacheFlusher.dumpQueue());
   }
-  
 }

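Making dumpQueue(HRegionServer, PrintWriter) public static lets other dump or
debug surfaces reuse it without instantiating RSDumpServlet. A hypothetical
caller (the helper class below is illustrative, not part of this commit):

    import java.io.IOException;
    import java.io.PrintWriter;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.regionserver.RSDumpServlet;

    final class QueueDumpHelper {
      static void dumpInto(HRegionServer hrs, PrintWriter out) throws IOException {
        out.println("RS Queue:");
        RSDumpServlet.dumpQueue(hrs, out);  // compaction/split + flush queues
        out.flush();
      }
    }
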
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java?rev=1581479&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java Tue Mar 25 19:34:52 2014
@@ -0,0 +1,1985 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.exceptions.OperationConflictException;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Counter;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.net.DNS;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Implements the regionserver RPC services.
+ */
+@InterfaceAudience.Private
+public class RSRpcServices implements HBaseRPCErrorHandler,
+    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction {
+  protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
+
+  /** RPC scheduler to use for the region server. */
+  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
+    "hbase.region.server.rpc.scheduler.factory.class";
+
+  // Request counter. (Includes requests that are not serviced by regions.)
+  final Counter requestCount = new Counter();
+  // Server to handle client requests.
+  final RpcServerInterface rpcServer;
+  final InetSocketAddress isa;
+
+  private final HRegionServer regionServer;
+  private final long maxScannerResultSize;
+
+  // The reference to the priority extraction function
+  private final PriorityFunction priority;
+
+  private final AtomicLong scannerIdGen = new AtomicLong(0L);
+  private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
+    new ConcurrentHashMap<String, RegionScannerHolder>();
+
+  /**
+   * The lease timeout period for client scanners (milliseconds).
+   */
+  private final int scannerLeaseTimeoutPeriod;
+
+  /**
+   * Holder class which holds the RegionScanner and nextCallSeq together.
+   */
+  private static class RegionScannerHolder {
+    private RegionScanner s;
+    private long nextCallSeq = 0L;
+    private HRegion r;
+
+    public RegionScannerHolder(RegionScanner s, HRegion r) {
+      this.s = s;
+      this.r = r;
+    }
+  }
+
+  /**
+   * Instantiated as a scanner lease. If the lease times out, the scanner is
+   * closed.
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+
+    @Override
+    public void leaseExpired() {
+      RegionScannerHolder rsh = scanners.remove(this.scannerName);
+      if (rsh != null) {
+        RegionScanner s = rsh.s;
+        LOG.info("Scanner " + this.scannerName + " lease expired on region "
+          + s.getRegionInfo().getRegionNameAsString());
+        try {
+          HRegion region = regionServer.getRegion(s.getRegionInfo().getRegionName());
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().preScannerClose(s);
+          }
+
+          s.close();
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerClose(s);
+          }
+        } catch (IOException e) {
+          LOG.error("Closing scanner for "
+            + s.getRegionInfo().getRegionNameAsString(), e);
+        }
+      } else {
+        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
+          " scanner found, hence no chance to close that related scanner!");
+      }
+    }
+  }
+
+  private static ResultOrException getResultOrException(
+      final ClientProtos.Result r, final int index) {
+    return getResultOrException(ResponseConverter.buildActionResult(r), index);
+  }
+
+  private static ResultOrException getResultOrException(final Exception e, final int index) {
+    return getResultOrException(ResponseConverter.buildActionResult(e), index);
+  }
+
+  private static ResultOrException getResultOrException(
+      final ResultOrException.Builder builder, final int index) {
+    return builder.setIndex(index).build();
+  }
+
+  /**
+   * Starts the nonce operation for a mutation, if needed.
+   * @param mutation Mutation.
+   * @param nonceGroup Nonce group from the request.
+   * @return Nonce used (can be NO_NONCE).
+   */
+  private long startNonceOperation(final MutationProto mutation, long nonceGroup)
+      throws IOException, OperationConflictException {
+    if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
+    boolean canProceed = false;
+    try {
+      canProceed = regionServer.nonceManager.startOperation(
+        nonceGroup, mutation.getNonce(), regionServer);
+    } catch (InterruptedException ex) {
+      throw new InterruptedIOException("Nonce start operation interrupted");
+    }
+    if (!canProceed) {
+      // TODO: instead, we could convert append/increment to get w/mvcc
+      String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
+        + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
+        + "] may have already completed";
+      throw new OperationConflictException(message);
+    }
+    return mutation.getNonce();
+  }
+
+  /**
+   * Ends nonce operation for a mutation, if needed.
+   * @param mutation Mutation.
+   * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
+   * @param success Whether the operation for this nonce has succeeded.
+   */
+  private void endNonceOperation(final MutationProto mutation,
+      long nonceGroup, boolean success) {
+    if (regionServer.nonceManager != null && mutation.hasNonce()) {
+      regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
+    }
+  }
+
+  /**
+   * @return True if current call supports cellblocks
+   */
+  private boolean isClientCellBlockSupport() {
+    RpcCallContext context = RpcServer.getCurrentCall();
+    return context != null && context.isClientCellBlockSupport();
+  }
+
+  private void addResult(final MutateResponse.Builder builder,
+      final Result result, final PayloadCarryingRpcController rpcc) {
+    if (result == null) return;
+    if (isClientCellBlockSupport()) {
+      builder.setResult(ProtobufUtil.toResultNoData(result));
+      rpcc.setCellScanner(result.cellScanner());
+    } else {
+      ClientProtos.Result pbr = ProtobufUtil.toResult(result);
+      builder.setResult(pbr);
+    }
+  }
+
+  private void addResults(final ScanResponse.Builder builder, final List<Result> results,
+      final RpcController controller) {
+    if (results == null || results.isEmpty()) return;
+    if (isClientCellBlockSupport()) {
+      for (Result res : results) {
+        builder.addCellsPerResult(res.size());
+      }
+      ((PayloadCarryingRpcController)controller).
+        setCellScanner(CellUtil.createCellScanner(results));
+    } else {
+      for (Result res: results) {
+        ClientProtos.Result pbr = ProtobufUtil.toResult(res);
+        builder.addResults(pbr);
+      }
+    }
+  }
+
+  /**
+   * Mutate a list of rows atomically.
+   *
+   * @param region
+   * @param actions
+   * @param cellScanner if non-null, the mutation data -- the Cell content.
+   * @throws IOException
+   */
+  private void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
+      final CellScanner cellScanner) throws IOException {
+    if (!region.getRegionInfo().isMetaTable()) {
+      regionServer.cacheFlusher.reclaimMemStoreMemory();
+    }
+    RowMutations rm = null;
+    for (ClientProtos.Action action: actions) {
+      if (action.hasGet()) {
+        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
+          action.getGet());
+      }
+      MutationType type = action.getMutation().getMutateType();
+      if (rm == null) {
+        rm = new RowMutations(action.getMutation().getRow().toByteArray());
+      }
+      switch (type) {
+      case PUT:
+        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+        break;
+      case DELETE:
+        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+        break;
+      default:
+          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+      }
+    }
+    region.mutateRow(rm);
+  }
+
+  /**
+   * Execute an append mutation.
+   *
+   * @param region
+   * @param m
+   * @param cellScanner
+   * @return result to return to client if default operation should be
+   * bypassed as indicated by RegionObserver, null otherwise
+   * @throws IOException
+   */
+  private Result append(final HRegion region, final MutationProto m,
+      final CellScanner cellScanner, long nonceGroup) throws IOException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    Append append = ProtobufUtil.toAppend(m, cellScanner);
+    Result r = null;
+    if (region.getCoprocessorHost() != null) {
+      r = region.getCoprocessorHost().preAppend(append);
+    }
+    if (r == null) {
+      long nonce = startNonceOperation(m, nonceGroup);
+      boolean success = false;
+      try {
+        r = region.append(append, nonceGroup, nonce);
+        success = true;
+      } finally {
+        endNonceOperation(m, nonceGroup, success);
+      }
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postAppend(append, r);
+      }
+    }
+    regionServer.metricsRegionServer.updateAppend(
+      EnvironmentEdgeManager.currentTimeMillis() - before);
+    return r;
+  }
+
+  /**
+   * Execute an increment mutation.
+   *
+   * @param region
+   * @param mutation
+   * @return the Result
+   * @throws IOException
+   */
+  private Result increment(final HRegion region, final MutationProto mutation,
+      final CellScanner cells, long nonceGroup) throws IOException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+    Result r = null;
+    if (region.getCoprocessorHost() != null) {
+      r = region.getCoprocessorHost().preIncrement(increment);
+    }
+    if (r == null) {
+      long nonce = startNonceOperation(mutation, nonceGroup);
+      boolean success = false;
+      try {
+        r = region.increment(increment, nonceGroup, nonce);
+        success = true;
+      } finally {
+        endNonceOperation(mutation, nonceGroup, success);
+      }
+      if (region.getCoprocessorHost() != null) {
+        r = region.getCoprocessorHost().postIncrement(increment, r);
+      }
+    }
+    regionServer.metricsRegionServer.updateIncrement(
+      EnvironmentEdgeManager.currentTimeMillis() - before);
+    return r;
+  }
+
+  /**
+   * Run through the mutations in the given RegionAction and, per Mutation, do the work; when
+   * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
+   * @param region
+   * @param actions
+   * @param cellScanner
+   * @param builder
+   * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
+   * method returns as a 'result'.
+   * @return the <code>cellsToReturn</code> list, possibly allocated in this method
+   */
+  private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
+      final RegionAction actions, final CellScanner cellScanner,
+      final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
+    // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
+    // one at a time, we instead pass them in batch.  Be aware that the corresponding
+    // ResultOrException instance that matches each Put or Delete is then added down in the
+    // doBatchOp call.  We should stay aligned even though the Puts and Deletes are deferred/batched.
+    List<ClientProtos.Action> mutations = null;
+    for (ClientProtos.Action action: actions.getActionList()) {
+      ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
+      try {
+        Result r = null;
+        if (action.hasGet()) {
+          Get get = ProtobufUtil.toGet(action.getGet());
+          r = region.get(get);
+        } else if (action.hasServiceCall()) {
+          resultOrExceptionBuilder = ResultOrException.newBuilder();
+          try {
+            Message result = execServiceOnRegion(region, action.getServiceCall());
+            ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
+                ClientProtos.CoprocessorServiceResult.newBuilder();
+            resultOrExceptionBuilder.setServiceResult(
+                serviceResultBuilder.setValue(
+                  serviceResultBuilder.getValueBuilder()
+                    .setName(result.getClass().getName())
+                    .setValue(result.toByteString())));
+          } catch (IOException ioe) {
+            resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
+          }
+        } else if (action.hasMutation()) {
+          MutationType type = action.getMutation().getMutateType();
+          if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
+              !mutations.isEmpty()) {
+            // Flush out any Puts or Deletes already collected.
+            doBatchOp(builder, region, mutations, cellScanner);
+            mutations.clear();
+          }
+          switch (type) {
+          case APPEND:
+            r = append(region, action.getMutation(), cellScanner, nonceGroup);
+            break;
+          case INCREMENT:
+            r = increment(region, action.getMutation(), cellScanner,  nonceGroup);
+            break;
+          case PUT:
+          case DELETE:
+            // Collect the individual mutations and apply in a batch
+            if (mutations == null) {
+              mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
+            }
+            mutations.add(action);
+            break;
+          default:
+            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+          }
+        } else {
+          throw new HBaseIOException("Unexpected Action type");
+        }
+        if (r != null) {
+          ClientProtos.Result pbResult = null;
+          if (isClientCellBlockSupport()) {
+            pbResult = ProtobufUtil.toResultNoData(r);
+            //  Hard to guess the size here.  Just make a rough guess.
+            if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
+            cellsToReturn.add(r);
+          } else {
+            pbResult = ProtobufUtil.toResult(r);
+          }
+          resultOrExceptionBuilder =
+            ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
+        }
+        // Could get to here and there was no result and no exception.  Presumes we added
+        // a Put or Delete to the collecting Mutations List for adding later.  In this
+        // case the corresponding ResultOrException instance for the Put or Delete will be added
+        // down in the doBatchOp method call rather than up here.
+      } catch (IOException ie) {
+        resultOrExceptionBuilder = ResultOrException.newBuilder().
+          setException(ResponseConverter.buildException(ie));
+      }
+      if (resultOrExceptionBuilder != null) {
+        // Propagate index.
+        resultOrExceptionBuilder.setIndex(action.getIndex());
+        builder.addResultOrException(resultOrExceptionBuilder.build());
+      }
+    }
+    // Finish up any outstanding mutations
+    if (mutations != null && !mutations.isEmpty()) {
+      doBatchOp(builder, region, mutations, cellScanner);
+    }
+    return cellsToReturn;
+  }
+
+  /**
+   * Execute a list of Put/Delete mutations.
+   *
+   * @param builder
+   * @param region
+   * @param mutations
+   */
+  private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
+      final List<ClientProtos.Action> mutations, final CellScanner cells) {
+    Mutation[] mArray = new Mutation[mutations.size()];
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    boolean batchContainsPuts = false, batchContainsDelete = false;
+    try {
+      int i = 0;
+      for (ClientProtos.Action action: mutations) {
+        MutationProto m = action.getMutation();
+        Mutation mutation;
+        if (m.getMutateType() == MutationType.PUT) {
+          mutation = ProtobufUtil.toPut(m, cells);
+          batchContainsPuts = true;
+        } else {
+          mutation = ProtobufUtil.toDelete(m, cells);
+          batchContainsDelete = true;
+        }
+        mArray[i++] = mutation;
+      }
+
+      requestCount.add(mutations.size());
+      if (!region.getRegionInfo().isMetaTable()) {
+        regionServer.cacheFlusher.reclaimMemStoreMemory();
+      }
+
+      OperationStatus codes[] = region.batchMutate(mArray);
+      for (i = 0; i < codes.length; i++) {
+        int index = mutations.get(i).getIndex();
+        Exception e = null;
+        switch (codes[i].getOperationStatusCode()) {
+          case BAD_FAMILY:
+            e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
+            builder.addResultOrException(getResultOrException(e, index));
+            break;
+
+          case SANITY_CHECK_FAILURE:
+            e = new FailedSanityCheckException(codes[i].getExceptionMsg());
+            builder.addResultOrException(getResultOrException(e, index));
+            break;
+
+          default:
+            e = new DoNotRetryIOException(codes[i].getExceptionMsg());
+            builder.addResultOrException(getResultOrException(e, index));
+            break;
+
+          case SUCCESS:
+            builder.addResultOrException(getResultOrException(
+              ClientProtos.Result.getDefaultInstance(), index));
+            break;
+        }
+      }
+    } catch (IOException ie) {
+      for (int i = 0; i < mutations.size(); i++) {
+        builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
+      }
+    }
+    long after = EnvironmentEdgeManager.currentTimeMillis();
+    if (batchContainsPuts) {
+      regionServer.metricsRegionServer.updatePut(after - before);
+    }
+    if (batchContainsDelete) {
+      regionServer.metricsRegionServer.updateDelete(after - before);
+    }
+  }
+
+  /**
+   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
+   * constructing MultiResponse to save a possible loop if the caller doesn't need MultiResponse.
+   * @param region
+   * @param mutations
+   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
+   *         exceptionMessage if any
+   * @throws IOException
+   */
+  private OperationStatus [] doReplayBatchOp(final HRegion region,
+      final List<HLogSplitter.MutationReplay> mutations) throws IOException {
+    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
+
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    boolean batchContainsPuts = false, batchContainsDelete = false;
+    try {
+      int i = 0;
+      for (HLogSplitter.MutationReplay m : mutations) {
+        if (m.type == MutationType.PUT) {
+          batchContainsPuts = true;
+        } else {
+          batchContainsDelete = true;
+        }
+        mArray[i++] = m;
+      }
+      requestCount.add(mutations.size());
+      if (!region.getRegionInfo().isMetaTable()) {
+        regionServer.cacheFlusher.reclaimMemStoreMemory();
+      }
+      return region.batchReplay(mArray);
+    } finally {
+      long after = EnvironmentEdgeManager.currentTimeMillis();
+      if (batchContainsPuts) {
+        regionServer.metricsRegionServer.updatePut(after - before);
+      }
+      if (batchContainsDelete) {
+        regionServer.metricsRegionServer.updateDelete(after - before);
+      }
+    }
+  }
+
+  private void closeAllScanners() {
+    // Close any outstanding scanners. Means they'll get an UnknownScanner
+    // exception next time they come in.
+    for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
+      try {
+        e.getValue().s.close();
+      } catch (IOException ioe) {
+        LOG.warn("Closing scanner " + e.getKey(), ioe);
+      }
+    }
+  }
+
+  public RSRpcServices(HRegionServer rs) throws IOException {
+    RpcSchedulerFactory rpcSchedulerFactory;
+    try {
+      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
+          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+          SimpleRpcSchedulerFactory.class);
+      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(e);
+    }
+    // Server to handle client requests.
+    String hostname = rs.conf.get("hbase.regionserver.ipc.address",
+      Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+        rs.conf.get("hbase.regionserver.dns.interface", "default"),
+        rs.conf.get("hbase.regionserver.dns.nameserver", "default"))));
+    int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
+      HConstants.DEFAULT_REGIONSERVER_PORT);
+    // Creation of a HSA will force a resolve.
+    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    priority = new AnnotationReadingPriorityFunction(this);
+    String name = rs.getProcessName() + "/" + initialIsa.toString();
+    // Set how many times to retry talking to another server over HConnection.
+    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
+    rpcServer = new RpcServer(rs, name, getServices(),
+      initialIsa, // BindAddress is IP we got for this server.
+      rs.conf,
+      rpcSchedulerFactory.create(rs.conf, this));
+    rpcServer.start();
+
+    scannerLeaseTimeoutPeriod = rs.conf.getInt(
+      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+    maxScannerResultSize = rs.conf.getLong(
+      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+
+    // Set our address.
+    isa = rpcServer.getListenerAddress();
+    rpcServer.setErrorHandler(this);
+    regionServer = rs;
+    rs.setName(name);
+  }
+
+  RegionScanner getScanner(long scannerId) {
+    String scannerIdString = Long.toString(scannerId);
+    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
+    if (scannerHolder != null) {
+      return scannerHolder.s;
+    }
+    return null;
+  }
+
+  long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
+    long scannerId = this.scannerIdGen.incrementAndGet();
+    String scannerName = String.valueOf(scannerId);
+
+    RegionScannerHolder existing =
+      scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
+    assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
+
+    regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
+        new ScannerListener(scannerName));
+    return scannerId;
+  }
+
+  /**
+   * Find the HRegion based on a region specifier
+   *
+   * @param regionSpecifier the region specifier
+   * @return the corresponding region
+   * @throws IOException if the specifier is not null,
+   *    but the region could not be found
+   */
+  HRegion getRegion(
+      final RegionSpecifier regionSpecifier) throws IOException {
+    return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
+        ProtobufUtil.getRegionEncodedName(regionSpecifier));
+  }
+
+  PriorityFunction getPriority() {
+    return priority;
+  }
+
+  void stop() {
+    closeAllScanners();
+    rpcServer.stop();
+  }
+
+  /**
+   * Called to verify that this server is up and running.
+   *
+   * @throws IOException
+   */
+  protected void checkOpen() throws IOException {
+    if (regionServer.isStopped() || regionServer.isAborted()) {
+      throw new RegionServerStoppedException("Server " + regionServer.serverName
+        + " not running" + (regionServer.isAborted() ? ", aborting" : ""));
+    }
+    if (!regionServer.fsOk) {
+      throw new RegionServerStoppedException("File system not available");
+    }
+    if (!regionServer.isOnline()) {
+      throw new ServerNotRunningYetException("Server is not running yet");
+    }
+  }
+
+  /**
+   * @return list of blocking services and their security info classes that this server supports
+   */
+  protected List<BlockingServiceAndInterface> getServices() {
+    List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
+    bssi.add(new BlockingServiceAndInterface(
+      ClientService.newReflectiveBlockingService(this),
+      ClientService.BlockingInterface.class));
+    bssi.add(new BlockingServiceAndInterface(
+      AdminService.newReflectiveBlockingService(this),
+      AdminService.BlockingInterface.class));
+    return bssi;
+  }
+
+  @Retention(RetentionPolicy.RUNTIME)
+  protected @interface QosPriority {
+    int priority() default 0;
+  }
+
+  public InetSocketAddress getSocketAddress() {
+    return isa;
+  }
+
+  @Override
+  public int getPriority(RequestHeader header, Message param) {
+    return priority.getPriority(header, param);
+  }
+
+  /*
+   * Check whether the throwable is an OOME and, if so, abort immediately to avoid creating more objects.
+   *
+   * @param e
+   *
+   * @return True if we OOME'd and are aborting.
+   */
+  @Override
+  public boolean checkOOME(final Throwable e) {
+    boolean stop = false;
+    try {
+      if (e instanceof OutOfMemoryError
+          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+          || (e.getMessage() != null && e.getMessage().contains(
+              "java.lang.OutOfMemoryError"))) {
+        stop = true;
+        LOG.fatal("Run out of memory; " + getClass().getSimpleName()
+          + " will abort itself immediately", e);
+      }
+    } finally {
+      if (stop) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return stop;
+  }
+
+  /**
+   * Close a region on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public CloseRegionResponse closeRegion(final RpcController controller,
+      final CloseRegionRequest request) throws ServiceException {
+    int versionOfClosingNode = -1;
+    if (request.hasVersionOfClosingNode()) {
+      versionOfClosingNode = request.getVersionOfClosingNode();
+    }
+    boolean zk = request.getTransitionInZK();
+    final ServerName sn = (request.hasDestinationServer() ?
+      ProtobufUtil.toServerName(request.getDestinationServer()) : null);
+
+    try {
+      checkOpen();
+      if (request.hasServerStartCode()) {
+        // check that we are the same server that this RPC is intended for.
+        long serverStartCode = request.getServerStartCode();
+        if (regionServer.serverName.getStartcode() !=  serverStartCode) {
+          throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
+              "different server with startCode: " + serverStartCode + ", this server is: "
+              + regionServer.serverName));
+        }
+      }
+      final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
+
+      // Can be null if we're calling close on a region that's not online
+      final HRegion region = regionServer.getFromOnlineRegions(encodedRegionName);
+      if ((region != null) && (region.getCoprocessorHost() != null)) {
+        region.getCoprocessorHost().preClose(false);
+      }
+
+      requestCount.increment();
+      LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no")
+        + ", znode version=" + versionOfClosingNode + ", on " + sn);
+
+      boolean closed = regionServer.closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
+      CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Compact a region on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public CompactRegionResponse compactRegion(final RpcController controller,
+      final CompactRegionRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      region.startRegionOperation(Operation.COMPACT_REGION);
+      LOG.info("Compacting " + region.getRegionNameAsString());
+      boolean major = false;
+      byte [] family = null;
+      Store store = null;
+      if (request.hasFamily()) {
+        family = request.getFamily().toByteArray();
+        store = region.getStore(family);
+        if (store == null) {
+          throw new ServiceException(new IOException("column family " + Bytes.toString(family)
+            + " does not exist in region " + region.getRegionNameAsString()));
+        }
+      }
+      if (request.hasMajor()) {
+        major = request.getMajor();
+      }
+      if (major) {
+        if (family != null) {
+          store.triggerMajorCompaction();
+        } else {
+          region.triggerMajorCompaction();
+        }
+      }
+
+      String familyLogMsg = (family != null) ? " for column family: " + Bytes.toString(family) : "";
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("User-triggered compaction requested for region "
+          + region.getRegionNameAsString() + familyLogMsg);
+      }
+      String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
+      if (family != null) {
+        regionServer.compactSplitThread.requestCompaction(region, store, log,
+          Store.PRIORITY_USER, null);
+      } else {
+        regionServer.compactSplitThread.requestCompaction(region, log,
+          Store.PRIORITY_USER, null);
+      }
+      return CompactRegionResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Flush a region on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public FlushRegionResponse flushRegion(final RpcController controller,
+      final FlushRegionRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      LOG.info("Flushing " + region.getRegionNameAsString());
+      boolean shouldFlush = true;
+      if (request.hasIfOlderThanTs()) {
+        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+      }
+      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
+      if (shouldFlush) {
+        boolean result = region.flushcache();
+        if (result) {
+          regionServer.compactSplitThread.requestSystemCompaction(region,
+            "Compaction through user triggered flush");
+        }
+        builder.setFlushed(result);
+      }
+      builder.setLastFlushTime(region.getLastFlushTime());
+      return builder.build();
+    } catch (DroppedSnapshotException ex) {
+      // Cache flush can fail in a few places. If it fails in a critical
+      // section, we get a DroppedSnapshotException and a replay of hlog
+      // is required. Currently the only way to do this is a restart of
+      // the server.
+      regionServer.abort("Replay of HLog required. Forcing server shutdown", ex);
+      throw new ServiceException(ex);
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
+      final GetOnlineRegionRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
+      List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
+      for (HRegion region: onlineRegions.values()) {
+        list.add(region.getRegionInfo());
+      }
+      Collections.sort(list);
+      return ResponseConverter.buildGetOnlineRegionResponse(list);
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public GetRegionInfoResponse getRegionInfo(final RpcController controller,
+      final GetRegionInfoRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      HRegionInfo info = region.getRegionInfo();
+      GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
+      builder.setRegionInfo(HRegionInfo.convert(info));
+      if (request.hasCompactionState() && request.getCompactionState()) {
+        builder.setCompactionState(region.getCompactionState());
+      }
+      builder.setIsRecovering(region.isRecovering());
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Get some information about the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @SuppressWarnings("deprecation")
+  public GetServerInfoResponse getServerInfo(final RpcController controller,
+      final GetServerInfoRequest request) throws ServiceException {
+    try {
+      checkOpen();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+    requestCount.increment();
+    int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
+    return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
+  }
+
+  @Override
+  public GetStoreFileResponse getStoreFile(final RpcController controller,
+      final GetStoreFileRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      HRegion region = getRegion(request.getRegion());
+      requestCount.increment();
+      Set<byte[]> columnFamilies;
+      if (request.getFamilyCount() == 0) {
+        columnFamilies = region.getStores().keySet();
+      } else {
+        columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
+        for (ByteString cf: request.getFamilyList()) {
+          columnFamilies.add(cf.toByteArray());
+        }
+      }
+      int nCF = columnFamilies.size();
+      List<String>  fileList = region.getStoreFileList(
+        columnFamilies.toArray(new byte[nCF][]));
+      GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
+      builder.addAllStoreFile(fileList);
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Merge regions on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @return merge regions response
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority = HConstants.HIGH_QOS)
+  public MergeRegionsResponse mergeRegions(final RpcController controller,
+      final MergeRegionsRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion regionA = getRegion(request.getRegionA());
+      HRegion regionB = getRegion(request.getRegionB());
+      boolean forcible = request.getForcible();
+      regionA.startRegionOperation(Operation.MERGE_REGION);
+      regionB.startRegionOperation(Operation.MERGE_REGION);
+      LOG.info("Receiving merging request for " + regionA + ", " + regionB
+          + ", forcible=" + forcible);
+      regionA.flushcache();
+      regionB.flushcache();
+      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
+      return MergeRegionsResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Asynchronously open a region or a set of regions on the region server.
+   *
+   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
+   *  before being called. As a consequence, this method should be called only from the master.
+   * <p>
+   * The possible states for the region are:<ul>
+   *  <li>region not opened: the region opening will start asynchronously.</li>
+   *  <li>a close is already in progress: this is considered as an error.</li>
+   *  <li>an open is already in progress: this new open request will be ignored. This is important
+   *  because the Master can do multiple requests if it crashes.</li>
+   *  <li>the region is already opened: this new open request will be ignored.</li>
+   *  </ul>
+   * </p>
+   * <p>
+   * Bulk assign: if there is more than one region to open, the request is treated as a bulk assign.
+   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
+   * errors are put in the response as FAILED_OPENING.
+   * </p>
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public OpenRegionResponse openRegion(final RpcController controller,
+      final OpenRegionRequest request) throws ServiceException {
+    try {
+      checkOpen();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+    requestCount.increment();
+    if (request.hasServerStartCode()) {
+      // check that we are the same server that this RPC is intended for.
+      long serverStartCode = request.getServerStartCode();
+      if (regionServer.serverName.getStartcode() !=  serverStartCode) {
+        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
+            "different server with startCode: " + serverStartCode + ", this server is: "
+            + regionServer.serverName));
+      }
+    }
+    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
+    final int regionCount = request.getOpenInfoCount();
+    final Map<TableName, HTableDescriptor> htds =
+        new HashMap<TableName, HTableDescriptor>(regionCount);
+    final boolean isBulkAssign = regionCount > 1;
+    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
+      final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
+
+      int versionOfOfflineNode = -1;
+      if (regionOpenInfo.hasVersionOfOfflineNode()) {
+        versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
+      }
+      HTableDescriptor htd;
+      try {
+        final HRegion onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
+        if (onlineRegion != null) {
+          //Check if the region can actually be opened.
+          if (onlineRegion.getCoprocessorHost() != null) {
+            onlineRegion.getCoprocessorHost().preOpen();
+          }
+          // See HBASE-5094. Cross check with hbase:meta whether this RS is still
+          // owning the region.
+          Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
+            regionServer.catalogTracker, region.getRegionName());
+          if (regionServer.serverName.equals(p.getSecond())) {
+            Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
+            // The regionsInTransitionInRS map has an entry for a region only if the region
+            // is in transition on this RS, so here closing can be null. If not null, it can
+            // be true or false. True means the region is opening on this RS, while false
+            // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
+            // not in transition any more, or still in transition to open).
+            if (!Boolean.FALSE.equals(closing)
+                && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
+              LOG.warn("Attempted open of " + region.getEncodedName()
+                + " but already online on this server");
+              builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
+              continue;
+            }
+          } else {
+            LOG.warn("The region " + region.getEncodedName() + " is online on this server"
+              + " but hbase:meta does not have this server - continue opening.");
+            regionServer.removeFromOnlineRegions(onlineRegion, null);
+          }
+        }
+        LOG.info("Open " + region.getRegionNameAsString());
+        htd = htds.get(region.getTable());
+        if (htd == null) {
+          htd = regionServer.tableDescriptors.get(region.getTable());
+          htds.put(region.getTable(), htd);
+        }
+
+        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
+          region.getEncodedNameAsBytes(), Boolean.TRUE);
+
+        if (Boolean.FALSE.equals(previous)) {
+          // There is a close in progress. We need to mark this open as failed in ZK.
+          OpenRegionHandler.
+            tryTransitionFromOfflineToFailedOpen(regionServer, region, versionOfOfflineNode);
+
+          throw new RegionAlreadyInTransitionException("Received OPEN for the region: "
+            + region.getRegionNameAsString() + ", which we are already trying to CLOSE");
+        }
+
+        if (Boolean.TRUE.equals(previous)) {
+          // An open is in progress. This is supported, but let's log this.
+          LOG.info("Receiving OPEN for the region:" +
+            region.getRegionNameAsString() + " , which we are already trying to OPEN"
+              + " - ignoring this new request for this region.");
+        }
+
+        // We are opening this region. If it moves back and forth for whatever reason, we don't
+        // want to keep returning the stale moved record while we are opening/if we close again.
+        regionServer.removeFromMovedRegions(region.getEncodedName());
+
+        if (previous == null) {
+          // check if the region to be opened is marked in recovering state in ZK
+          if (regionServer.distributedLogReplay
+              && SplitLogManager.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
+            region.getEncodedName())) {
+            regionServer.recoveringRegions.put(region.getEncodedName(), null);
+          }
+          // If there is no action in progress, we can submit a specific handler.
+          // Need to pass the expected version in the constructor.
+          if (region.isMetaRegion()) {
+            regionServer.service.submit(new OpenMetaHandler(
+              regionServer, regionServer, region, htd, versionOfOfflineNode));
+          } else {
+            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
+              regionOpenInfo.getFavoredNodesList());
+            regionServer.service.submit(new OpenRegionHandler(
+              regionServer, regionServer, region, htd, versionOfOfflineNode));
+          }
+        }
+
+        builder.addOpeningState(RegionOpeningState.OPENED);
+
+      } catch (KeeperException zooKeeperEx) {
+        LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
+        throw new ServiceException(zooKeeperEx);
+      } catch (IOException ie) {
+        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
+        if (isBulkAssign) {
+          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
+        } else {
+          throw new ServiceException(ie);
+        }
+      }
+    }
+    return builder.build();
+  }
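
Since a bulk assign reports per-region failures in-band rather than through a
ServiceException, a caller has to inspect each opening state in the response. A minimal
sketch of that handling, assuming a generated AdminService stub named admin and a
pre-built req (both names are illustrative, not part of this patch):

    OpenRegionResponse resp = admin.openRegion(controller, req);
    for (int i = 0; i < resp.getOpeningStateCount(); i++) {
      if (resp.getOpeningState(i) == RegionOpeningState.FAILED_OPENING) {
        // Bulk assign: the failure arrives as a state, not an exception.
        retryAssign(req.getOpenInfo(i).getRegion());  // hypothetical retry hook
      }
    }
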
+
+  /**
+   * Replay the given changes when distributedLogReplay is enabled, recovering WAL edits from a
+   * failed RS. The guarantee is that the given mutations will be durable on the receiving RS if
+   * this method returns without any exception.
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority = HConstants.REPLAY_QOS)
+  public ReplicateWALEntryResponse replay(final RpcController controller,
+      final ReplicateWALEntryRequest request) throws ServiceException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
+    try {
+      checkOpen();
+      List<WALEntry> entries = request.getEntryList();
+      if (entries == null || entries.isEmpty()) {
+        // empty input
+        return ReplicateWALEntryResponse.newBuilder().build();
+      }
+      HRegion region = regionServer.getRegionByEncodedName(
+        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
+      RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
+      List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+      List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
+      // when tags are enabled, we need to tag replay edits with the log sequence number
+      boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
+      for (WALEntry entry : entries) {
+        if (regionServer.nonceManager != null) {
+          long nonceGroup = entry.getKey().hasNonceGroup()
+            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
+          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
+          regionServer.nonceManager.reportOperationFromWal(
+            nonceGroup, nonce, entry.getKey().getWriteTime());
+        }
+        Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
+          new Pair<HLogKey, WALEdit>();
+        List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+          cells, walEntry, needAddReplayTag);
+        if (coprocessorHost != null) {
+          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
+          // KeyValue.
+          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
+            walEntry.getSecond())) {
+            // if the coprocessor bypasses this log entry, skip it
+            continue;
+          }
+          walEntries.add(walEntry);
+        }
+        mutations.addAll(edits);
+      }
+
+      if (!mutations.isEmpty()) {
+        OperationStatus[] result = doReplayBatchOp(region, mutations);
+        // check if it's a partial success
+        for (int i = 0; result != null && i < result.length; i++) {
+          if (result[i] != OperationStatus.SUCCESS) {
+            throw new IOException(result[i].getExceptionMsg());
+          }
+        }
+      }
+      if (coprocessorHost != null) {
+        for (Pair<HLogKey, WALEdit> wal : walEntries) {
+          coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
+            wal.getSecond());
+        }
+      }
+      return ReplicateWALEntryResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    } finally {
+      regionServer.metricsRegionServer.updateReplay(
+        EnvironmentEdgeManager.currentTimeMillis() - before);
+    }
+  }
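
The durability guarantee above is one-sided: edits are only known to be durable on the
receiving RS once replay() returns cleanly, so a sender must treat any exception as
"resend". A sketch under that assumption (admin and replayReq are illustrative names):

    boolean durable = false;
    while (!durable) {
      try {
        admin.replay(controller, replayReq);
        durable = true;  // every mutation in replayReq is now durable on the sink
      } catch (ServiceException e) {
        // not durable yet: back off (omitted here) and resend the same entries
      }
    }
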
+
+  /**
+   * Replicate WAL entries on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.REPLICATION_QOS)
+  public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
+      final ReplicateWALEntryRequest request) throws ServiceException {
+    try {
+      if (regionServer.replicationSinkHandler != null) {
+        checkOpen();
+        requestCount.increment();
+        regionServer.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
+          ((PayloadCarryingRpcController)controller).cellScanner());
+      }
+      return ReplicateWALEntryResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Roll the WAL writer of the region server.
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  public RollWALWriterResponse rollWALWriter(final RpcController controller,
+      final RollWALWriterRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HLog wal = regionServer.getWAL();
+      byte[][] regionsToFlush = wal.rollWriter(true);
+      RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
+      if (regionsToFlush != null) {
+        for (byte[] region: regionsToFlush) {
+          builder.addRegionToFlush(HBaseZeroCopyByteString.wrap(region));
+        }
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
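
Regions named in a RollWALWriterResponse still hold unflushed edits that pin the old WAL
files; flushing them lets those files be archived. An illustrative caller (admin is an
assumed AdminService stub, flushRegion a hypothetical helper):

    RollWALWriterResponse resp =
      admin.rollWALWriter(null, RollWALWriterRequest.newBuilder().build());
    for (ByteString regionName : resp.getRegionToFlushList()) {
      flushRegion(regionName.toByteArray());
    }
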
+
+  /**
+   * Split a region on the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  @QosPriority(priority=HConstants.HIGH_QOS)
+  public SplitRegionResponse splitRegion(final RpcController controller,
+      final SplitRegionRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      region.startRegionOperation(Operation.SPLIT_REGION);
+      LOG.info("Splitting " + region.getRegionNameAsString());
+      region.flushcache();
+      byte[] splitPoint = null;
+      if (request.hasSplitPoint()) {
+        splitPoint = request.getSplitPoint().toByteArray();
+      }
+      region.forceSplit(splitPoint);
+      regionServer.compactSplitThread.requestSplit(region, region.checkSplit());
+      return SplitRegionResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
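
A minimal sketch of requesting a split at an explicit key; the region specifier is built
the same way execService does further down. The admin stub, regionName, and splitKey
names are assumptions:

    SplitRegionRequest req = SplitRegionRequest.newBuilder()
      .setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME, regionName))
      .setSplitPoint(HBaseZeroCopyByteString.wrap(splitKey))  // omit to let the RS pick
      .build();
    admin.splitRegion(null, req);
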
+
+  /**
+   * Stop the region server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   * @throws ServiceException
+   */
+  @Override
+  public StopServerResponse stopServer(final RpcController controller,
+      final StopServerRequest request) throws ServiceException {
+    requestCount.increment();
+    String reason = request.getReason();
+    regionServer.stop(reason);
+    return StopServerResponse.newBuilder().build();
+  }
+
+  @Override
+  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+      UpdateFavoredNodesRequest request) throws ServiceException {
+    List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
+    UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
+    for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
+      HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
+      regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
+        regionUpdateInfo.getFavoredNodesList());
+    }
+    respBuilder.setResponse(openInfoList.size());
+    return respBuilder.build();
+  }
+
+  /**
+   * Atomically bulk load several HFiles into an open region
+   * @return true if successful, false if it failed but is recoverable (no action taken)
+   * @throws IOException if failed unrecoverably
+   */
+  @Override
+  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
+      final BulkLoadHFileRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
+      for (FamilyPath familyPath: request.getFamilyPathList()) {
+        familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
+          familyPath.getPath()));
+      }
+      boolean bypass = false;
+      if (region.getCoprocessorHost() != null) {
+        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
+      }
+      boolean loaded = false;
+      if (!bypass) {
+        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
+      }
+      if (region.getCoprocessorHost() != null) {
+        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
+      }
+      BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
+      builder.setLoaded(loaded);
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
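
A sketch of the request shape, pairing each HFile with its column family; the path,
family name, and client stub below are illustrative only. With assignSeqNum set, the
region stamps the loaded files with a fresh sequence id:

    BulkLoadHFileRequest req = BulkLoadHFileRequest.newBuilder()
      .setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME, regionName))
      .addFamilyPath(FamilyPath.newBuilder()
        .setFamily(HBaseZeroCopyByteString.wrap(Bytes.toBytes("cf")))
        .setPath("/staging/cf/hfile-00001"))
      .setAssignSeqNum(true)
      .build();
    boolean loaded = client.bulkLoadHFile(null, req).getLoaded();
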
+
+  @Override
+  public CoprocessorServiceResponse execService(final RpcController controller,
+      final CoprocessorServiceRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      Message result = execServiceOnRegion(region, request.getCall());
+      CoprocessorServiceResponse.Builder builder =
+        CoprocessorServiceResponse.newBuilder();
+      builder.setRegion(RequestConverter.buildRegionSpecifier(
+        RegionSpecifierType.REGION_NAME, region.getRegionName()));
+      builder.setValue(
+        builder.getValueBuilder().setName(result.getClass().getName())
+          .setValue(result.toByteString()));
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  private Message execServiceOnRegion(HRegion region,
+      final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
+    // ignore the passed in controller (from the serialized call)
+    ServerRpcController execController = new ServerRpcController();
+    Message result = region.execService(execController, serviceCall);
+    if (execController.getFailedOn() != null) {
+      throw execController.getFailedOn();
+    }
+    return result;
+  }
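
For reference, a sketch of the wire-level shape of the coprocessor call that execService
unpacks; the service and method names below are hypothetical, and the embedded request is
any protobuf message serialized to bytes:

    CoprocessorServiceCall call = CoprocessorServiceCall.newBuilder()
      .setRow(HBaseZeroCopyByteString.wrap(row))
      .setServiceName("my.custom.SumService")   // hypothetical endpoint
      .setMethodName("getSum")
      .setRequest(sumRequest.toByteString())    // 'sumRequest' is an assumed proto
      .build();
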
+
+  /**
+   * Get data from a table.
+   *
+   * @param controller the RPC controller
+   * @param request the get request
+   * @throws ServiceException
+   */
+  @Override
+  public GetResponse get(final RpcController controller,
+      final GetRequest request) throws ServiceException {
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+
+      GetResponse.Builder builder = GetResponse.newBuilder();
+      ClientProtos.Get get = request.getGet();
+      Boolean existence = null;
+      Result r = null;
+
+      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
+        if (get.getColumnCount() != 1) {
+          throw new DoNotRetryIOException(
+            "get ClosestRowBefore supports one and only one family now, not "
+              + get.getColumnCount() + " families");
+        }
+        byte[] row = get.getRow().toByteArray();
+        byte[] family = get.getColumn(0).getFamily().toByteArray();
+        r = region.getClosestRowBefore(row, family);
+      } else {
+        Get clientGet = ProtobufUtil.toGet(get);
+        if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
+          existence = region.getCoprocessorHost().preExists(clientGet);
+        }
+        if (existence == null) {
+          r = region.get(clientGet);
+          if (get.getExistenceOnly()) {
+            boolean exists = r.getExists();
+            if (region.getCoprocessorHost() != null) {
+              exists = region.getCoprocessorHost().postExists(clientGet, exists);
+            }
+            existence = exists;
+          }
+        }
+      }
+      if (existence != null) {
+        ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
+        builder.setResult(pbr);
+      } else if (r != null) {
+        ClientProtos.Result pbr = ProtobufUtil.toResult(r);
+        builder.setResult(pbr);
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    } finally {
+      regionServer.metricsRegionServer.updateGet(
+        EnvironmentEdgeManager.currentTimeMillis() - before);
+    }
+  }
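
The existence-only path above returns a Result carrying just the exists flag, so no cell
data crosses the wire. A client-side sketch (client, regionName, and row are assumptions):

    ClientProtos.Get pbGet = ProtobufUtil.toGet(new Get(row)).toBuilder()
      .setExistenceOnly(true).build();
    GetRequest req = GetRequest.newBuilder()
      .setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME, regionName))
      .setGet(pbGet)
      .build();
    boolean exists = client.get(null, req).getResult().getExists();
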
+
+  /**
+   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
+   *
+   * @param rpcc the RPC controller
+   * @param request the multi request
+   * @throws ServiceException
+   */
+  @Override
+  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
+  throws ServiceException {
+    try {
+      checkOpen();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+
+    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
+    // It is also the conduit via which we pass back data.
+    PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
+    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
+    if (controller != null) controller.setCellScanner(null);
+
+    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
+
+    // this will contain all the cells that we need to return. It's created later, if needed.
+    List<CellScannable> cellsToReturn = null;
+    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
+    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
+
+    for (RegionAction regionAction : request.getRegionActionList()) {
+      this.requestCount.add(regionAction.getActionCount());
+      HRegion region;
+      regionActionResultBuilder.clear();
+      try {
+        region = getRegion(regionAction.getRegion());
+      } catch (IOException e) {
+        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+        continue;  // For this region it's a failure.
+      }
+
+      if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+        // How does this call happen?  It may need some work to play well w/ the surroundings.
+        // Need to return an item per Action along w/ Action index.  TODO.
+        try {
+          mutateRows(region, regionAction.getActionList(), cellScanner);
+        } catch (IOException e) {
+          // As it's atomic, we may expect it's a global failure.
+          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
+        }
+      } else {
+        // doNonAtomicRegionMutation manages the exception internally
+        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
+            regionActionResultBuilder, cellsToReturn, nonceGroup);
+      }
+      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+    }
+    // Load the controller with the Cells to return.
+    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
+      controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
+    }
+    return responseBuilder.build();
+  }
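
A sketch of the back door mentioned at the top of multi(): bulk Cells ride in the rpc
controller, outside protobuf serialization, in both directions (client, multiRequest, and
cellsToSend are illustrative names):

    PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
    pcrc.setCellScanner(CellUtil.createCellScanner(cellsToSend));  // request cells
    MultiResponse response = client.multi(pcrc, multiRequest);
    CellScanner returnedCells = pcrc.cellScanner();  // response cells, same channel
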
+
+  /**
+   * Mutate data in a table.
+   *
+   * @param rpcc the RPC controller
+   * @param request the mutate request
+   * @throws ServiceException
+   */
+  @Override
+  public MutateResponse mutate(final RpcController rpcc,
+      final MutateRequest request) throws ServiceException {
+    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
+    // It is also the conduit via which we pass back data.
+    PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
+    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
+    // Clear the scanner so we are not holding on to a reference across the call.
+    if (controller != null) controller.setCellScanner(null);
+    try {
+      checkOpen();
+      requestCount.increment();
+      HRegion region = getRegion(request.getRegion());
+      MutateResponse.Builder builder = MutateResponse.newBuilder();
+      MutationProto mutation = request.getMutation();
+      if (!region.getRegionInfo().isMetaTable()) {
+        regionServer.cacheFlusher.reclaimMemStoreMemory();
+      }
+      long nonceGroup = request.hasNonceGroup()
+          ? request.getNonceGroup() : HConstants.NO_NONCE;
+      Result r = null;
+      Boolean processed = null;
+      MutationType type = mutation.getMutateType();
+      switch (type) {
+      case APPEND:
+        // TODO: this doesn't actually check anything.
+        r = append(region, mutation, cellScanner, nonceGroup);
+        break;
+      case INCREMENT:
+        // TODO: this doesn't actually check anything.
+        r = increment(region, mutation, cellScanner, nonceGroup);
+        break;
+      case PUT:
+        Put put = ProtobufUtil.toPut(mutation, cellScanner);
+        if (request.hasCondition()) {
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          ByteArrayComparable comparator =
+            ProtobufUtil.toComparator(condition.getComparator());
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndPut(
+              row, family, qualifier, compareOp, comparator, put);
+          }
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, put, true);
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndPut(row, family,
+                qualifier, compareOp, comparator, put, result);
+            }
+            processed = result;
+          }
+        } else {
+          region.put(put);
+          processed = Boolean.TRUE;
+        }
+        break;
+      case DELETE:
+        Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+        if (request.hasCondition()) {
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          ByteArrayComparable comparator =
+            ProtobufUtil.toComparator(condition.getComparator());
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndDelete(
+              row, family, qualifier, compareOp, comparator, delete);
+          }
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, delete, true);
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+                qualifier, compareOp, comparator, delete, result);
+            }
+            processed = result;
+          }
+        } else {
+          region.delete(delete);
+          processed = Boolean.TRUE;
+        }
+        break;
+      default:
+          throw new DoNotRetryIOException(
+            "Unsupported mutate type: " + type.name());
+      }
+      if (processed != null) builder.setProcessed(processed.booleanValue());
+      addResult(builder, r, controller);
+      return builder.build();
+    } catch (IOException ie) {
+      regionServer.checkFileSystem();
+      throw new ServiceException(ie);
+    }
+  }
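
For context, a sketch of the Condition that drives the checkAndPut branch above; row,
family, qualifier, expected, put, and regionName are assumptions, and CompareType mirrors
the client-side CompareOp:

    Condition condition = Condition.newBuilder()
      .setRow(HBaseZeroCopyByteString.wrap(row))
      .setFamily(HBaseZeroCopyByteString.wrap(family))
      .setQualifier(HBaseZeroCopyByteString.wrap(qualifier))
      .setCompareType(CompareType.EQUAL)
      .setComparator(ProtobufUtil.toComparator(new BinaryComparator(expected)))
      .build();
    MutateRequest req = MutateRequest.newBuilder()
      .setRegion(RequestConverter.buildRegionSpecifier(
        RegionSpecifierType.REGION_NAME, regionName))
      .setMutation(ProtobufUtil.toMutation(MutationType.PUT, put))
      .setCondition(condition)
      .build();
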
+
+  /**
+   * Scan data in a table.
+   *
+   * @param controller the RPC controller
+   * @param request the scan request
+   * @throws ServiceException
+   */
+  @Override
+  public ScanResponse scan(final RpcController controller, final ScanRequest request)
+  throws ServiceException {
+    Leases.Lease lease = null;
+    String scannerName = null;
+    try {
+      if (!request.hasScannerId() && !request.hasScan()) {
+        throw new DoNotRetryIOException(
+          "Missing required input: scannerId or scan");
+      }
+      long scannerId = -1;
+      if (request.hasScannerId()) {
+        scannerId = request.getScannerId();
+        scannerName = String.valueOf(scannerId);
+      }
+      try {
+        checkOpen();
+      } catch (IOException e) {
+        // If checkOpen failed, server not running or filesystem gone,
+        // cancel this lease; filesystem is gone or we're closing or something.
+        if (scannerName != null) {
+          try {
+            regionServer.leases.cancelLease(scannerName);
+          } catch (LeaseException le) {
+            LOG.info("Server shutting down and client tried to access missing scanner " +
+              scannerName);
+          }
+        }
+        throw e;
+      }
+      requestCount.increment();
+
+      int ttl = 0;
+      HRegion region = null;
+      RegionScanner scanner = null;
+      RegionScannerHolder rsh = null;
+      boolean moreResults = true;
+      boolean closeScanner = false;
+      ScanResponse.Builder builder = ScanResponse.newBuilder();
+      if (request.hasCloseScanner()) {
+        closeScanner = request.getCloseScanner();
+      }
+      int rows = 1;
+      if (request.hasNumberOfRows()) {
+        rows = request.getNumberOfRows();
+      }
+      if (request.hasScannerId()) {
+        rsh = scanners.get(scannerName);
+        if (rsh == null) {
+          LOG.info("Client tried to access missing scanner " + scannerName);
+          throw new UnknownScannerException(
+            "Name: " + scannerName + ", already closed?");
+        }
+        scanner = rsh.s;
+        HRegionInfo hri = scanner.getRegionInfo();
+        region = regionServer.getRegion(hri.getRegionName());
+        if (region != rsh.r) { // Yes, should be the same instance
+          throw new NotServingRegionException("Region was re-opened after the scanner "
+            + scannerName + " was created: " + hri.getRegionNameAsString());
+        }
+      } else {
+        region = getRegion(request.getRegion());
+        ClientProtos.Scan protoScan = request.getScan();
+        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
+        Scan scan = ProtobufUtil.toScan(protoScan);
+        // if the request doesn't set this, get the default region setting.
+        if (!isLoadingCfsOnDemandSet) {
+          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+        }
+        scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+        region.prepareScanner(scan);
+        if (region.getCoprocessorHost() != null) {
+          scanner = region.getCoprocessorHost().preScannerOpen(scan);
+        }
+        if (scanner == null) {
+          scanner = region.getScanner(scan);
+        }
+        if (region.getCoprocessorHost() != null) {
+          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+        }
+        scannerId = addScanner(scanner, region);
+        scannerName = String.valueOf(scannerId);
+        ttl = this.scannerLeaseTimeoutPeriod;
+      }
+
+      if (rows > 0) {
+        // If nextCallSeq does not match, throw the exception straight away. This check must
+        // happen even before the lease check.
+        // See HBASE-5974
+        if (request.hasNextCallSeq()) {
+          if (rsh == null) {
+            rsh = scanners.get(scannerName);
+          }
+          if (rsh != null) {
+            if (request.getNextCallSeq() != rsh.nextCallSeq) {
+              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+                "; request=" + TextFormat.shortDebugString(request));
+            }
+            // Increment the nextCallSeq value which is the next expected from client.
+            rsh.nextCallSeq++;
+          }
+        }
+        try {
+          // Remove the lease while it's being processed in the server; this protects against
+          // the case where processing the request takes longer than the lease expiration time.
+          lease = regionServer.leases.removeLease(scannerName);
+          List<Result> results = new ArrayList<Result>(rows);
+          long currentScanResultSize = 0;
+
+          boolean done = false;
+          // Call coprocessor. Get region info from scanner.
+          if (region != null && region.getCoprocessorHost() != null) {
+            Boolean bypass = region.getCoprocessorHost().preScannerNext(
+              scanner, results, rows);
+            if (!results.isEmpty()) {
+              for (Result r : results) {
+                if (maxScannerResultSize < Long.MAX_VALUE) {
+                  for (Cell kv : r.rawCells()) {
+                    currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
+                  }
+                }
+              }
+            }
+            if (bypass != null && bypass.booleanValue()) {
+              done = true;
+            }
+          }
+
+          if (!done) {
+            long maxResultSize = scanner.getMaxResultSize();
+            if (maxResultSize <= 0) {
+              maxResultSize = maxScannerResultSize;
+            }
+            List<Cell> values = new ArrayList<Cell>();
+            region.startRegionOperation(Operation.SCAN);
+            try {
+              int i = 0;
+              synchronized(scanner) {
+                while (i < rows && currentScanResultSize < maxResultSize) {
+                  // Collect values to be returned here
+                  boolean moreRows = scanner.nextRaw(values);
+                  if (!values.isEmpty()) {
+                    if (maxScannerResultSize < Long.MAX_VALUE) {
+                      for (Cell kv : values) {
+                        currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
+                      }
+                    }
+                    results.add(Result.create(values));
+                    i++;
+                  }
+                  if (!moreRows) {
+                    break;
+                  }
+                  values.clear();
+                }
+              }
+              region.readRequestsCount.add(i);
+            } finally {
+              region.closeRegionOperation();
+            }
+
+            // coprocessor postNext hook
+            if (region != null && region.getCoprocessorHost() != null) {
+              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+            }
+          }
+
+          // If the scanner's filter (if any) is done with the scan, tell the client to
+          // stop the scan. This is done by passing a null result and setting moreResults
+          // to false.
+          if (scanner.isFilterDone() && results.isEmpty()) {
+            moreResults = false;
+            results = null;
+          } else {
+            addResults(builder, results, controller);
+          }
+        } finally {
+          // We're done. On way out re-add the above removed lease.
+          // Adding resets expiration time on lease.
+          if (scanners.containsKey(scannerName)) {
+            if (lease != null) regionServer.leases.addLease(lease);
+            ttl = this.scannerLeaseTimeoutPeriod;
+          }
+        }
+      }
+
+      if (!moreResults || closeScanner) {
+        ttl = 0;
+        moreResults = false;
+        if (region != null && region.getCoprocessorHost() != null) {
+          if (region.getCoprocessorHost().preScannerClose(scanner)) {
+            return builder.build(); // bypass
+          }
+        }
+        rsh = scanners.remove(scannerName);
+        if (rsh != null) {
+          scanner = rsh.s;
+          scanner.close();
+          regionServer.leases.cancelLease(scannerName);
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerClose(scanner);
+          }
+        }
+      }
+
+      if (ttl > 0) {
+        builder.setTtl(ttl);
+      }
+      builder.setScannerId(scannerId);
+      builder.setMoreResults(moreResults);
+      return builder.build();
+    } catch (IOException ie) {
+      if (scannerName != null && ie instanceof NotServingRegionException) {
+        RegionScannerHolder rsh = scanners.remove(scannerName);
+        if (rsh != null) {
+          try {
+            RegionScanner scanner = rsh.s;
+            scanner.close();
+            regionServer.leases.cancelLease(scannerName);
+          } catch (IOException e) {
+            // Best-effort cleanup; the original exception is rethrown below.
+          }
+        }
+      }
+      throw new ServiceException(ie);
+    }
+  }
+}
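
A client-side sketch of the HBASE-5974 sequencing that scan() enforces: each next() call
carries a sequence number, so a retry of an RPC that actually succeeded on the server
fails with OutOfOrderScannerNextException instead of silently skipping rows (client and
scannerId are assumptions):

    long nextCallSeq = 0;
    ScanRequest next = ScanRequest.newBuilder()
      .setScannerId(scannerId)
      .setNumberOfRows(100)
      .setNextCallSeq(nextCallSeq)
      .build();
    ScanResponse resp = client.scan(null, next);
    nextCallSeq++;  // advance only after a successful round trip
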

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
------------------------------------------------------------------------------
    svn:eol-style = native