You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/13 22:28:22 UTC

svn commit: r1325937 [6/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/catalog/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc...

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,1164 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownRowLockException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * RegionServer makes a set of HRegions available to clients. It checks in with
+ * the HMaster. There are many RegionServers in a single HBase deployment.
+ *
+ * This will be a replacement for the HRegionServer. It has protobuf protocols
+ * implementations. All the HRegionInterface implementations stay in HRegionServer
+ * for possible backward compatibility requests.  This also makes it easier to
+ * rip out HRegionInterface later on.
+ */
+@InterfaceAudience.Private
+public abstract class RegionServer implements
+    ClientProtocol, Runnable, RegionServerServices {
+
+  private static final Log LOG = LogFactory.getLog(RegionServer.class);
+
+  // Source of the random ids that nextLong() hands out for scanners and
+  // row locks.
+  private final Random rand = new Random();
+
+  // Once the heap size of the KeyValues gathered by one scan call reaches
+  // this value, no further rows are fetched in that call.
+  protected long maxScannerResultSize;
+
+  // Cache flushing
+  protected MemStoreFlusher cacheFlusher;
+
+  // Open scanners, keyed by the scanner id rendered as a decimal string.
+  final Map<String, RegionScanner> scanners =
+      new ConcurrentHashMap<String, RegionScanner>();
+
+  /**
+   * Map of regions currently being served by this region server. Key is the
+   * encoded region name.  All access should be synchronized.
+   */
+  protected final Map<String, HRegion> onlineRegions =
+    new ConcurrentHashMap<String, HRegion>();
+
+  // Leases
+  protected Leases leases;
+
+  // Request counter.
+  // Do we need this?  Can't we just sum region counters?  St.Ack 20110412
+  protected AtomicInteger requestCount = new AtomicInteger();
+
+  // If false, the file system has become unavailable
+  protected volatile boolean fsOk;
+  protected HFileSystem fs;
+
+  protected static final int NORMAL_QOS = 0;
+  protected static final int QOS_THRESHOLD = 10;  // the line between low and high qos
+  protected static final int HIGH_QOS = 100;
+
+  // Set when a report to the master comes back with a message asking us to
+  // shutdown. Also set by call to stop when debugging or running unit tests
+  // of HRegionServer in isolation.
+  protected volatile boolean stopped = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  protected volatile boolean abortRequested;
+
+  // Row locks handed out to clients: the client-visible lock id (as a decimal
+  // string) mapped to the Integer lock id used internally by HRegion.
+  Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
+
+  /**
+   * Lease listener registered for every row lock handed out to a client. When
+   * the lease runs out, the lock is dropped from <code>rowlocks</code> and
+   * released on the region it was obtained from.
+   */
+  private class RowLockListener implements LeaseListener {
+    private final HRegion region;
+    private final String lockName;
+
+    RowLockListener(final String lockName, final HRegion region) {
+      this.region = region;
+      this.lockName = lockName;
+    }
+
+    public void leaseExpired() {
+      LOG.info("Row Lock " + lockName + " lease expired");
+      final Integer internalLockId = rowlocks.remove(lockName);
+      if (internalLockId == null) {
+        // No longer registered (e.g. already unlocked explicitly).
+        return;
+      }
+      region.releaseRowLock(internalLockId);
+    }
+  }
+
+  /**
+   * Lease listener registered for every open scanner. When the lease runs out
+   * the scanner is removed from <code>scanners</code> and closed, whether or
+   * not its region is still online.
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+
+    public void leaseExpired() {
+      RegionScanner s = scanners.remove(this.scannerName);
+      if (s == null) {
+        // Scanner is no longer registered; nothing left to close.
+        LOG.info("Scanner " + this.scannerName + " lease expired");
+        return;
+      }
+      LOG.info("Scanner " + this.scannerName + " lease expired on region "
+          + s.getRegionInfo().getRegionNameAsString());
+      try {
+        // Use the non-throwing lookup: getRegion() raises
+        // NotServingRegionException for an offline region, which used to skip
+        // s.close() and leave the expired scanner open forever. With a null
+        // region only the coprocessor hooks are skipped.
+        HRegion region = getOnlineRegion(s.getRegionInfo().getRegionName());
+        if (region != null && region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().preScannerClose(s);
+        }
+
+        s.close();
+        if (region != null && region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postScannerClose(s);
+        }
+      } catch (IOException e) {
+        LOG.error("Closing scanner for "
+            + s.getRegionInfo().getRegionNameAsString(), e);
+      }
+    }
+  }
+
+  /**
+   * Method to get the Integer lock identifier used internally from the long
+   * lock identifier used by the client.
+   *
+   * @param lockId
+   *          long row lock identifier from client
+   * @return intId Integer row lock used internally in HRegion
+   * @throws IOException
+   *           Thrown if this is not a valid client lock id.
+   */
+  Integer getLockFromId(long lockId) throws IOException {
+    if (lockId == -1L) {
+      return null;
+    }
+    String lockName = String.valueOf(lockId);
+    Integer rl = rowlocks.get(lockName);
+    if (rl == null) {
+      throw new UnknownRowLockException("Invalid row lock");
+    }
+    this.leases.renewLease(lockName);
+    return rl;
+  }
+
+  /**
+   * Called to verify that this server is up and running.
+   *
+   * @throws IOException
+   */
+  protected void checkOpen() throws IOException {
+    if (this.stopped || this.abortRequested) {
+      throw new RegionServerStoppedException("Server " + getServerName() +
+        " not running" + (this.abortRequested ? ", aborting" : ""));
+    }
+    if (!fsOk) {
+      throw new RegionServerStoppedException("File system not available");
+    }
+  }
+
+   /**
+   * @param regionName
+   * @return HRegion for the passed binary <code>regionName</code> or null if
+   *         named region is not member of the online regions.
+   */
+  public HRegion getOnlineRegion(final byte[] regionName) {
+    // onlineRegions is keyed by the encoded form of the region name.
+    return this.onlineRegions.get(HRegionInfo.encodeRegionName(regionName));
+  }
+
+  /**
+   * Protected utility method for safely obtaining an HRegion handle.
+   *
+   * @param regionName
+   *          Name of online {@link HRegion} to return
+   * @return {@link HRegion} for <code>regionName</code>
+   * @throws NotServingRegionException
+   */
+  protected HRegion getRegion(final byte[] regionName)
+      throws NotServingRegionException {
+    HRegion region = null;
+    region = getOnlineRegion(regionName);
+    if (region == null) {
+      throw new NotServingRegionException("Region is not online: " +
+        Bytes.toStringBinary(regionName));
+    }
+    return region;
+  }
+
+  /*
+   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+   * IOE if it isn't already.
+   *
+   * @param t Throwable
+   *
+   * @return Throwable converted to an IOE; methods can only let out IOEs.
+   */
+  protected Throwable cleanup(final Throwable t) {
+    return cleanup(t, null);
+  }
+
+  /*
+   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+   * IOE if it isn't already.
+   *
+   * @param t Throwable
+   *
+   * @param msg Message to log in error. Can be null.
+   *
+   * @return Throwable converted to an IOE; methods can only let out IOEs.
+   */
+  protected Throwable cleanup(final Throwable t, final String msg) {
+    // Don't log as error if NSRE; NSRE is 'normal' operation.
+    if (t instanceof NotServingRegionException) {
+      LOG.debug("NotServingRegionException; " +  t.getMessage());
+      return t;
+    }
+    if (msg == null) {
+      LOG.error("", RemoteExceptionHandler.checkThrowable(t));
+    } else {
+      LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
+    }
+    if (!checkOOME(t)) {
+      checkFileSystem();
+    }
+    return t;
+  }
+
+  /*
+   * @param t
+   *
+   * @return Make <code>t</code> an IOE if it isn't already.
+   */
+  protected IOException convertThrowableToIOE(final Throwable t) {
+    return convertThrowableToIOE(t, null);
+  }
+
+  /*
+   * Make <code>t</code> an IOE if it isn't already.
+   *
+   * @param t Throwable to convert
+   *
+   * @param msg Message to put in new IOE if passed <code>t</code> is not an
+   * IOE; ignored when null or empty
+   *
+   * @return <code>t</code> itself when it already is an IOE, otherwise a new
+   * IOE that has <code>t</code> as its cause.
+   */
+  protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
+    if (t instanceof IOException) {
+      return (IOException) t;
+    }
+    if (msg == null || msg.length() == 0) {
+      return new IOException(t);
+    }
+    return new IOException(msg, t);
+  }
+
+  /*
+   * Check if an OOME and, if so, halt the JVM immediately to avoid creating
+   * more objects. A throwable counts as an OOME when it is one, when its
+   * direct cause is one, or when its message mentions
+   * java.lang.OutOfMemoryError.
+   *
+   * @param e Throwable to inspect
+   *
+   * @return True if we OOME'd and are aborting.
+   */
+  public boolean checkOOME(final Throwable e) {
+    boolean outOfMemory = false;
+    try {
+      // instanceof is false for a null cause, so no separate null check.
+      boolean direct = e instanceof OutOfMemoryError
+          || e.getCause() instanceof OutOfMemoryError;
+      if (!direct) {
+        final String message = e.getMessage();
+        direct = message != null
+            && message.contains("java.lang.OutOfMemoryError");
+      }
+      if (direct) {
+        outOfMemory = true;
+        LOG.fatal(
+          "Run out of memory; HRegionServer will abort itself immediately", e);
+      }
+    } finally {
+      // Halt from finally so a failure while logging cannot prevent it.
+      if (outOfMemory) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return outOfMemory;
+  }
+
+  /**
+   * Checks to see if the file system is still accessible. If it is not, the
+   * server is asked to abort and <code>fsOk</code> is set to false. The check
+   * is skipped when the file system is already marked unavailable or no file
+   * system has been set.
+   *
+   * @return false if file system is not available
+   */
+  public boolean checkFileSystem() {
+    if (!this.fsOk || this.fs == null) {
+      return this.fsOk;
+    }
+    try {
+      FSUtils.checkFileSystemAvailable(this.fs);
+    } catch (IOException unavailable) {
+      abort("File System not available", unavailable);
+      this.fsOk = false;
+    }
+    return this.fsOk;
+  }
+
+  protected long addRowLock(Integer r, HRegion region)
+      throws LeaseStillHeldException {
+    long lockId = nextLong();
+    String lockName = String.valueOf(lockId);
+    rowlocks.put(lockName, r);
+    this.leases.createLease(lockName, new RowLockListener(lockName, region));
+    return lockId;
+  }
+
+  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+    long scannerId = nextLong();
+    String scannerName = String.valueOf(scannerId);
+    scanners.put(scannerName, s);
+    this.leases.createLease(scannerName, new ScannerListener(scannerName));
+    return scannerId;
+  }
+
+  /**
+   * Generate a random positive long number
+   *
+   * @return a random long that is strictly greater than zero
+   */
+  protected long nextLong() {
+    long n;
+    do {
+      n = rand.nextLong();
+      // Redraw 0 (not positive) and Long.MIN_VALUE: negating Long.MIN_VALUE
+      // overflows back to Long.MIN_VALUE, so it would be returned negative.
+    } while (n == 0 || n == Long.MIN_VALUE);
+    return n < 0 ? -n : n;
+  }
+
+  // Start Client methods
+
+  /**
+   * Get data from a table. Depending on the request this is a plain get, an
+   * existence-only check, or a "closest row before" lookup on one family.
+   *
+   * @param controller the RPC controller
+   * @param request the get request
+   * @return a response carrying either the existence flag or, when a result
+   *         was produced, that result
+   * @throws ServiceException wrapping any IOException raised while serving
+   */
+  @Override
+  public GetResponse get(final RpcController controller,
+      final GetRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      final HRegion region = getRegion(request.getRegion());
+      final ClientProtos.Get protoGet = request.getGet();
+      Result result = null;
+      Boolean exists = null;
+      if (!request.getClosestRowBefore()) {
+        final Get clientGet = ProtobufUtil.toGet(protoGet);
+        final boolean existenceOnly = request.getExistenceOnly();
+        if (existenceOnly && region.getCoprocessorHost() != null) {
+          // A non-null answer from the pre-hook replaces the region read.
+          exists = region.getCoprocessorHost().preExists(clientGet);
+        }
+        if (exists == null) {
+          final Integer lock = getLockFromId(clientGet.getLockId());
+          result = region.get(clientGet, lock);
+          if (existenceOnly) {
+            boolean found = !(result == null || result.isEmpty());
+            if (region.getCoprocessorHost() != null) {
+              found = region.getCoprocessorHost().postExists(clientGet, found);
+            }
+            exists = Boolean.valueOf(found);
+          }
+        }
+      } else {
+        final int families = protoGet.getColumnCount();
+        if (families != 1) {
+          throw new DoNotRetryIOException(
+            "get ClosestRowBefore supports one and only one family now, not "
+              + families + " families");
+        }
+        final byte[] row = protoGet.getRow().toByteArray();
+        final byte[] family = protoGet.getColumn(0).getFamily().toByteArray();
+        result = region.getClosestRowBefore(row, family);
+      }
+      final GetResponse.Builder builder = GetResponse.newBuilder();
+      if (exists != null) {
+        // The existence flag takes precedence over the result.
+        builder.setExists(exists.booleanValue());
+      } else if (result != null) {
+        builder.setResult(ProtobufUtil.toResult(result));
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Mutate data in a table. Handles append, increment, put and delete; puts
+   * and deletes may carry a condition, in which case they are applied as a
+   * check-and-mutate.
+   *
+   * @param controller the RPC controller
+   * @param request the mutate request
+   * @return a response carrying the "processed" flag for put/delete, or the
+   *         result of an append/increment when there is one
+   * @throws ServiceException wrapping any IOException raised while serving;
+   *           the file system is checked before the exception is rethrown
+   */
+  @Override
+  public MutateResponse mutate(final RpcController controller,
+      final MutateRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      HRegion region = getRegion(request.getRegion());
+      MutateResponse.Builder builder = MutateResponse.newBuilder();
+      Mutate mutate = request.getMutate();
+      // Ask the cache flusher to reclaim memstore memory first; this is
+      // skipped for meta table regions.
+      if (!region.getRegionInfo().isMetaTable()) {
+        cacheFlusher.reclaimMemStoreMemory();
+      }
+      Integer lock = null;
+      // Set by append/increment only.
+      Result r = null;
+      // Set by put/delete only; stays null for append/increment.
+      Boolean processed = null;
+      MutateType type = mutate.getMutateType();
+      switch (type) {
+      case APPEND:
+        r = append(region, mutate);
+        break;
+      case INCREMENT:
+        r = increment(region, mutate);
+        break;
+      case PUT:
+        Put put = ProtobufUtil.toPut(mutate);
+        lock = getLockFromId(put.getLockId());
+        if (request.hasCondition()) {
+          // Conditional put: unpack the condition to check against.
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          WritableByteArrayComparable comparator =
+            (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+          // A non-null answer from the coprocessor pre-hook is used as the
+          // outcome and the region is not touched.
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndPut(
+              row, family, qualifier, compareOp, comparator, put);
+          }
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, put, lock, true);
+            // The post-hook may replace the outcome reported to the client.
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndPut(row, family,
+                qualifier, compareOp, comparator, put, result);
+            }
+            processed = Boolean.valueOf(result);
+          }
+        } else {
+          // Unconditional put always reports processed.
+          region.put(put, lock);
+          processed = Boolean.TRUE;
+        }
+        break;
+      case DELETE:
+        Delete delete = ProtobufUtil.toDelete(mutate);
+        lock = getLockFromId(delete.getLockId());
+        if (request.hasCondition()) {
+          // Conditional delete: same flow as the conditional put above, using
+          // the check-and-delete coprocessor hooks.
+          Condition condition = request.getCondition();
+          byte[] row = condition.getRow().toByteArray();
+          byte[] family = condition.getFamily().toByteArray();
+          byte[] qualifier = condition.getQualifier().toByteArray();
+          CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+          WritableByteArrayComparable comparator =
+            (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+          if (region.getCoprocessorHost() != null) {
+            processed = region.getCoprocessorHost().preCheckAndDelete(
+              row, family, qualifier, compareOp, comparator, delete);
+          }
+          if (processed == null) {
+            boolean result = region.checkAndMutate(row, family,
+              qualifier, compareOp, comparator, delete, lock, true);
+            if (region.getCoprocessorHost() != null) {
+              result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+                qualifier, compareOp, comparator, delete, result);
+            }
+            processed = Boolean.valueOf(result);
+          }
+        } else {
+          // Unconditional delete always reports processed.
+          region.delete(delete, lock, delete.getWriteToWAL());
+          processed = Boolean.TRUE;
+        }
+        break;
+        default:
+          throw new DoNotRetryIOException(
+            "Unsupported mutate type: " + type.name());
+      }
+      // The processed flag takes precedence; a result is only attached when
+      // no flag was set (append/increment).
+      if (processed != null) {
+        builder.setProcessed(processed.booleanValue());
+      } else if (r != null) {
+        builder.setResult(ProtobufUtil.toResult(r));
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      // An IOException here may mean the file system is gone; check it.
+      checkFileSystem();
+      throw new ServiceException(ie);
+    }
+  }
+
+  //
+  // remote scanner interface
+  //
+
+  /**
+   * Scan data in a table. Depending on which request fields are set, a call
+   * opens a new scanner (scan given, no scanner id), fetches the next batch
+   * of rows from an open scanner (scanner id given), and/or closes the
+   * scanner (closeScanner set, or the scan turned out to be finished).
+   *
+   * @param controller the RPC controller
+   * @param request the scan request; must carry a scanner id or a scan
+   * @return a response with the fetched results, the scanner id, the
+   *         moreResults flag and, when positive, the lease period as ttl
+   * @throws ServiceException wrapping any IOException raised while serving
+   */
+  @Override
+  public ScanResponse scan(final RpcController controller,
+      final ScanRequest request) throws ServiceException {
+    // Scanner lease taken out of the lease set while rows are being fetched.
+    Leases.Lease lease = null;
+    // Scanner id as a string; doubles as key in scanners and as lease name.
+    String scannerName = null;
+    try {
+      if (!request.hasScannerId() && !request.hasScan()) {
+        throw new DoNotRetryIOException(
+          "Missing required input: scannerId or scan");
+      }
+      // Stays -1 until supplied by the request or assigned by addScanner.
+      long scannerId = -1;
+      if (request.hasScannerId()) {
+        scannerId = request.getScannerId();
+        scannerName = String.valueOf(scannerId);
+      }
+      try {
+        checkOpen();
+      } catch (IOException e) {
+        // If checkOpen failed, server not running or filesystem gone,
+        // cancel this lease; filesystem is gone or we're closing or something.
+        if (scannerName != null) {
+          try {
+            leases.cancelLease(scannerName);
+          } catch (LeaseException le) {
+            LOG.info("Server shutting down and client tried to access missing scanner " +
+              scannerName);
+          }
+        }
+        throw e;
+      }
+      requestCount.incrementAndGet();
+
+      try {
+        int ttl = 0;
+        HRegion region = null;
+        RegionScanner scanner = null;
+        boolean moreResults = true;
+        boolean closeScanner = false;
+        ScanResponse.Builder builder = ScanResponse.newBuilder();
+        if (request.hasCloseScanner()) {
+          closeScanner = request.getCloseScanner();
+        }
+        // Fetch one row when the request does not say how many.
+        int rows = 1;
+        if (request.hasNumberOfRows()) {
+          rows = request.getNumberOfRows();
+        }
+        if (request.hasScannerId()) {
+          // Continue with an already open scanner.
+          scanner = scanners.get(scannerName);
+          if (scanner == null) {
+            throw new UnknownScannerException(
+              "Name: " + scannerName + ", already closed?");
+          }
+          region = getRegion(scanner.getRegionInfo().getRegionName());
+        } else {
+          // Open a new scanner; the coprocessor pre-hook may supply one, and
+          // the post-hook may replace whichever scanner was opened.
+          region = getRegion(request.getRegion());
+          ClientProtos.Scan protoScan = request.getScan();
+          Scan scan = ProtobufUtil.toScan(protoScan);
+          region.prepareScanner(scan);
+          if (region.getCoprocessorHost() != null) {
+            scanner = region.getCoprocessorHost().preScannerOpen(scan);
+          }
+          if (scanner == null) {
+            scanner = region.getScanner(scan);
+          }
+          if (region.getCoprocessorHost() != null) {
+            scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+          }
+          scannerId = addScanner(scanner);
+          scannerName = String.valueOf(scannerId);
+          ttl = leases.leasePeriod;
+        }
+
+        // A request for zero (or fewer) rows fetches nothing.
+        if (rows > 0) {
+          try {
+            // Remove lease while its being processed in server; protects against case
+            // where processing of request takes > lease expiration time.
+            lease = leases.removeLease(scannerName);
+            List<Result> results = new ArrayList<Result>(rows);
+            // Running heap size of everything collected so far in this call.
+            long currentScanResultSize = 0;
+
+            boolean done = false;
+            // Call coprocessor. Get region info from scanner.
+            if (region != null && region.getCoprocessorHost() != null) {
+              Boolean bypass = region.getCoprocessorHost().preScannerNext(
+                scanner, results, rows);
+              // Results added by the pre-hook count towards the size limit.
+              if (!results.isEmpty()) {
+                for (Result r : results) {
+                  for (KeyValue kv : r.raw()) {
+                    currentScanResultSize += kv.heapSize();
+                  }
+                }
+              }
+              // On bypass the scanner itself is not read in this call.
+              if (bypass != null && bypass.booleanValue()) {
+                done = true;
+              }
+            }
+
+            if (!done) {
+              List<KeyValue> values = new ArrayList<KeyValue>();
+              // Stop after 'rows' rows or once the size limit is reached; the
+              // limit is checked before each row, so it can be overshot by
+              // the last row fetched.
+              for (int i = 0; i < rows
+                  && currentScanResultSize < maxScannerResultSize; i++) {
+                // Collect values to be returned here
+                boolean moreRows = scanner.next(values, HRegion.METRIC_NEXTSIZE);
+                if (!values.isEmpty()) {
+                  for (KeyValue kv : values) {
+                    currentScanResultSize += kv.heapSize();
+                  }
+                  results.add(new Result(values));
+                }
+                if (!moreRows) {
+                  break;
+                }
+                values.clear();
+              }
+
+              // coprocessor postNext hook
+              if (region != null && region.getCoprocessorHost() != null) {
+                region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+              }
+            }
+
+            // If the scanner's filter - if any - is done with the scan
+            // and wants to tell the client to stop the scan. This is done by passing
+            // a null result, and setting moreResults to false.
+            if (scanner.isFilterDone() && results.isEmpty()) {
+              moreResults = false;
+              results = null;
+            } else {
+              for (Result result: results) {
+                if (result != null) {
+                  builder.addResult(ProtobufUtil.toResult(result));
+                }
+              }
+            }
+          } finally {
+            // We're done. On way out re-add the above removed lease.
+            // Adding resets expiration time on lease.
+            // Only done while the scanner is still registered.
+            if (scanners.containsKey(scannerName)) {
+              if (lease != null) leases.addLease(lease);
+              ttl = leases.leasePeriod;
+            }
+          }
+        }
+
+        // Close the scanner when the scan is finished or the client asked.
+        if (!moreResults || closeScanner) {
+          ttl = 0;
+          moreResults = false;
+          if (region != null && region.getCoprocessorHost() != null) {
+            // NOTE(review): this bypass return skips setScannerId and
+            // setMoreResults below and leaves the scanner registered -
+            // confirm that is intended.
+            if (region.getCoprocessorHost().preScannerClose(scanner)) {
+              return builder.build(); // bypass
+            }
+          }
+          scanner = scanners.remove(scannerName);
+          if (scanner != null) {
+            scanner.close();
+            leases.cancelLease(scannerName);
+            if (region != null && region.getCoprocessorHost() != null) {
+              region.getCoprocessorHost().postScannerClose(scanner);
+            }
+          }
+        }
+
+        // ttl is only reported when positive, i.e. the scanner stays open.
+        if (ttl > 0) {
+          builder.setTtl(ttl);
+        }
+        builder.setScannerId(scannerId);
+        builder.setMoreResults(moreResults);
+        return builder.build();
+      } catch (Throwable t) {
+        // The region is not served here, so forget the scanner.
+        // NOTE(review): the scanner is only dropped from the map, it is not
+        // closed here - confirm that is intended.
+        if (scannerName != null &&
+            t instanceof NotServingRegionException) {
+          scanners.remove(scannerName);
+        }
+        throw convertThrowableToIOE(cleanup(t));
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Lock a row in a table. Exactly one row per request is supported. The
+   * returned lock id is registered with a lease, so the lock is released
+   * again if that lease expires.
+   *
+   * @param controller the RPC controller
+   * @param request the lock row request
+   * @return a response carrying the id of the newly obtained lock
+   * @throws ServiceException wrapping any IOException raised while serving
+   */
+  @Override
+  public LockRowResponse lockRow(final RpcController controller,
+      final LockRowRequest request) throws ServiceException {
+    try {
+      final int rowCount = request.getRowCount();
+      if (rowCount != 1) {
+        throw new DoNotRetryIOException(
+          "lockRow supports only one row now, not " + rowCount + " rows");
+      }
+      requestCount.incrementAndGet();
+      final HRegion region = getRegion(request.getRegion());
+      final byte[] row = request.getRow(0).toByteArray();
+      try {
+        final Integer internalLock = region.obtainRowLock(row);
+        final long lockId = addRowLock(internalLock, region);
+        LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+        return LockRowResponse.newBuilder().setLockId(lockId).build();
+      } catch (Throwable t) {
+        final String context =
+          "Error obtaining row lock (fsOk: " + this.fsOk + ")";
+        throw convertThrowableToIOE(cleanup(t, context));
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Unlock a locked row in a table. The lock is removed from the set of
+   * client-held row locks, released on the region, and its lease cancelled.
+   *
+   * @param controller the RPC controller
+   * @param request the unlock row request; must carry a lock id
+   * @return an empty response on success
+   * @throws ServiceException wrapping any IOException raised while serving,
+   *           including an UnknownRowLockException for an unknown lock id
+   */
+  @Override
+  @QosPriority(priority=HIGH_QOS)
+  public UnlockRowResponse unlockRow(final RpcController controller,
+      final UnlockRowRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      final HRegion region = getRegion(request.getRegion());
+      if (!request.hasLockId()) {
+        throw new DoNotRetryIOException(
+          "Invalid unlock rowrequest, missing lock id");
+      }
+      final long lockId = request.getLockId();
+      final String lockName = String.valueOf(lockId);
+      try {
+        final Integer internalLock = rowlocks.remove(lockName);
+        if (internalLock != null) {
+          region.releaseRowLock(internalLock);
+          this.leases.cancelLease(lockName);
+          LOG.debug("Row lock " + lockId
+            + " has been explicitly released by client");
+          return UnlockRowResponse.newBuilder().build();
+        }
+        throw new UnknownRowLockException(lockName);
+      } catch (Throwable t) {
+        throw convertThrowableToIOE(cleanup(t));
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Atomically bulk load several HFiles into an open region
+   *
+   * @param controller the RPC controller
+   * @param request the bulk load request, listing (family, path) pairs
+   * @return a response whose loaded flag is whatever the region reported for
+   *         the bulk load
+   * @throws ServiceException wrapping any IOException raised while serving
+   */
+  @Override
+  public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
+      final BulkLoadHFileRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      final HRegion region = getRegion(request.getRegion());
+      // Translate the protobuf (family, path) entries into the pairs that
+      // HRegion expects.
+      final List<FamilyPath> requested = request.getFamilyPathList();
+      final List<Pair<byte[], String>> familyPaths =
+        new ArrayList<Pair<byte[], String>>(requested.size());
+      for (FamilyPath entry : requested) {
+        final byte[] family = entry.getFamily().toByteArray();
+        familyPaths.add(new Pair<byte[], String>(family, entry.getPath()));
+      }
+      final boolean loaded = region.bulkLoadHFiles(familyPaths);
+      return BulkLoadHFileResponse.newBuilder().setLoaded(loaded).build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+   * method using the registered protocol handlers.
+   * {@link CoprocessorProtocol} implementations must be registered per-region
+   * via the
+   * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
+   * method before they are available.
+   *
+   * @param controller the RPC controller
+   * @param request carries the region against which the invocation is executed
+   *     and the {@code Exec} call identifying the protocol, method name, and
+   *     parameters for the method invocation
+   * @return a response whose value is the return value of the invocation
+   * @throws ServiceException wrapping the IOException raised if no registered
+   *     protocol handler is found or an error occurs during the invocation
+   * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
+   */
+  @Override
+  public ExecCoprocessorResponse execCoprocessor(final RpcController controller,
+      final ExecCoprocessorRequest request) throws ServiceException {
+    try {
+      requestCount.incrementAndGet();
+      final HRegion target = getRegion(request.getRegion());
+      final ExecCoprocessorResponse.Builder response =
+        ExecCoprocessorResponse.newBuilder();
+      // Convert the protobuf call to the client-side Exec, run it on the
+      // region, then convert the returned value back for the wire.
+      final Exec invocation = ProtobufUtil.toExec(request.getCall());
+      final ExecResult outcome = target.exec(invocation);
+      response.setValue(ProtobufUtil.toParameter(outcome.getValue()));
+      return response.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+  /**
+   * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
+   * <p>
+   * When the request is flagged atomic, every action must be a Mutate and they
+   * are applied together through {@code mutateRows}; no per-action results are
+   * added to the response. Otherwise each action is executed in turn and an
+   * IOException raised by one action is reported as that action's result
+   * instead of failing the whole call.
+   *
+   * @param controller the RPC controller
+   * @param request the multi request
+   * @return the response holding the collected action results
+   * @throws ServiceException wrapping an IOException raised while resolving
+   *     the region or while processing an atomic request
+   */
+  @Override
+  public MultiResponse multi(final RpcController controller,
+      final MultiRequest request) throws ServiceException {
+    try {
+      HRegion region = getRegion(request.getRegion());
+      MultiResponse.Builder builder = MultiResponse.newBuilder();
+      if (request.hasAtomic() && request.getAtomic()) {
+        // Atomic mode: collect the mutates, rejecting any other action type.
+        List<Mutate> mutates = new ArrayList<Mutate>();
+        for (NameBytesPair parameter: request.getActionList()) {
+          Object action = ProtobufUtil.toObject(parameter);
+          if (action instanceof Mutate) {
+            mutates.add((Mutate)action);
+          } else {
+            throw new DoNotRetryIOException(
+              "Unsupported atomic action type: "
+                + action.getClass().getName());
+          }
+        }
+        mutateRows(region, mutates);
+      } else {
+        ActionResult.Builder resultBuilder = null;
+        // Consecutive puts are buffered here and applied as one batch.
+        // NOTE(review): results of buffered puts are only appended when the
+        // buffer is flushed, while Get/Exec results are appended immediately,
+        // so a put's result can end up after the result of a later Get/Exec.
+        List<Mutate> puts = new ArrayList<Mutate>();
+        for (NameBytesPair parameter: request.getActionList()) {
+          requestCount.incrementAndGet();
+          try {
+            Object result = null;
+            Object action = ProtobufUtil.toObject(parameter);
+            if (action instanceof ClientProtos.Get) {
+              Get get = ProtobufUtil.toGet((ClientProtos.Get)action);
+              Integer lock = getLockFromId(get.getLockId());
+              Result r = region.get(get, lock);
+              if (r != null) {
+                result = ProtobufUtil.toResult(r);
+              }
+            } else if (action instanceof Mutate) {
+              Mutate mutate = (Mutate)action;
+              MutateType type = mutate.getMutateType();
+              if (type != MutateType.PUT) {
+                // A non-put mutate ends the current run of buffered puts:
+                // flush them first. With nothing buffered, reclaim memstore
+                // memory here, except for meta-table regions.
+                if (!puts.isEmpty()) {
+                  put(builder, region, puts);
+                  puts.clear();
+                } else if (!region.getRegionInfo().isMetaTable()) {
+                  cacheFlusher.reclaimMemStoreMemory();
+                }
+              }
+              Result r = null;
+              switch (type) {
+              case APPEND:
+                r = append(region, mutate);
+                break;
+              case INCREMENT:
+                r = increment(region, mutate);
+                break;
+              case PUT:
+                // Deferred: r stays null, so no result is added in this pass.
+                puts.add(mutate);
+                break;
+              case DELETE:
+                Delete delete = ProtobufUtil.toDelete(mutate);
+                Integer lock = getLockFromId(delete.getLockId());
+                region.delete(delete, lock, delete.getWriteToWAL());
+                // An empty Result marks the delete as done.
+                r = new Result();
+                break;
+                default:
+                  throw new DoNotRetryIOException(
+                    "Unsupported mutate type: " + type.name());
+              }
+              if (r != null) {
+                result = ProtobufUtil.toResult(r);
+              }
+            } else if (action instanceof ClientProtos.Exec) {
+              Exec call = ProtobufUtil.toExec((ClientProtos.Exec)action);
+              result = region.exec(call).getValue();
+            } else {
+              LOG.debug("Error: invalid action, "
+                + "it must be a Get, Mutate, or Exec.");
+              throw new DoNotRetryIOException("Invalid action, "
+                + "it must be a Get, Mutate, or Exec.");
+            }
+            if (result != null) {
+              // Reuse a single ActionResult builder across actions.
+              if (resultBuilder == null) {
+                resultBuilder = ActionResult.newBuilder();
+              } else {
+                resultBuilder.clear();
+              }
+              NameBytesPair value = ProtobufUtil.toParameter(result);
+              resultBuilder.setValue(value);
+              builder.addResult(resultBuilder.build());
+            }
+          } catch (IOException ie) {
+            // Report the failure as this action's result and keep going.
+            builder.addResult(ResponseConverter.buildActionResult(ie));
+          }
+        }
+        // Flush whatever puts are still buffered after the last action.
+        if (!puts.isEmpty()) {
+          put(builder, region, puts);
+        }
+      }
+      return builder.build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
+// End Client methods
+
+  /**
+   * Find the HRegion based on a region specifier
+   *
+   * @param regionSpecifier the region specifier
+   * @return the corresponding region
+   * @throws IOException if the specifier is not null,
+   *    but failed to find the region
+   */
+  protected HRegion getRegion(
+      final RegionSpecifier regionSpecifier) throws IOException {
+    byte[] value = regionSpecifier.getValue().toByteArray();
+    RegionSpecifierType type = regionSpecifier.getType();
+    checkOpen();
+    switch (type) {
+    case REGION_NAME:
+      return getRegion(value);
+    case ENCODED_REGION_NAME:
+      String encodedRegionName = Bytes.toString(value);
+      HRegion region = this.onlineRegions.get(encodedRegionName);
+      if (region == null) {
+        throw new NotServingRegionException(
+          "Region is not online: " + encodedRegionName);
+      }
+      return region;
+      default:
+        throw new DoNotRetryIOException(
+          "Unsupported region specifier type: " + type);
+    }
+  }
+
+  /**
+   * Execute an append mutation.
+   *
+   * @param region
+   * @param mutate
+   * @return
+   * @throws IOException
+   */
+  protected Result append(final HRegion region,
+      final Mutate mutate) throws IOException {
+    Append append = ProtobufUtil.toAppend(mutate);
+    Result r = null;
+    if (region.getCoprocessorHost() != null) {
+      r = region.getCoprocessorHost().preAppend(append);
+    }
+    if (r == null) {
+      Integer lock = getLockFromId(append.getLockId());
+      r = region.append(append, lock, append.getWriteToWAL());
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postAppend(append, r);
+      }
+    }
+    return r;
+  }
+
+  /**
+   * Execute an increment mutation.
+   *
+   * @param region
+   * @param mutate
+   * @return
+   * @throws IOException
+   */
+  protected Result increment(final HRegion region,
+      final Mutate mutate) throws IOException {
+    Increment increment = ProtobufUtil.toIncrement(mutate);
+    Result r = null;
+    if (region.getCoprocessorHost() != null) {
+      r = region.getCoprocessorHost().preIncrement(increment);
+    }
+    if (r == null) {
+      Integer lock = getLockFromId(increment.getLockId());
+      r = region.increment(increment, lock, increment.getWriteToWAL());
+      if (region.getCoprocessorHost() != null) {
+        r = region.getCoprocessorHost().postIncrement(increment, r);
+      }
+    }
+    return r;
+  }
+
+  /**
+   * Execute a list of put mutations.
+   *
+   * @param builder
+   * @param region
+   * @param puts
+   */
+  protected void put(final MultiResponse.Builder builder,
+      final HRegion region, final List<Mutate> puts) {
+    @SuppressWarnings("unchecked")
+    Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+
+    try {
+      ActionResult.Builder resultBuilder = ActionResult.newBuilder();
+      NameBytesPair value = ProtobufUtil.toParameter(new Result());
+      resultBuilder.setValue(value);
+      ActionResult result = resultBuilder.build();
+
+      int i = 0;
+      for (Mutate put : puts) {
+        Put p = ProtobufUtil.toPut(put);
+        Integer lock = getLockFromId(p.getLockId());
+        putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+        builder.addResult(result);
+      }
+
+      requestCount.addAndGet(puts.size());
+      if (!region.getRegionInfo().isMetaTable()) {
+        cacheFlusher.reclaimMemStoreMemory();
+      }
+
+      OperationStatus codes[] = region.put(putsWithLocks);
+      for (i = 0; i < codes.length; i++) {
+        if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+          result = ResponseConverter.buildActionResult(
+            new DoNotRetryIOException(codes[i].getExceptionMsg()));
+          builder.setResult(i, result);
+        }
+      }
+    } catch (IOException ie) {
+      ActionResult result = ResponseConverter.buildActionResult(ie);
+      for (int i = 0, n = puts.size(); i < n; i++) {
+        builder.setResult(i, result);
+      }
+    }
+  }
+
+  /**
+   * Mutate a list of rows atomically.
+   * <p>
+   * The mutates are gathered into a single {@code RowMutations} keyed by the
+   * row of the first mutate and handed to the region in one call. Only put
+   * and delete mutates are accepted. An empty list is a no-op.
+   *
+   * @param region the region to apply the mutations to
+   * @param mutates the put and/or delete mutates to apply together
+   * @throws IOException if a mutate is neither a put nor a delete, or the
+   *     conversion or the region mutation fails
+   */
+  protected void mutateRows(final HRegion region,
+      final List<Mutate> mutates) throws IOException {
+    if (mutates.isEmpty()) {
+      // Nothing to apply; also avoids an unchecked IndexOutOfBoundsException
+      // from get(0) below.
+      return;
+    }
+    Mutate firstMutate = mutates.get(0);
+    if (!region.getRegionInfo().isMetaTable()) {
+      cacheFlusher.reclaimMemStoreMemory();
+    }
+    byte[] row = firstMutate.getRow().toByteArray();
+    RowMutations rm = new RowMutations(row);
+    for (Mutate mutate: mutates) {
+      MutateType type = mutate.getMutateType();
+      switch (type) {
+      case PUT:
+        rm.add(ProtobufUtil.toPut(mutate));
+        break;
+      case DELETE:
+        rm.add(ProtobufUtil.toDelete(mutate));
+        break;
+        default:
+          throw new DoNotRetryIOException(
+            "mutate supports atomic put and/or delete, not "
+              + type.name());
+      }
+    }
+    region.mutateRow(rm);
+  }
+}

Added: hbase/trunk/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/Admin.proto?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/protobuf/Admin.proto (added)
+++ hbase/trunk/src/main/protobuf/Admin.proto Fri Apr 13 20:28:21 2012
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used for Admin service.
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "AdminProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "hbase.proto";
+
+message GetRegionInfoRequest {
+  required RegionSpecifier region = 1;
+}
+
+message GetRegionInfoResponse {
+  required RegionInfo regionInfo = 1;
+}
+
+/**
+ * Get a list of store files for a set of column families in a particular region.
+ * If no column family is specified, get the store files for all column families.
+ */
+message GetStoreFileListRequest {
+  required RegionSpecifier region = 1;
+  repeated bytes columnFamily = 2;
+}
+
+message GetStoreFileListResponse {
+  repeated string storeFile = 1;
+}
+
+message GetOnlineRegionRequest {
+}
+
+message GetOnlineRegionResponse {
+  repeated RegionInfo regionInfo = 1;
+}
+
+message OpenRegionRequest {
+  repeated RegionSpecifier region = 1;
+  optional uint32 versionOfOfflineNode = 2;
+}
+
+message OpenRegionResponse {
+  repeated RegionOpeningState openingState = 1;
+
+  enum RegionOpeningState {
+    OPENED = 0;
+    ALREADY_OPENED = 1;
+    FAILED_OPENING = 2;
+  }
+}
+
+/**
+ * Closes the specified region and will use or not use ZK during the close
+ * according to the specified flag.
+ */
+message CloseRegionRequest {
+  required RegionSpecifier region = 1;
+  optional uint32 versionOfClosingNode = 2;
+  optional bool transitionInZK = 3 [default = true];
+}
+
+message CloseRegionResponse {
+  required bool closed = 1;
+}
+
+/**
+ * Flushes the MemStore of the specified region.
+ * <p>
+ * This method is synchronous.
+ */
+message FlushRegionRequest {
+  required RegionSpecifier region = 1;
+  optional uint64 ifOlderThanTs = 2;
+}
+
+message FlushRegionResponse {
+  required uint64 lastFlushTime = 1;
+  optional bool flushed = 2;
+}
+
+/**
+ * Splits the specified region.
+ * <p>
+ * This method currently flushes the region and then forces a compaction which
+ * will then trigger a split.  The flush is done synchronously but the
+ * compaction is asynchronous.
+ */
+message SplitRegionRequest {
+  required RegionSpecifier region = 1;
+  optional bytes splitPoint = 2;
+}
+
+message SplitRegionResponse {
+}
+
+/**
+ * Compacts the specified region.  Performs a major compaction if specified.
+ * <p>
+ * This method is asynchronous.
+ */
+message CompactRegionRequest {
+  required RegionSpecifier region = 1;
+  optional bool major = 2;
+}
+
+message CompactRegionResponse {
+}
+
+message UUID {
+  required uint64 leastSigBits = 1;
+  required uint64 mostSigBits = 2;
+}
+
+// Protocol buffer version of HLog
+message WALEntry {
+  required WALKey walKey = 1;
+  required WALEdit edit = 2;
+
+  // Protocol buffer version of HLogKey
+  message WALKey {
+    required bytes encodedRegionName = 1;
+    required bytes tableName = 2;
+    required uint64 logSequenceNumber = 3;
+    required uint64 writeTime = 4;
+    optional UUID clusterId = 5;
+  }
+
+  message WALEdit {
+    repeated bytes keyValue = 1;
+    repeated FamilyScope familyScope = 2;
+
+    enum ScopeType {
+      REPLICATION_SCOPE_LOCAL = 0;
+      REPLICATION_SCOPE_GLOBAL = 1;
+    }
+
+    message FamilyScope {
+      required bytes family = 1;
+      required ScopeType scopeType = 2;
+    }
+  }
+}
+
+/**
+ * Replicates the given entries. The guarantee is that the given entries
+ * will be durable on the slave cluster if this method returns without
+ * any exception.
+ * hbase.replication has to be set to true for this to work.
+ */
+message ReplicateWALEntryRequest {
+  repeated WALEntry walEntry = 1;
+}
+
+message ReplicateWALEntryResponse {
+}
+
+// Replacement for rollHLogWriter in HRegionInterface 
+message RollWALWriterRequest {
+}
+
+message RollWALWriterResponse {
+  // A list of the encoded names of the regions to flush
+  repeated bytes regionToFlush = 1;
+}
+
+message StopServerRequest {
+  required string reason = 1;
+}
+
+message StopServerResponse {
+}
+
+message GetServerInfoRequest {
+}
+
+message GetServerInfoResponse {
+  required ServerName serverName = 1;
+}
+
+service AdminService {
+  rpc getRegionInfo(GetRegionInfoRequest)
+    returns(GetRegionInfoResponse);
+
+  rpc getStoreFileList(GetStoreFileListRequest)
+    returns(GetStoreFileListResponse);
+
+  rpc getOnlineRegion(GetOnlineRegionRequest)
+    returns(GetOnlineRegionResponse);
+
+  rpc openRegion(OpenRegionRequest)
+    returns(OpenRegionResponse);
+
+  rpc closeRegion(CloseRegionRequest)
+    returns(CloseRegionResponse);
+
+  rpc flushRegion(FlushRegionRequest)
+    returns(FlushRegionResponse);
+
+  rpc splitRegion(SplitRegionRequest)
+    returns(SplitRegionResponse);
+
+  rpc compactRegion(CompactRegionRequest)
+    returns(CompactRegionResponse);
+
+  rpc replicateWALEntry(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
+
+  rpc rollWALWriter(RollWALWriterRequest)
+    returns(RollWALWriterResponse);
+
+  rpc getServerInfo(GetServerInfoRequest)
+    returns(GetServerInfoResponse);
+
+  rpc stopServer(StopServerRequest)
+    returns(StopServerResponse);
+}

Added: hbase/trunk/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/Client.proto?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/protobuf/Client.proto (added)
+++ hbase/trunk/src/main/protobuf/Client.proto Fri Apr 13 20:28:21 2012
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used for Client service.
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "ClientProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "hbase.proto";
+
+/**
+ * Container for a list of column qualifier names of a family.
+ */
+message Column {
+  required bytes family = 1;
+  repeated bytes qualifier = 2;
+}
+
+/**
+ * The protocol buffer version of Get
+ */
+message Get {
+  required bytes row = 1;
+  repeated Column column = 2;
+  repeated NameBytesPair attribute = 3;
+  optional uint64 lockId = 4;
+  optional NameBytesPair filter = 5;
+  optional TimeRange timeRange = 6;
+  optional uint32 maxVersions = 7 [default = 1];
+  optional bool cacheBlocks = 8 [default = true];
+}
+
+/**
+ * For performance reasons, we don't use KeyValue
+ * here. We use the actual KeyValue bytes.
+ */
+message Result {
+  repeated bytes keyValueBytes = 1;
+}
+
+/**
+ * The get request. Perform a single Get operation.
+ * Unless existenceOnly is specified, return all the requested data
+ * for the row that matches exactly, or the one that immediately
+ * precedes it if closestRowBefore is specified.
+ *
+ * If existenceOnly is set, only the existence will be returned.
+ */
+message GetRequest {
+  required RegionSpecifier region = 1;
+  required Get get = 2;
+
+  // If the row to get doesn't exist, return the
+  // closest row before.
+  optional bool closestRowBefore = 3;
+
+  // The result isn't asked for, just check for
+  // the existence. If specified, closestRowBefore
+  // will be ignored
+  optional bool existenceOnly = 4;
+}
+
+message GetResponse {
+  optional Result result = 1;
+
+  // used for Get to check existence only
+  optional bool exists = 2;
+}
+
+/**
+ * Condition to check if the value of a given cell (row,
+ * family, qualifier) matches a value via a given comparator.
+ *
+ * Condition is used in check and mutate operations.
+ */
+message Condition {
+  required bytes row = 1;
+  required bytes family = 2;
+  required bytes qualifier = 3;
+  required CompareType compareType = 4;
+  required NameBytesPair comparator = 5;
+
+  enum CompareType {
+    LESS = 0;
+    LESS_OR_EQUAL = 1;
+    EQUAL = 2;
+    NOT_EQUAL = 3;
+    GREATER_OR_EQUAL = 4;
+    GREATER = 5;
+    NO_OP = 6;
+  }
+}
+
+/**
+ * A specific mutate inside a mutate request.
+ * It can be an append, increment, put or delete based
+ * on the mutate type.
+ */
+message Mutate {
+  required bytes row = 1;
+  required MutateType mutateType = 2;
+  repeated ColumnValue columnValue = 3;
+  repeated NameBytesPair attribute = 4;
+  optional uint64 timestamp = 5;
+  optional uint64 lockId = 6;
+  optional bool writeToWAL = 7 [default = true];
+
+  // For some mutate types, a result may be returned, in which case
+  // a time range can be specified for a potential performance gain
+  optional TimeRange timeRange = 10;
+
+  enum MutateType {
+    APPEND = 0;
+    INCREMENT = 1;
+    PUT = 2;
+    DELETE = 3;
+  }
+
+  enum DeleteType {
+    DELETE_ONE_VERSION = 0;
+    DELETE_MULTIPLE_VERSIONS = 1;
+    DELETE_FAMILY = 2;
+  }
+
+  message ColumnValue {
+    required bytes family = 1;
+    repeated QualifierValue qualifierValue = 2;
+
+    message QualifierValue {
+      optional bytes qualifier = 1;
+      optional bytes value = 2;
+      optional uint64 timestamp = 3;
+      optional DeleteType deleteType = 4;
+    }
+  }
+}
+
+/**
+ * The mutate request. Perform a single Mutate operation.
+ *
+ * Optionally, you can specify a condition. The mutate
+ * will take place only if the condition is met.  Otherwise,
+ * the mutate will be ignored.  In the response result,
+ * parameter processed is used to indicate if the mutate
+ * actually happened.
+ */
+message MutateRequest {
+  required RegionSpecifier region = 1;
+  required Mutate mutate = 2;
+  optional Condition condition = 3;
+}
+
+message MutateResponse {
+  optional Result result = 1;
+
+  // used for mutate to indicate processed only
+  optional bool processed = 2;
+}
+
+/**
+ * Instead of get from a table, you can scan it with optional filters.
+ * You can specify the row key range, time range, the columns/families
+ * to scan and so on.
+ *
+ * This scan is used the first time in a scan request. The response of
+ * the initial scan will return a scanner id, which should be used to
+ * fetch result batches later on before it is closed.
+ */
+message Scan {
+  repeated Column column = 1;
+  repeated NameBytesPair attribute = 2;
+  optional bytes startRow = 3;
+  optional bytes stopRow = 4;
+  optional NameBytesPair filter = 5;
+  optional TimeRange timeRange = 6;
+  optional uint32 maxVersions = 7 [default = 1];
+  optional bool cacheBlocks = 8 [default = true];
+  optional uint32 batchSize = 9;
+}
+
+/**
+ * A scan request. Initially, it should specify a scan. Later on, you
+ * can use the scanner id returned to fetch result batches with a different
+ * scan request.
+ *
+ * The scanner will remain open if there are more results, and it's not
+ * asked to be closed explicitly.
+ *
+ * You can fetch the results and ask the scanner to be closed to save
+ * a trip if you are not interested in remaining results.
+ */
+message ScanRequest {
+  optional RegionSpecifier region = 1;
+  optional Scan scan = 2;
+  optional uint64 scannerId = 3;
+  optional uint32 numberOfRows = 4;
+  optional bool closeScanner = 5;
+}
+
+/**
+ * The scan response. If there are no more results, moreResults will
+ * be false.  If it is not specified, it means there are more.
+ */
+message ScanResponse {
+  repeated Result result = 1;
+  optional uint64 scannerId = 2;
+  optional bool moreResults = 3;
+  optional uint32 ttl = 4;
+}
+
+message LockRowRequest {
+  required RegionSpecifier region = 1;
+  repeated bytes row = 2;
+}
+
+message LockRowResponse {
+  required uint64 lockId = 1;
+  optional uint32 ttl = 2;
+}
+
+message UnlockRowRequest {
+  required RegionSpecifier region = 1;
+  required uint64 lockId = 2;
+}
+
+message UnlockRowResponse {
+}
+
+/**
+ * Atomically bulk load multiple HFiles (say from different column families)
+ * into an open region.
+ */
+message BulkLoadHFileRequest {
+  required RegionSpecifier region = 1;
+  repeated FamilyPath familyPath = 2;
+
+  message FamilyPath {
+    required bytes family = 1;
+    required string path = 2;
+  }
+}
+
+message BulkLoadHFileResponse {
+  required bool loaded = 1;
+}
+
+/**
+ * An individual coprocessor call. You must specify the protocol,
+ * the method, and the row to which the call will be executed.
+ *
+ * You can specify the configuration settings in the property list.
+ *
+ * The parameter list has the parameters used for the method.
+ * A parameter is a pair of parameter name and the binary parameter
+ * value. The name is the parameter class name.  The value is the
+ * binary format of the parameter, for example, protocol buffer
+ * encoded value.
+ */
+message Exec {
+  required bytes row = 1;
+  required string protocolName = 2;
+  required string methodName = 3;
+  repeated NameStringPair property = 4;
+  repeated NameBytesPair parameter = 5;
+}
+
+/**
+ * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+ * method using the registered protocol handlers.
+ * {@link CoprocessorProtocol} implementations must be registered via the
+ * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(
+ * Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
+ * method before they are available.
+ */
+message ExecCoprocessorRequest {
+  required RegionSpecifier region = 1;
+  required Exec call = 2;
+}
+
+message ExecCoprocessorResponse {
+  required NameBytesPair value = 1;
+}
+
+/**
+ * An individual action result. The results will be in the
+ * same order as the actions in the request. If an action
+ * returns a value, it is set in value field. If it doesn't
+ * return anything, the result will be empty. If an action
+ * fails to execute due to any exception, the exception
+ * is returned as a stringified parameter.
+ */
+message ActionResult {
+  optional NameBytesPair value = 1;
+  optional NameBytesPair exception = 2;
+}
+
+/**
+ * You can execute a list of actions on a given region in order.
+ *
+ * If it is a list of mutate actions, atomic can be set
+ * to make sure they can be processed atomically, just like
+ * RowMutations.
+ */
+message MultiRequest {
+  required RegionSpecifier region = 1;
+  repeated NameBytesPair action = 2;
+  optional bool atomic = 3;
+}
+
+message MultiResponse {
+  repeated ActionResult result = 1;
+}
+
+service ClientService {
+  rpc get(GetRequest)
+    returns(GetResponse);
+
+  rpc mutate(MutateRequest)
+    returns(MutateResponse);
+
+  rpc scan(ScanRequest)
+    returns(ScanResponse);
+
+  rpc lockRow(LockRowRequest)
+    returns(LockRowResponse);
+
+  rpc unlockRow(UnlockRowRequest)
+    returns(UnlockRowResponse);
+
+  rpc bulkLoadHFile(BulkLoadHFileRequest)
+    returns(BulkLoadHFileResponse);
+
+  rpc execCoprocessor(ExecCoprocessorRequest)
+    returns(ExecCoprocessorResponse);
+
+  rpc multi(MultiRequest)
+    returns(MultiResponse);
+}

Modified: hbase/trunk/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/hbase.proto?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/src/main/protobuf/hbase.proto Fri Apr 13 20:28:21 2012
@@ -101,3 +101,16 @@ message ServerName {
   optional uint32 port = 2;
   optional uint64 startCode = 3;
 }
+
+// Common data structures
+
+message NameStringPair {
+  required string name = 1;
+  required string value = 2;
+}
+
+message NameBytesPair {
+  required string name = 1;
+  optional bytes value = 2;
+}
+

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Fri Apr 13 20:28:21 2012
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.ServerCallable;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
@@ -58,6 +61,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * Test {@link CatalogTracker}
  */
@@ -131,13 +137,16 @@ public class TestCatalogTracker {
   /**
    * Test interruptable while blocking wait on root and meta.
    * @throws IOException
+   * @throws ServiceException
    * @throws InterruptedException
    */
   @Test public void testInterruptWaitOnMetaAndRoot()
-  throws IOException, InterruptedException {
-    HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
-    HConnection connection = mockConnection(implementation);
+  throws IOException, InterruptedException, ServiceException {
+    final ClientProtocol client = Mockito.mock(ClientProtocol.class);
+    HConnection connection = mockConnection(null, client);
     try {
+      Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
+      thenReturn(GetResponse.newBuilder().build());
       final CatalogTracker ct = constructAndStartCatalogTracker(connection);
       ServerName hsa = ct.getRootLocation();
       Assert.assertNull(hsa);
@@ -176,10 +185,11 @@ public class TestCatalogTracker {
    */
   @Test
   public void testServerNotRunningIOException()
-  throws IOException, InterruptedException, KeeperException {
+  throws IOException, InterruptedException, KeeperException, ServiceException {
     // Mock an HRegionInterface.
     final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
-    HConnection connection = mockConnection(implementation);
+    final ClientProtocol client = Mockito.mock(ClientProtocol.class);
+    HConnection connection = mockConnection(implementation, client);
     try {
       // If a 'getRegionInfo' is called on mocked HRegionInterface, throw IOE
       // the first time.  'Succeed' the second time we are called.
@@ -198,6 +208,8 @@ public class TestCatalogTracker {
       Mockito.when(connection.getRegionServerWithRetries((ServerCallable<Result>)Mockito.any())).
         thenReturn(getMetaTableRowResult());
 
+      Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
+        thenReturn(GetResponse.newBuilder().build());
       // Now start up the catalogtracker with our doctored Connection.
       final CatalogTracker ct = constructAndStartCatalogTracker(connection);
       try {
@@ -245,17 +257,18 @@ public class TestCatalogTracker {
    * @throws IOException
    * @throws InterruptedException
    * @throws KeeperException
+   * @throws ServiceException
    */
   @Test
   public void testGetMetaServerConnectionFails()
-  throws IOException, InterruptedException, KeeperException {
-    // Mock an HRegionInterface.
-    final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
-    HConnection connection = mockConnection(implementation);
+  throws IOException, InterruptedException, KeeperException, ServiceException {
+    // Mock a ClientProtocol.
+    final ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
+    HConnection connection = mockConnection(null, implementation);
     try {
       // If a 'get' is called on mocked interface, throw connection refused.
-      Mockito.when(implementation.get((byte[]) Mockito.any(), (Get) Mockito.any())).
-        thenThrow(new ConnectException("Connection refused"));
+      Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any())).
+        thenThrow(new ServiceException(new ConnectException("Connection refused")));
       // Now start up the catalogtracker with our doctored Connection.
       final CatalogTracker ct = constructAndStartCatalogTracker(connection);
       try {
@@ -371,7 +384,7 @@ public class TestCatalogTracker {
     // to make our test work.
     // Mock an HRegionInterface.
     final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
-    HConnection connection = mockConnection(implementation);
+    HConnection connection = mockConnection(implementation, null);
     try {
       // Now the ct is up... set into the mocks some answers that make it look
       // like things have been getting assigned. Make it so we'll return a
@@ -419,6 +432,7 @@ public class TestCatalogTracker {
   /**
    * @param implementation An {@link HRegionInterface} instance; you'll likely
    * want to pass a mocked HRS; can be null.
+   * @param client A mocked ClientProtocol instance, can be null
    * @return Mock up a connection that returns a {@link Configuration} when
    * {@link HConnection#getConfiguration()} is called, a 'location' when
    * {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
@@ -429,7 +443,8 @@ public class TestCatalogTracker {
    * when done with this mocked Connection.
    * @throws IOException
    */
-  private HConnection mockConnection(final HRegionInterface implementation)
+  private HConnection mockConnection(
+      final HRegionInterface implementation, final ClientProtocol client)
   throws IOException {
     HConnection connection =
       HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
@@ -449,6 +464,11 @@ public class TestCatalogTracker {
       Mockito.when(connection.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
         thenReturn(implementation);
     }
+    if (client != null) {
+      // If getClient is called, return this client.
+      Mockito.when(connection.getClient(Mockito.anyString(), Mockito.anyInt())).
+        thenReturn(client);
+    }
     return connection;
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Fri Apr 13 20:28:21 2012
@@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.client.HC
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -42,6 +44,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * Test MetaReader/Editor but without spinning up a cluster.
  * We mock regionserver back and forth (we do spin up a zk cluster).
@@ -82,7 +87,8 @@ public class TestMetaReaderEditorNoClust
    * @throws InterruptedException 
    */
   @Test
-  public void testRideOverServerNotRunning() throws IOException, InterruptedException {
+  public void testRideOverServerNotRunning()
+      throws IOException, InterruptedException, ServiceException {
     // Need a zk watcher.
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(UTIL.getConfiguration(),
       this.getClass().getSimpleName(), ABORTABLE, true);
@@ -92,27 +98,16 @@ public class TestMetaReaderEditorNoClust
     HConnection connection = null;
     CatalogTracker ct = null;
     try {
-      // Mock an HRegionInterface. Our mock implementation will fail a few
+      // Mock a ClientProtocol. Our mock implementation will fail a few
       // times when we go to open a scanner.
-      final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
-      // When openScanner called throw IOE 'Server not running' a few times
+      final ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
+      // When scan is called, throw ServiceException 'Server not running' a few times
       // before we return a scanner id.  Whats WEIRD is that these
       // exceptions do not show in the log because they are caught and only
       // printed if we FAIL.  We eventually succeed after retry so these don't
       // show.  We will know if they happened or not because we will ask
-      // mockito at the end of this test to verify that openscanner was indeed
+      // mockito at the end of this test to verify that scan was indeed
       // called the wanted number of times.
-      final long scannerid = 123L;
-      Mockito.when(implementation.openScanner((byte [])Mockito.any(),
-          (Scan)Mockito.any())).
-        thenThrow(new IOException("Server not running (1 of 3)")).
-        thenThrow(new IOException("Server not running (2 of 3)")).
-        thenThrow(new IOException("Server not running (3 of 3)")).
-        thenReturn(scannerid);
-      // Make it so a verifiable answer comes back when next is called.  Return
-      // the verifiable answer and then a null so we stop scanning.  Our
-      // verifiable answer is something that looks like a row in META with
-      // a server and startcode that is that of the above defined servername.
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       final byte [] rowToVerify = Bytes.toBytes("rowToVerify");
       kvs.add(new KeyValue(rowToVerify,
@@ -124,10 +119,19 @@ public class TestMetaReaderEditorNoClust
       kvs.add(new KeyValue(rowToVerify,
         HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
         Bytes.toBytes(sn.getStartcode())));
-      final Result [] result = new Result [] {new Result(kvs)};
-      Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
-        thenReturn(result).
-        thenReturn(null);
+      final Result [] results = new Result [] {new Result(kvs)};
+      ScanResponse.Builder builder = ScanResponse.newBuilder();
+      for (Result result: results) {
+        builder.addResult(ProtobufUtil.toResult(result));
+      }
+      Mockito.when(implementation.scan(
+        (RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
+          thenThrow(new ServiceException("Server not running (1 of 3)")).
+          thenThrow(new ServiceException("Server not running (2 of 3)")).
+          thenThrow(new ServiceException("Server not running (3 of 3)")).
+          thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
+            .thenReturn(builder.build()).thenReturn(
+              ScanResponse.newBuilder().setMoreResults(false).build());
 
       // Associate a spied-upon HConnection with UTIL.getConfiguration.  Need
       // to shove this in here first so it gets picked up all over; e.g. by
@@ -150,7 +154,7 @@ public class TestMetaReaderEditorNoClust
 
       // Now shove our HRI implementation into the spied-upon connection.
       Mockito.doReturn(implementation).
-        when(connection).getHRegionConnection(Mockito.anyString(), Mockito.anyInt());
+        when(connection).getClient(Mockito.anyString(), Mockito.anyInt());
 
       // Now start up the catalogtracker with our doctored Connection.
       ct = new CatalogTracker(zkw, null, connection, ABORTABLE, 0);
@@ -160,10 +164,10 @@ public class TestMetaReaderEditorNoClust
       assertTrue(hris.size() == 1);
       assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
       assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
-      // Finally verify that openscanner was called four times -- three times
-      // with exception and then on 4th attempt we succeed.
-      Mockito.verify(implementation, Mockito.times(4)).
-        openScanner((byte [])Mockito.any(), (Scan)Mockito.any());
+      // Finally verify that scan was called six times -- three times
+      // with exception and then the 4th, 5th and 6th calls succeed.
+      Mockito.verify(implementation, Mockito.times(6)).
+        scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
     } finally {
       if (ct != null) ct.stop();
       HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Fri Apr 13 20:28:21 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.ZooKeeper
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
 import org.mockito.Mockito;
 
 /**
@@ -92,7 +93,8 @@ public class HConnectionTestingUtility {
    * @throws IOException
    */
   public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
-      final HRegionInterface implementation, final ServerName sn, final HRegionInfo hri)
+      final HRegionInterface implementation, final ClientProtocol client,
+      final ServerName sn, final HRegionInfo hri)
   throws IOException {
     HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
     Mockito.doNothing().when(c).close();
@@ -108,6 +110,11 @@ public class HConnectionTestingUtility {
       Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
         thenReturn(implementation);
     }
+    if (client != null) {
+      // If getClient is called, return this client.
+      Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())).
+        thenReturn(client);
+    }
     return c;
   }
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java Fri Apr 13 20:28:21 2012
@@ -99,6 +99,7 @@ import org.junit.experimental.categories
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
 
 @Category(SmallTests.class)
 public class TestHbaseObjectWritable extends TestCase {
@@ -523,6 +524,7 @@ public class TestHbaseObjectWritable ext
     assertEquals(80,HbaseObjectWritable.getClassCode(Message.class).intValue());
 
     assertEquals(81,HbaseObjectWritable.getClassCode(Array.class).intValue());
+    assertEquals(82,HbaseObjectWritable.getClassCode(RpcController.class).intValue());
   }
 
   /**
@@ -531,7 +533,7 @@ public class TestHbaseObjectWritable ext
    * note on the test above. 
    */
   public void testGetNextObjectCode(){
-    assertEquals(82,HbaseObjectWritable.getNextClassCode());
+    assertEquals(83,HbaseObjectWritable.getNextClassCode());
   }
 
   @org.junit.Rule

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Fri Apr 13 20:28:21 2012
@@ -50,6 +50,23 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -64,6 +81,9 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
 /**
  * A mock RegionServer implementation.
  * Use this when you can't bend Mockito to your liking (e.g. return null result
@@ -72,7 +92,7 @@ import org.apache.zookeeper.KeeperExcept
  * {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data
  * store that the get pulls from.
  */
-class MockRegionServer implements HRegionInterface, RegionServerServices {
+class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServerServices {
   private final ServerName sn;
   private final ZooKeeperWatcher zkw;
   private final Configuration conf;
@@ -245,9 +265,8 @@ class MockRegionServer implements HRegio
 
   @Override
   public Result get(byte[] regionName, Get get) throws IOException {
-    Map<byte [], Result> m = this.gets.get(regionName);
-    if (m == null) return null;
-    return m.get(get.getRow());
+    // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
@@ -597,4 +616,87 @@ class MockRegionServer implements HRegio
   public void mutateRow(byte[] regionName, RowMutations rm) throws IOException {
     // TODO Auto-generated method stub
   }
+
+  @Override
+  public GetResponse get(RpcController controller, GetRequest request)
+      throws ServiceException {
+    byte[] regionName = request.getRegion().getValue().toByteArray();
+    Map<byte [], Result> m = this.gets.get(regionName);
+    GetResponse.Builder builder = GetResponse.newBuilder();
+    if (m != null) {
+      byte[] row = request.getGet().getRow().toByteArray();
+      builder.setResult(ProtobufUtil.toResult(m.get(row)));
+    }
+    return builder.build();
+  }
+
+  @Override
+  public MutateResponse mutate(RpcController controller, MutateRequest request)
+      throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public ScanResponse scan(RpcController controller, ScanRequest request)
+      throws ServiceException {
+    ScanResponse.Builder builder = ScanResponse.newBuilder();
+    try {
+      if (request.hasScan()) {
+        byte[] regionName = request.getRegion().getValue().toByteArray();
+        builder.setScannerId(openScanner(regionName, null));
+        builder.setMoreResults(true);
+      }
+      else {
+        long scannerId = request.getScannerId();
+        Result result = next(scannerId);
+        if (result != null) {
+          builder.addResult(ProtobufUtil.toResult(result));
+          builder.setMoreResults(true);
+        }
+        else {
+          builder.setMoreResults(false);
+          close(scannerId);
+        }
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+    return builder.build();
+  }
+
+  @Override
+  public LockRowResponse lockRow(RpcController controller,
+      LockRowRequest request) throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public UnlockRowResponse unlockRow(RpcController controller,
+      UnlockRowRequest request) throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
+      BulkLoadHFileRequest request) throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public ExecCoprocessorResponse execCoprocessor(RpcController controller,
+      ExecCoprocessorRequest request) throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi(
+      RpcController controller, MultiRequest request) throws ServiceException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
\ No newline at end of file