You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/13 22:28:22 UTC
svn commit: r1325937 [6/7] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/catalog/
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/filter/
main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/ipc...
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,1164 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownRowLockException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * RegionServer makes a set of HRegions available to clients. It checks in with
+ * the HMaster. There are many RegionServers in a single HBase deployment.
+ *
+ * This will be a replacement for the HRegionServer. It has protobuf protocols
+ * implementations. All the HRegionInterface implementations stay in HRegionServer
+ * for possible backward compatibility requests. This also makes it easier to
+ * rip of HRegionInterface later on.
+ */
+@InterfaceAudience.Private
+public abstract class RegionServer implements
+ ClientProtocol, Runnable, RegionServerServices {
+
+ private static final Log LOG = LogFactory.getLog(RegionServer.class);
+
+ private final Random rand = new Random();
+
+ protected long maxScannerResultSize;
+
+ // Cache flushing
+ protected MemStoreFlusher cacheFlusher;
+
+ final Map<String, RegionScanner> scanners =
+ new ConcurrentHashMap<String, RegionScanner>();
+
+ /**
+ * Map of regions currently being served by this region server. Key is the
+ * encoded region name. All access should be synchronized.
+ */
+ protected final Map<String, HRegion> onlineRegions =
+ new ConcurrentHashMap<String, HRegion>();
+
+ // Leases
+ protected Leases leases;
+
+ // Request counter.
+ // Do we need this? Can't we just sum region counters? St.Ack 20110412
+ protected AtomicInteger requestCount = new AtomicInteger();
+
+ // If false, the file system has become unavailable
+ protected volatile boolean fsOk;
+ protected HFileSystem fs;
+
+ protected static final int NORMAL_QOS = 0;
+ protected static final int QOS_THRESHOLD = 10; // the line between low and high qos
+ protected static final int HIGH_QOS = 100;
+
+ // Set when a report to the master comes back with a message asking us to
+ // shutdown. Also set by call to stop when debugging or running unit tests
+ // of HRegionServer in isolation.
+ protected volatile boolean stopped = false;
+
+ // Go down hard. Used if file system becomes unavailable and also in
+ // debugging and unit tests.
+ protected volatile boolean abortRequested;
+
+ Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
+
+ /**
+ * Instantiated as a row lock lease. If the lease times out, the row lock is
+ * released
+ */
+ private class RowLockListener implements LeaseListener {
+ private final String lockName;
+ private final HRegion region;
+
+ RowLockListener(final String lockName, final HRegion region) {
+ this.lockName = lockName;
+ this.region = region;
+ }
+
+ public void leaseExpired() {
+ LOG.info("Row Lock " + this.lockName + " lease expired");
+ Integer r = rowlocks.remove(this.lockName);
+ if (r != null) {
+ region.releaseRowLock(r);
+ }
+ }
+ }
+
+ /**
+ * Instantiated as a scanner lease. If the lease times out, the scanner is
+ * closed
+ */
+ private class ScannerListener implements LeaseListener {
+ private final String scannerName;
+
+ ScannerListener(final String n) {
+ this.scannerName = n;
+ }
+
+ public void leaseExpired() {
+ RegionScanner s = scanners.remove(this.scannerName);
+ if (s != null) {
+ LOG.info("Scanner " + this.scannerName + " lease expired on region "
+ + s.getRegionInfo().getRegionNameAsString());
+ try {
+ HRegion region = getRegion(s.getRegionInfo().getRegionName());
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preScannerClose(s);
+ }
+
+ s.close();
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(s);
+ }
+ } catch (IOException e) {
+ LOG.error("Closing scanner for "
+ + s.getRegionInfo().getRegionNameAsString(), e);
+ }
+ } else {
+ LOG.info("Scanner " + this.scannerName + " lease expired");
+ }
+ }
+ }
+
+ /**
+ * Method to get the Integer lock identifier used internally from the long
+ * lock identifier used by the client.
+ *
+ * @param lockId
+ * long row lock identifier from client
+ * @return intId Integer row lock used internally in HRegion
+ * @throws IOException
+ * Thrown if this is not a valid client lock id.
+ */
+ Integer getLockFromId(long lockId) throws IOException {
+ if (lockId == -1L) {
+ return null;
+ }
+ String lockName = String.valueOf(lockId);
+ Integer rl = rowlocks.get(lockName);
+ if (rl == null) {
+ throw new UnknownRowLockException("Invalid row lock");
+ }
+ this.leases.renewLease(lockName);
+ return rl;
+ }
+
+ /**
+ * Called to verify that this server is up and running.
+ *
+ * @throws IOException
+ */
+ protected void checkOpen() throws IOException {
+ if (this.stopped || this.abortRequested) {
+ throw new RegionServerStoppedException("Server " + getServerName() +
+ " not running" + (this.abortRequested ? ", aborting" : ""));
+ }
+ if (!fsOk) {
+ throw new RegionServerStoppedException("File system not available");
+ }
+ }
+
+ /**
+ * @param regionName
+ * @return HRegion for the passed binary <code>regionName</code> or null if
+ * named region is not member of the online regions.
+ */
+ public HRegion getOnlineRegion(final byte[] regionName) {
+ String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+ return this.onlineRegions.get(encodedRegionName);
+ }
+
+ /**
+ * Protected utility method for safely obtaining an HRegion handle.
+ *
+ * @param regionName
+ * Name of online {@link HRegion} to return
+ * @return {@link HRegion} for <code>regionName</code>
+ * @throws NotServingRegionException
+ */
+ protected HRegion getRegion(final byte[] regionName)
+ throws NotServingRegionException {
+ HRegion region = null;
+ region = getOnlineRegion(regionName);
+ if (region == null) {
+ throw new NotServingRegionException("Region is not online: " +
+ Bytes.toStringBinary(regionName));
+ }
+ return region;
+ }
+
+ /*
+ * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+ * IOE if it isn't already.
+ *
+ * @param t Throwable
+ *
+ * @return Throwable converted to an IOE; methods can only let out IOEs.
+ */
+ protected Throwable cleanup(final Throwable t) {
+ return cleanup(t, null);
+ }
+
+ /*
+ * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+ * IOE if it isn't already.
+ *
+ * @param t Throwable
+ *
+ * @param msg Message to log in error. Can be null.
+ *
+ * @return Throwable converted to an IOE; methods can only let out IOEs.
+ */
+ protected Throwable cleanup(final Throwable t, final String msg) {
+ // Don't log as error if NSRE; NSRE is 'normal' operation.
+ if (t instanceof NotServingRegionException) {
+ LOG.debug("NotServingRegionException; " + t.getMessage());
+ return t;
+ }
+ if (msg == null) {
+ LOG.error("", RemoteExceptionHandler.checkThrowable(t));
+ } else {
+ LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
+ }
+ if (!checkOOME(t)) {
+ checkFileSystem();
+ }
+ return t;
+ }
+
+ /*
+ * @param t
+ *
+ * @return Make <code>t</code> an IOE if it isn't already.
+ */
+ protected IOException convertThrowableToIOE(final Throwable t) {
+ return convertThrowableToIOE(t, null);
+ }
+
+ /*
+ * @param t
+ *
+ * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
+ *
+ * @return Make <code>t</code> an IOE if it isn't already.
+ */
+ protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
+ return (t instanceof IOException ? (IOException) t : msg == null
+ || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
+ }
+
+ /*
+ * Check if an OOME and, if so, abort immediately to avoid creating more objects.
+ *
+ * @param e
+ *
+ * @return True if we OOME'd and are aborting.
+ */
+ public boolean checkOOME(final Throwable e) {
+ boolean stop = false;
+ try {
+ if (e instanceof OutOfMemoryError
+ || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+ || (e.getMessage() != null && e.getMessage().contains(
+ "java.lang.OutOfMemoryError"))) {
+ stop = true;
+ LOG.fatal(
+ "Run out of memory; HRegionServer will abort itself immediately", e);
+ }
+ } finally {
+ if (stop) {
+ Runtime.getRuntime().halt(1);
+ }
+ }
+ return stop;
+ }
+
+ /**
+ * Checks to see if the file system is still accessible. If not, sets
+ * abortRequested and stopRequested
+ *
+ * @return false if file system is not available
+ */
+ public boolean checkFileSystem() {
+ if (this.fsOk && this.fs != null) {
+ try {
+ FSUtils.checkFileSystemAvailable(this.fs);
+ } catch (IOException e) {
+ abort("File System not available", e);
+ this.fsOk = false;
+ }
+ }
+ return this.fsOk;
+ }
+
+ protected long addRowLock(Integer r, HRegion region)
+ throws LeaseStillHeldException {
+ long lockId = nextLong();
+ String lockName = String.valueOf(lockId);
+ rowlocks.put(lockName, r);
+ this.leases.createLease(lockName, new RowLockListener(lockName, region));
+ return lockId;
+ }
+
+ protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+ long scannerId = nextLong();
+ String scannerName = String.valueOf(scannerId);
+ scanners.put(scannerName, s);
+ this.leases.createLease(scannerName, new ScannerListener(scannerName));
+ return scannerId;
+ }
+
+ /**
+ * Generate a random positive long number
+ *
+ * @return a random positive long number
+ */
+ protected long nextLong() {
+ long n = rand.nextLong();
+ if (n == 0) {
+ return nextLong();
+ }
+ if (n < 0) {
+ n = -n;
+ }
+ return n;
+ }
+
+ // Start Client methods
+
+ /**
+ * Get data from a table.
+ *
+ * @param controller the RPC controller
+ * @param request the get request
+ * @throws ServiceException
+ */
+ @Override
+ public GetResponse get(final RpcController controller,
+ final GetRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ GetResponse.Builder builder = GetResponse.newBuilder();
+ ClientProtos.Get get = request.getGet();
+ Boolean existence = null;
+ Result r = null;
+ if (request.getClosestRowBefore()) {
+ if (get.getColumnCount() != 1) {
+ throw new DoNotRetryIOException(
+ "get ClosestRowBefore supports one and only one family now, not "
+ + get.getColumnCount() + " families");
+ }
+ byte[] row = get.getRow().toByteArray();
+ byte[] family = get.getColumn(0).getFamily().toByteArray();
+ r = region.getClosestRowBefore(row, family);
+ } else {
+ Get clientGet = ProtobufUtil.toGet(get);
+ if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
+ existence = region.getCoprocessorHost().preExists(clientGet);
+ }
+ if (existence == null) {
+ Integer lock = getLockFromId(clientGet.getLockId());
+ r = region.get(clientGet, lock);
+ if (request.getExistenceOnly()) {
+ boolean exists = r != null && !r.isEmpty();
+ if (region.getCoprocessorHost() != null) {
+ exists = region.getCoprocessorHost().postExists(clientGet, exists);
+ }
+ existence = Boolean.valueOf(exists);
+ }
+ }
+ }
+ if (existence != null) {
+ builder.setExists(existence.booleanValue());
+ } else if (r != null) {
+ builder.setResult(ProtobufUtil.toResult(r));
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Mutate data in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the mutate request
+ * @throws ServiceException
+ */
+ @Override
+ public MutateResponse mutate(final RpcController controller,
+ final MutateRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ MutateResponse.Builder builder = MutateResponse.newBuilder();
+ Mutate mutate = request.getMutate();
+ if (!region.getRegionInfo().isMetaTable()) {
+ cacheFlusher.reclaimMemStoreMemory();
+ }
+ Integer lock = null;
+ Result r = null;
+ Boolean processed = null;
+ MutateType type = mutate.getMutateType();
+ switch (type) {
+ case APPEND:
+ r = append(region, mutate);
+ break;
+ case INCREMENT:
+ r = increment(region, mutate);
+ break;
+ case PUT:
+ Put put = ProtobufUtil.toPut(mutate);
+ lock = getLockFromId(put.getLockId());
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+ WritableByteArrayComparable comparator =
+ (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+ if (region.getCoprocessorHost() != null) {
+ processed = region.getCoprocessorHost().preCheckAndPut(
+ row, family, qualifier, compareOp, comparator, put);
+ }
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, compareOp, comparator, put, lock, true);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndPut(row, family,
+ qualifier, compareOp, comparator, put, result);
+ }
+ processed = Boolean.valueOf(result);
+ }
+ } else {
+ region.put(put, lock);
+ processed = Boolean.TRUE;
+ }
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(mutate);
+ lock = getLockFromId(delete.getLockId());
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
+ WritableByteArrayComparable comparator =
+ (WritableByteArrayComparable)ProtobufUtil.toObject(condition.getComparator());
+ if (region.getCoprocessorHost() != null) {
+ processed = region.getCoprocessorHost().preCheckAndDelete(
+ row, family, qualifier, compareOp, comparator, delete);
+ }
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, compareOp, comparator, delete, lock, true);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+ qualifier, compareOp, comparator, delete, result);
+ }
+ processed = Boolean.valueOf(result);
+ }
+ } else {
+ region.delete(delete, lock, delete.getWriteToWAL());
+ processed = Boolean.TRUE;
+ }
+ break;
+ default:
+ throw new DoNotRetryIOException(
+ "Unsupported mutate type: " + type.name());
+ }
+ if (processed != null) {
+ builder.setProcessed(processed.booleanValue());
+ } else if (r != null) {
+ builder.setResult(ProtobufUtil.toResult(r));
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ checkFileSystem();
+ throw new ServiceException(ie);
+ }
+ }
+
+ //
+ // remote scanner interface
+ //
+
+ /**
+ * Scan data in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the scan request
+ * @throws ServiceException
+ */
+ @Override
+ public ScanResponse scan(final RpcController controller,
+ final ScanRequest request) throws ServiceException {
+ Leases.Lease lease = null;
+ String scannerName = null;
+ try {
+ if (!request.hasScannerId() && !request.hasScan()) {
+ throw new DoNotRetryIOException(
+ "Missing required input: scannerId or scan");
+ }
+ long scannerId = -1;
+ if (request.hasScannerId()) {
+ scannerId = request.getScannerId();
+ scannerName = String.valueOf(scannerId);
+ }
+ try {
+ checkOpen();
+ } catch (IOException e) {
+ // If checkOpen failed, server not running or filesystem gone,
+ // cancel this lease; filesystem is gone or we're closing or something.
+ if (scannerName != null) {
+ try {
+ leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ LOG.info("Server shutting down and client tried to access missing scanner " +
+ scannerName);
+ }
+ }
+ throw e;
+ }
+ requestCount.incrementAndGet();
+
+ try {
+ int ttl = 0;
+ HRegion region = null;
+ RegionScanner scanner = null;
+ boolean moreResults = true;
+ boolean closeScanner = false;
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ if (request.hasCloseScanner()) {
+ closeScanner = request.getCloseScanner();
+ }
+ int rows = 1;
+ if (request.hasNumberOfRows()) {
+ rows = request.getNumberOfRows();
+ }
+ if (request.hasScannerId()) {
+ scanner = scanners.get(scannerName);
+ if (scanner == null) {
+ throw new UnknownScannerException(
+ "Name: " + scannerName + ", already closed?");
+ }
+ region = getRegion(scanner.getRegionInfo().getRegionName());
+ } else {
+ region = getRegion(request.getRegion());
+ ClientProtos.Scan protoScan = request.getScan();
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ region.prepareScanner(scan);
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ }
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
+ }
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ scannerId = addScanner(scanner);
+ scannerName = String.valueOf(scannerId);
+ ttl = leases.leasePeriod;
+ }
+
+ if (rows > 0) {
+ try {
+ // Remove lease while its being processed in server; protects against case
+ // where processing of request takes > lease expiration time.
+ lease = leases.removeLease(scannerName);
+ List<Result> results = new ArrayList<Result>(rows);
+ long currentScanResultSize = 0;
+
+ boolean done = false;
+ // Call coprocessor. Get region info from scanner.
+ if (region != null && region.getCoprocessorHost() != null) {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(
+ scanner, results, rows);
+ if (!results.isEmpty()) {
+ for (Result r : results) {
+ for (KeyValue kv : r.raw()) {
+ currentScanResultSize += kv.heapSize();
+ }
+ }
+ }
+ if (bypass != null && bypass.booleanValue()) {
+ done = true;
+ }
+ }
+
+ if (!done) {
+ List<KeyValue> values = new ArrayList<KeyValue>();
+ for (int i = 0; i < rows
+ && currentScanResultSize < maxScannerResultSize; i++) {
+ // Collect values to be returned here
+ boolean moreRows = scanner.next(values, HRegion.METRIC_NEXTSIZE);
+ if (!values.isEmpty()) {
+ for (KeyValue kv : values) {
+ currentScanResultSize += kv.heapSize();
+ }
+ results.add(new Result(values));
+ }
+ if (!moreRows) {
+ break;
+ }
+ values.clear();
+ }
+
+ // coprocessor postNext hook
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+ }
+ }
+
+ // If the scanner's filter - if any - is done with the scan
+ // and wants to tell the client to stop the scan. This is done by passing
+ // a null result, and setting moreResults to false.
+ if (scanner.isFilterDone() && results.isEmpty()) {
+ moreResults = false;
+ results = null;
+ } else {
+ for (Result result: results) {
+ if (result != null) {
+ builder.addResult(ProtobufUtil.toResult(result));
+ }
+ }
+ }
+ } finally {
+ // We're done. On way out re-add the above removed lease.
+ // Adding resets expiration time on lease.
+ if (scanners.containsKey(scannerName)) {
+ if (lease != null) leases.addLease(lease);
+ ttl = leases.leasePeriod;
+ }
+ }
+ }
+
+ if (!moreResults || closeScanner) {
+ ttl = 0;
+ moreResults = false;
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return builder.build(); // bypass
+ }
+ }
+ scanner = scanners.remove(scannerName);
+ if (scanner != null) {
+ scanner.close();
+ leases.cancelLease(scannerName);
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
+ }
+ }
+
+ if (ttl > 0) {
+ builder.setTtl(ttl);
+ }
+ builder.setScannerId(scannerId);
+ builder.setMoreResults(moreResults);
+ return builder.build();
+ } catch (Throwable t) {
+ if (scannerName != null &&
+ t instanceof NotServingRegionException) {
+ scanners.remove(scannerName);
+ }
+ throw convertThrowableToIOE(cleanup(t));
+ }
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Lock a row in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the lock row request
+ * @throws ServiceException
+ */
+ @Override
+ public LockRowResponse lockRow(final RpcController controller,
+ final LockRowRequest request) throws ServiceException {
+ try {
+ if (request.getRowCount() != 1) {
+ throw new DoNotRetryIOException(
+ "lockRow supports only one row now, not " + request.getRowCount() + " rows");
+ }
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ byte[] row = request.getRow(0).toByteArray();
+ try {
+ Integer r = region.obtainRowLock(row);
+ long lockId = addRowLock(r, region);
+ LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+ LockRowResponse.Builder builder = LockRowResponse.newBuilder();
+ builder.setLockId(lockId);
+ return builder.build();
+ } catch (Throwable t) {
+ throw convertThrowableToIOE(cleanup(t,
+ "Error obtaining row lock (fsOk: " + this.fsOk + ")"));
+ }
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Unlock a locked row in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the unlock row request
+ * @throws ServiceException
+ */
+ @Override
+ @QosPriority(priority=HIGH_QOS)
+ public UnlockRowResponse unlockRow(final RpcController controller,
+ final UnlockRowRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ if (!request.hasLockId()) {
+ throw new DoNotRetryIOException(
+ "Invalid unlock rowrequest, missing lock id");
+ }
+ long lockId = request.getLockId();
+ String lockName = String.valueOf(lockId);
+ try {
+ Integer r = rowlocks.remove(lockName);
+ if (r == null) {
+ throw new UnknownRowLockException(lockName);
+ }
+ region.releaseRowLock(r);
+ this.leases.cancelLease(lockName);
+ LOG.debug("Row lock " + lockId
+ + " has been explicitly released by client");
+ return UnlockRowResponse.newBuilder().build();
+ } catch (Throwable t) {
+ throw convertThrowableToIOE(cleanup(t));
+ }
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Atomically bulk load several HFiles into an open region
+ * @return true if successful, false is failed but recoverably (no action)
+ * @throws IOException if failed unrecoverably
+ */
+ @Override
+ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
+ final BulkLoadHFileRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
+ for (FamilyPath familyPath: request.getFamilyPathList()) {
+ familyPaths.add(new Pair<byte[], String>(
+ familyPath.getFamily().toByteArray(), familyPath.getPath()));
+ }
+ boolean loaded = region.bulkLoadHFiles(familyPaths);
+ BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
+ builder.setLoaded(loaded);
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+ * method using the registered protocol handlers.
+ * {@link CoprocessorProtocol} implementations must be registered per-region
+ * via the
+ * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
+ * method before they are available.
+ *
+ * @param regionName name of the region against which the invocation is executed
+ * @param call an {@code Exec} instance identifying the protocol, method name,
+ * and parameters for the method invocation
+ * @return an {@code ExecResult} instance containing the region name of the
+ * invocation and the return value
+ * @throws IOException if no registered protocol handler is found or an error
+ * occurs during the invocation
+ * @see org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)
+ */
+ @Override
+ public ExecCoprocessorResponse execCoprocessor(final RpcController controller,
+ final ExecCoprocessorRequest request) throws ServiceException {
+ try {
+ requestCount.incrementAndGet();
+ HRegion region = getRegion(request.getRegion());
+ ExecCoprocessorResponse.Builder
+ builder = ExecCoprocessorResponse.newBuilder();
+ ClientProtos.Exec call = request.getCall();
+ Exec clientCall = ProtobufUtil.toExec(call);
+ ExecResult result = region.exec(clientCall);
+ builder.setValue(ProtobufUtil.toParameter(result.getValue()));
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+ /**
+ * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
+ *
+ * @param controller the RPC controller
+ * @param request the multi request
+ * @throws ServiceException
+ */
+ @Override
+ public MultiResponse multi(final RpcController controller,
+ final MultiRequest request) throws ServiceException {
+ try {
+ HRegion region = getRegion(request.getRegion());
+ MultiResponse.Builder builder = MultiResponse.newBuilder();
+ if (request.hasAtomic() && request.getAtomic()) {
+ List<Mutate> mutates = new ArrayList<Mutate>();
+ for (NameBytesPair parameter: request.getActionList()) {
+ Object action = ProtobufUtil.toObject(parameter);
+ if (action instanceof Mutate) {
+ mutates.add((Mutate)action);
+ } else {
+ throw new DoNotRetryIOException(
+ "Unsupported atomic atction type: "
+ + action.getClass().getName());
+ }
+ }
+ mutateRows(region, mutates);
+ } else {
+ ActionResult.Builder resultBuilder = null;
+ List<Mutate> puts = new ArrayList<Mutate>();
+ for (NameBytesPair parameter: request.getActionList()) {
+ requestCount.incrementAndGet();
+ try {
+ Object result = null;
+ Object action = ProtobufUtil.toObject(parameter);
+ if (action instanceof ClientProtos.Get) {
+ Get get = ProtobufUtil.toGet((ClientProtos.Get)action);
+ Integer lock = getLockFromId(get.getLockId());
+ Result r = region.get(get, lock);
+ if (r != null) {
+ result = ProtobufUtil.toResult(r);
+ }
+ } else if (action instanceof Mutate) {
+ Mutate mutate = (Mutate)action;
+ MutateType type = mutate.getMutateType();
+ if (type != MutateType.PUT) {
+ if (!puts.isEmpty()) {
+ put(builder, region, puts);
+ puts.clear();
+ } else if (!region.getRegionInfo().isMetaTable()) {
+ cacheFlusher.reclaimMemStoreMemory();
+ }
+ }
+ Result r = null;
+ switch (type) {
+ case APPEND:
+ r = append(region, mutate);
+ break;
+ case INCREMENT:
+ r = increment(region, mutate);
+ break;
+ case PUT:
+ puts.add(mutate);
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(mutate);
+ Integer lock = getLockFromId(delete.getLockId());
+ region.delete(delete, lock, delete.getWriteToWAL());
+ r = new Result();
+ break;
+ default:
+ throw new DoNotRetryIOException(
+ "Unsupported mutate type: " + type.name());
+ }
+ if (r != null) {
+ result = ProtobufUtil.toResult(r);
+ }
+ } else if (action instanceof ClientProtos.Exec) {
+ Exec call = ProtobufUtil.toExec((ClientProtos.Exec)action);
+ result = region.exec(call).getValue();
+ } else {
+ LOG.debug("Error: invalid action, "
+ + "it must be a Get, Mutate, or Exec.");
+ throw new DoNotRetryIOException("Invalid action, "
+ + "it must be a Get, Mutate, or Exec.");
+ }
+ if (result != null) {
+ if (resultBuilder == null) {
+ resultBuilder = ActionResult.newBuilder();
+ } else {
+ resultBuilder.clear();
+ }
+ NameBytesPair value = ProtobufUtil.toParameter(result);
+ resultBuilder.setValue(value);
+ builder.addResult(resultBuilder.build());
+ }
+ } catch (IOException ie) {
+ builder.addResult(ResponseConverter.buildActionResult(ie));
+ }
+ }
+ if (!puts.isEmpty()) {
+ put(builder, region, puts);
+ }
+ }
+ return builder.build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
+// End Client methods
+
+ /**
+ * Find the HRegion based on a region specifier
+ *
+ * @param regionSpecifier the region specifier
+ * @return the corresponding region
+ * @throws IOException if the specifier is not null,
+ * but failed to find the region
+ */
+ protected HRegion getRegion(
+ final RegionSpecifier regionSpecifier) throws IOException {
+ byte[] value = regionSpecifier.getValue().toByteArray();
+ RegionSpecifierType type = regionSpecifier.getType();
+ checkOpen();
+ switch (type) {
+ case REGION_NAME:
+ return getRegion(value);
+ case ENCODED_REGION_NAME:
+ String encodedRegionName = Bytes.toString(value);
+ HRegion region = this.onlineRegions.get(encodedRegionName);
+ if (region == null) {
+ throw new NotServingRegionException(
+ "Region is not online: " + encodedRegionName);
+ }
+ return region;
+ default:
+ throw new DoNotRetryIOException(
+ "Unsupported region specifier type: " + type);
+ }
+ }
+
+ /**
+ * Execute an append mutation.
+ *
+ * @param region
+ * @param mutate
+ * @return
+ * @throws IOException
+ */
+ protected Result append(final HRegion region,
+ final Mutate mutate) throws IOException {
+ Append append = ProtobufUtil.toAppend(mutate);
+ Result r = null;
+ if (region.getCoprocessorHost() != null) {
+ r = region.getCoprocessorHost().preAppend(append);
+ }
+ if (r == null) {
+ Integer lock = getLockFromId(append.getLockId());
+ r = region.append(append, lock, append.getWriteToWAL());
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postAppend(append, r);
+ }
+ }
+ return r;
+ }
+
+ /**
+ * Execute an increment mutation.
+ *
+ * @param region
+ * @param mutate
+ * @return
+ * @throws IOException
+ */
+ protected Result increment(final HRegion region,
+ final Mutate mutate) throws IOException {
+ Increment increment = ProtobufUtil.toIncrement(mutate);
+ Result r = null;
+ if (region.getCoprocessorHost() != null) {
+ r = region.getCoprocessorHost().preIncrement(increment);
+ }
+ if (r == null) {
+ Integer lock = getLockFromId(increment.getLockId());
+ r = region.increment(increment, lock, increment.getWriteToWAL());
+ if (region.getCoprocessorHost() != null) {
+ r = region.getCoprocessorHost().postIncrement(increment, r);
+ }
+ }
+ return r;
+ }
+
+ /**
+ * Execute a list of put mutations.
+ *
+ * @param builder
+ * @param region
+ * @param puts
+ */
+ protected void put(final MultiResponse.Builder builder,
+ final HRegion region, final List<Mutate> puts) {
+ @SuppressWarnings("unchecked")
+ Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
+
+ try {
+ ActionResult.Builder resultBuilder = ActionResult.newBuilder();
+ NameBytesPair value = ProtobufUtil.toParameter(new Result());
+ resultBuilder.setValue(value);
+ ActionResult result = resultBuilder.build();
+
+ int i = 0;
+ for (Mutate put : puts) {
+ Put p = ProtobufUtil.toPut(put);
+ Integer lock = getLockFromId(p.getLockId());
+ putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
+ builder.addResult(result);
+ }
+
+ requestCount.addAndGet(puts.size());
+ if (!region.getRegionInfo().isMetaTable()) {
+ cacheFlusher.reclaimMemStoreMemory();
+ }
+
+ OperationStatus codes[] = region.put(putsWithLocks);
+ for (i = 0; i < codes.length; i++) {
+ if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+ result = ResponseConverter.buildActionResult(
+ new DoNotRetryIOException(codes[i].getExceptionMsg()));
+ builder.setResult(i, result);
+ }
+ }
+ } catch (IOException ie) {
+ ActionResult result = ResponseConverter.buildActionResult(ie);
+ for (int i = 0, n = puts.size(); i < n; i++) {
+ builder.setResult(i, result);
+ }
+ }
+ }
+
+ /**
+ * Mutate a list of rows atomically.
+ *
+ * @param region
+ * @param mutates
+ * @throws IOException
+ */
+ protected void mutateRows(final HRegion region,
+ final List<Mutate> mutates) throws IOException {
+ Mutate firstMutate = mutates.get(0);
+ if (!region.getRegionInfo().isMetaTable()) {
+ cacheFlusher.reclaimMemStoreMemory();
+ }
+ byte[] row = firstMutate.getRow().toByteArray();
+ RowMutations rm = new RowMutations(row);
+ for (Mutate mutate: mutates) {
+ MutateType type = mutate.getMutateType();
+ switch (mutate.getMutateType()) {
+ case PUT:
+ rm.add(ProtobufUtil.toPut(mutate));
+ break;
+ case DELETE:
+ rm.add(ProtobufUtil.toDelete(mutate));
+ break;
+ default:
+ throw new DoNotRetryIOException(
+ "mutate supports atomic put and/or delete, not "
+ + type.name());
+ }
+ }
+ region.mutateRow(rm);
+ }
+}
Added: hbase/trunk/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/Admin.proto?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/protobuf/Admin.proto (added)
+++ hbase/trunk/src/main/protobuf/Admin.proto Fri Apr 13 20:28:21 2012
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used for Admin service.
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "AdminProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "hbase.proto";
+
+message GetRegionInfoRequest {
+ required RegionSpecifier region = 1;
+}
+
+message GetRegionInfoResponse {
+ required RegionInfo regionInfo = 1;
+}
+
+/**
+ * Get a list of store files for a set of column families in a particular region.
+ * If no column family is specified, get the store files for all column families.
+ */
+message GetStoreFileListRequest {
+ required RegionSpecifier region = 1;
+ repeated bytes columnFamily = 2;
+}
+
+message GetStoreFileListResponse {
+ repeated string storeFile = 1;
+}
+
+message GetOnlineRegionRequest {
+}
+
+message GetOnlineRegionResponse {
+ repeated RegionInfo regionInfo = 1;
+}
+
+message OpenRegionRequest {
+ repeated RegionSpecifier region = 1;
+ optional uint32 versionOfOfflineNode = 2;
+}
+
+message OpenRegionResponse {
+ repeated RegionOpeningState openingState = 1;
+
+ enum RegionOpeningState {
+ OPENED = 0;
+ ALREADY_OPENED = 1;
+ FAILED_OPENING = 2;
+ }
+}
+
+/**
+ * Closes the specified region and will use or not use ZK during the close
+ * according to the specified flag.
+ */
+message CloseRegionRequest {
+ required RegionSpecifier region = 1;
+ optional uint32 versionOfClosingNode = 2;
+ optional bool transitionInZK = 3 [default = true];
+}
+
+message CloseRegionResponse {
+ required bool closed = 1;
+}
+
+/**
+ * Flushes the MemStore of the specified region.
+ * <p>
+ * This method is synchronous.
+ */
+message FlushRegionRequest {
+ required RegionSpecifier region = 1;
+ optional uint64 ifOlderThanTs = 2;
+}
+
+message FlushRegionResponse {
+ required uint64 lastFlushTime = 1;
+ optional bool flushed = 2;
+}
+
+/**
+ * Splits the specified region.
+ * <p>
+ * This method currently flushes the region and then forces a compaction which
+ * will then trigger a split. The flush is done synchronously but the
+ * compaction is asynchronous.
+ */
+message SplitRegionRequest {
+ required RegionSpecifier region = 1;
+ optional bytes splitPoint = 2;
+}
+
+message SplitRegionResponse {
+}
+
+/**
+ * Compacts the specified region. Performs a major compaction if specified.
+ * <p>
+ * This method is asynchronous.
+ */
+message CompactRegionRequest {
+ required RegionSpecifier region = 1;
+ optional bool major = 2;
+}
+
+message CompactRegionResponse {
+}
+
+message UUID {
+ required uint64 leastSigBits = 1;
+ required uint64 mostSigBits = 2;
+}
+
+// Protocol buffer version of HLog
+message WALEntry {
+ required WALKey walKey = 1;
+ required WALEdit edit = 2;
+
+ // Protocol buffer version of HLogKey
+ message WALKey {
+ required bytes encodedRegionName = 1;
+ required bytes tableName = 2;
+ required uint64 logSequenceNumber = 3;
+ required uint64 writeTime = 4;
+ optional UUID clusterId = 5;
+ }
+
+ message WALEdit {
+ repeated bytes keyValue = 1;
+ repeated FamilyScope familyScope = 2;
+
+ enum ScopeType {
+ REPLICATION_SCOPE_LOCAL = 0;
+ REPLICATION_SCOPE_GLOBAL = 1;
+ }
+
+ message FamilyScope {
+ required bytes family = 1;
+ required ScopeType scopeType = 2;
+ }
+ }
+}
+
+/**
+ * Replicates the given entries. The guarantee is that the given entries
+ * will be durable on the slave cluster if this method returns without
+ * any exception.
+ * hbase.replication has to be set to true for this to work.
+ */
+message ReplicateWALEntryRequest {
+ repeated WALEntry walEntry = 1;
+}
+
+message ReplicateWALEntryResponse {
+}
+
+// Replacement for rollHLogWriter in HRegionInterface
+message RollWALWriterRequest {
+}
+
+message RollWALWriterResponse {
+ // A list of encoded name of regions to flush
+ repeated bytes regionToFlush = 1;
+}
+
+message StopServerRequest {
+ required string reason = 1;
+}
+
+message StopServerResponse {
+}
+
+message GetServerInfoRequest {
+}
+
+message GetServerInfoResponse {
+ required ServerName serverName = 1;
+}
+
+service AdminService {
+ rpc getRegionInfo(GetRegionInfoRequest)
+ returns(GetRegionInfoResponse);
+
+ rpc getStoreFileList(GetStoreFileListRequest)
+ returns(GetStoreFileListResponse);
+
+ rpc getOnlineRegion(GetOnlineRegionRequest)
+ returns(GetOnlineRegionResponse);
+
+ rpc openRegion(OpenRegionRequest)
+ returns(OpenRegionResponse);
+
+ rpc closeRegion(CloseRegionRequest)
+ returns(CloseRegionResponse);
+
+ rpc flushRegion(FlushRegionRequest)
+ returns(FlushRegionResponse);
+
+ rpc splitRegion(SplitRegionRequest)
+ returns(SplitRegionResponse);
+
+ rpc compactRegion(CompactRegionRequest)
+ returns(CompactRegionResponse);
+
+ rpc replicateWALEntry(ReplicateWALEntryRequest)
+ returns(ReplicateWALEntryResponse);
+
+ rpc rollWALWriter(RollWALWriterRequest)
+ returns(RollWALWriterResponse);
+
+ rpc getServerInfo(GetServerInfoRequest)
+ returns(GetServerInfoResponse);
+
+ rpc stopServer(StopServerRequest)
+ returns(StopServerResponse);
+}
Added: hbase/trunk/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/Client.proto?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/protobuf/Client.proto (added)
+++ hbase/trunk/src/main/protobuf/Client.proto Fri Apr 13 20:28:21 2012
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used for Client service.
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "ClientProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "hbase.proto";
+
+/**
+ * Container for a list of column qualifier names of a family.
+ */
+message Column {
+ required bytes family = 1;
+ repeated bytes qualifier = 2;
+}
+
+/**
+ * The protocol buffer version of Get
+ */
+message Get {
+ required bytes row = 1;
+ repeated Column column = 2;
+ repeated NameBytesPair attribute = 3;
+ optional uint64 lockId = 4;
+ optional NameBytesPair filter = 5;
+ optional TimeRange timeRange = 6;
+ optional uint32 maxVersions = 7 [default = 1];
+ optional bool cacheBlocks = 8 [default = true];
+}
+
+/**
+ * For performance reason, we don't use KeyValue
+ * here. We use the actual KeyValue bytes.
+ */
+message Result {
+ repeated bytes keyValueBytes = 1;
+}
+
+/**
+ * The get request. Perform a single Get operation.
+ * Unless existenceOnly is specified, return all the requested data
+ * for the row that matches exactly, or the one that immediately
+ * precedes it if closestRowBefore is specified.
+ *
+ * If existenceOnly is set, only the existence will be returned.
+ */
+message GetRequest {
+ required RegionSpecifier region = 1;
+ required Get get = 2;
+
+ // If the row to get doesn't exist, return the
+ // closest row before.
+ optional bool closestRowBefore = 3;
+
+ // The result isn't asked for, just check for
+ // the existence. If specified, closestRowBefore
+ // will be ignored
+ optional bool existenceOnly = 4;
+}
+
+message GetResponse {
+ optional Result result = 1;
+
+ // used for Get to check existence only
+ optional bool exists = 2;
+}
+
+/**
+ * Condition to check if the value of a given cell (row,
+ * family, qualifier) matches a value via a given comparator.
+ *
+ * Condition is used in check and mutate operations.
+ */
+message Condition {
+ required bytes row = 1;
+ required bytes family = 2;
+ required bytes qualifier = 3;
+ required CompareType compareType = 4;
+ required NameBytesPair comparator = 5;
+
+ enum CompareType {
+ LESS = 0;
+ LESS_OR_EQUAL = 1;
+ EQUAL = 2;
+ NOT_EQUAL = 3;
+ GREATER_OR_EQUAL = 4;
+ GREATER = 5;
+ NO_OP = 6;
+ }
+}
+
+/**
+ * A specific mutate inside a mutate request.
+ * It can be an append, increment, put or delete based
+ * on the mutate type.
+ */
+message Mutate {
+ required bytes row = 1;
+ required MutateType mutateType = 2;
+ repeated ColumnValue columnValue = 3;
+ repeated NameBytesPair attribute = 4;
+ optional uint64 timestamp = 5;
+ optional uint64 lockId = 6;
+ optional bool writeToWAL = 7 [default = true];
+
+ // For some mutate, result may be returned, in which case,
+ // time range can be specified for potential performance gain
+ optional TimeRange timeRange = 10;
+
+ enum MutateType {
+ APPEND = 0;
+ INCREMENT = 1;
+ PUT = 2;
+ DELETE = 3;
+ }
+
+ enum DeleteType {
+ DELETE_ONE_VERSION = 0;
+ DELETE_MULTIPLE_VERSIONS = 1;
+ DELETE_FAMILY = 2;
+ }
+
+ message ColumnValue {
+ required bytes family = 1;
+ repeated QualifierValue qualifierValue = 2;
+
+ message QualifierValue {
+ optional bytes qualifier = 1;
+ optional bytes value = 2;
+ optional uint64 timestamp = 3;
+ optional DeleteType deleteType = 4;
+ }
+ }
+}
+
+/**
+ * The mutate request. Perform a single Mutate operation.
+ *
+ * Optionally, you can specify a condition. The mutate
+ * will take place only if the condition is met. Otherwise,
+ * the mutate will be ignored. In the response result,
+ * parameter processed is used to indicate if the mutate
+ * actually happened.
+ */
+message MutateRequest {
+ required RegionSpecifier region = 1;
+ required Mutate mutate = 2;
+ optional Condition condition = 3;
+}
+
+message MutateResponse {
+ optional Result result = 1;
+
+ // used for mutate to indicate processed only
+ optional bool processed = 2;
+}
+
+/**
+ * Instead of get from a table, you can scan it with optional filters.
+ * You can specify the row key range, time range, the columns/families
+ * to scan and so on.
+ *
+ * This scan is used the first time in a scan request. The response of
+ * the initial scan will return a scanner id, which should be used to
+ * fetch result batches later on before it is closed.
+ */
+message Scan {
+ repeated Column column = 1;
+ repeated NameBytesPair attribute = 2;
+ optional bytes startRow = 3;
+ optional bytes stopRow = 4;
+ optional NameBytesPair filter = 5;
+ optional TimeRange timeRange = 6;
+ optional uint32 maxVersions = 7 [default = 1];
+ optional bool cacheBlocks = 8 [default = true];
+ optional uint32 batchSize = 9;
+}
+
+/**
+ * A scan request. Initially, it should specify a scan. Later on, you
+ * can use the scanner id returned to fetch result batches with a different
+ * scan request.
+ *
+ * The scanner will remain open if there are more results, and it's not
+ * asked to be closed explicitly.
+ *
+ * You can fetch the results and ask the scanner to be closed to save
+ * a trip if you are not interested in remaining results.
+ */
+message ScanRequest {
+ optional RegionSpecifier region = 1;
+ optional Scan scan = 2;
+ optional uint64 scannerId = 3;
+ optional uint32 numberOfRows = 4;
+ optional bool closeScanner = 5;
+}
+
+/**
+ * The scan response. If there are no more results, moreResults will
+ * be false. If it is not specified, it means there are more.
+ */
+message ScanResponse {
+ repeated Result result = 1;
+ optional uint64 scannerId = 2;
+ optional bool moreResults = 3;
+ optional uint32 ttl = 4;
+}
+
+message LockRowRequest {
+ required RegionSpecifier region = 1;
+ repeated bytes row = 2;
+}
+
+message LockRowResponse {
+ required uint64 lockId = 1;
+ optional uint32 ttl = 2;
+}
+
+message UnlockRowRequest {
+ required RegionSpecifier region = 1;
+ required uint64 lockId = 2;
+}
+
+message UnlockRowResponse {
+}
+
+/**
+ * Atomically bulk load multiple HFiles (say from different column families)
+ * into an open region.
+ */
+message BulkLoadHFileRequest {
+ required RegionSpecifier region = 1;
+ repeated FamilyPath familyPath = 2;
+
+ message FamilyPath {
+ required bytes family = 1;
+ required string path = 2;
+ }
+}
+
+message BulkLoadHFileResponse {
+ required bool loaded = 1;
+}
+
+/**
+ * An individual coprocessor call. You must specify the protocol,
+ * the method, and the row to which the call will be executed.
+ *
+ * You can specify the configuration settings in the property list.
+ *
+ * The parameter list has the parameters used for the method.
+ * A parameter is a pair of parameter name and the binary parameter
+ * value. The name is the parameter class name. The value is the
+ * binary format of the parameter, for example, protocol buffer
+ * encoded value.
+ */
+message Exec {
+ required bytes row = 1;
+ required string protocolName = 2;
+ required string methodName = 3;
+ repeated NameStringPair property = 4;
+ repeated NameBytesPair parameter = 5;
+}
+
+ /**
+ * Executes a single {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+ * method using the registered protocol handlers.
+ * {@link CoprocessorProtocol} implementations must be registered via the
+ * {@link org.apache.hadoop.hbase.regionserver.HRegion#registerProtocol(
+ * Class, org.apache.hadoop.hbase.ipc.CoprocessorProtocol)}
+ * method before they are available.
+ */
+message ExecCoprocessorRequest {
+ required RegionSpecifier region = 1;
+ required Exec call = 2;
+}
+
+message ExecCoprocessorResponse {
+ required NameBytesPair value = 1;
+}
+
+/**
+ * An individual action result. The result will in the
+ * same order as the action in the request. If an action
+ * returns a value, it is set in value field. If it doesn't
+ * return anything, the result will be empty. If an action
+ * fails to execute due to any exception, the exception
+ * is returned as a stringified parameter.
+ */
+message ActionResult {
+ optional NameBytesPair value = 1;
+ optional NameBytesPair exception = 2;
+}
+
+/**
+ * You can execute a list of actions on a given region in order.
+ *
+ * If it is a list of mutate actions, atomic can be set
+ * to make sure they can be processed atomically, just like
+ * RowMutations.
+ */
+message MultiRequest {
+ required RegionSpecifier region = 1;
+ repeated NameBytesPair action = 2;
+ optional bool atomic = 3;
+}
+
+message MultiResponse {
+ repeated ActionResult result = 1;
+}
+
+service ClientService {
+ rpc get(GetRequest)
+ returns(GetResponse);
+
+ rpc mutate(MutateRequest)
+ returns(MutateResponse);
+
+ rpc scan(ScanRequest)
+ returns(ScanResponse);
+
+ rpc lockRow(LockRowRequest)
+ returns(LockRowResponse);
+
+ rpc unlockRow(UnlockRowRequest)
+ returns(UnlockRowResponse);
+
+ rpc bulkLoadHFile(BulkLoadHFileRequest)
+ returns(BulkLoadHFileResponse);
+
+ rpc execCoprocessor(ExecCoprocessorRequest)
+ returns(ExecCoprocessorResponse);
+
+ rpc multi(MultiRequest)
+ returns(MultiResponse);
+}
Modified: hbase/trunk/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/hbase.proto?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/src/main/protobuf/hbase.proto Fri Apr 13 20:28:21 2012
@@ -101,3 +101,16 @@ message ServerName {
optional uint32 port = 2;
optional uint64 startCode = 3;
}
+
+// Comment data structures
+
+message NameStringPair {
+ required string name = 1;
+ required string value = 2;
+}
+
+message NameBytesPair {
+ required string name = 1;
+ optional bytes value = 2;
+}
+
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Fri Apr 13 20:28:21 2012
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
@@ -58,6 +61,9 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
/**
* Test {@link CatalogTracker}
*/
@@ -131,13 +137,16 @@ public class TestCatalogTracker {
/**
* Test interruptable while blocking wait on root and meta.
* @throws IOException
+ * @throws ServiceException
* @throws InterruptedException
*/
@Test public void testInterruptWaitOnMetaAndRoot()
- throws IOException, InterruptedException {
- HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
- HConnection connection = mockConnection(implementation);
+ throws IOException, InterruptedException, ServiceException {
+ final ClientProtocol client = Mockito.mock(ClientProtocol.class);
+ HConnection connection = mockConnection(null, client);
try {
+ Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
+ thenReturn(GetResponse.newBuilder().build());
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ServerName hsa = ct.getRootLocation();
Assert.assertNull(hsa);
@@ -176,10 +185,11 @@ public class TestCatalogTracker {
*/
@Test
public void testServerNotRunningIOException()
- throws IOException, InterruptedException, KeeperException {
+ throws IOException, InterruptedException, KeeperException, ServiceException {
// Mock an HRegionInterface.
final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
- HConnection connection = mockConnection(implementation);
+ final ClientProtocol client = Mockito.mock(ClientProtocol.class);
+ HConnection connection = mockConnection(implementation, client);
try {
// If a 'getRegionInfo' is called on mocked HRegionInterface, throw IOE
// the first time. 'Succeed' the second time we are called.
@@ -198,6 +208,8 @@ public class TestCatalogTracker {
Mockito.when(connection.getRegionServerWithRetries((ServerCallable<Result>)Mockito.any())).
thenReturn(getMetaTableRowResult());
+ Mockito.when(client.get((RpcController)Mockito.any(), (GetRequest)Mockito.any())).
+ thenReturn(GetResponse.newBuilder().build());
// Now start up the catalogtracker with our doctored Connection.
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
try {
@@ -245,17 +257,18 @@ public class TestCatalogTracker {
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
+ * @throws ServiceException
*/
@Test
public void testGetMetaServerConnectionFails()
- throws IOException, InterruptedException, KeeperException {
- // Mock an HRegionInterface.
- final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
- HConnection connection = mockConnection(implementation);
+ throws IOException, InterruptedException, KeeperException, ServiceException {
+ // Mock an ClientProtocol.
+ final ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
+ HConnection connection = mockConnection(null, implementation);
try {
// If a 'get' is called on mocked interface, throw connection refused.
- Mockito.when(implementation.get((byte[]) Mockito.any(), (Get) Mockito.any())).
- thenThrow(new ConnectException("Connection refused"));
+ Mockito.when(implementation.get((RpcController) Mockito.any(), (GetRequest) Mockito.any())).
+ thenThrow(new ServiceException(new ConnectException("Connection refused")));
// Now start up the catalogtracker with our doctored Connection.
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
try {
@@ -371,7 +384,7 @@ public class TestCatalogTracker {
// to make our test work.
// Mock an HRegionInterface.
final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
- HConnection connection = mockConnection(implementation);
+ HConnection connection = mockConnection(implementation, null);
try {
// Now the ct is up... set into the mocks some answers that make it look
// like things have been getting assigned. Make it so we'll return a
@@ -419,6 +432,7 @@ public class TestCatalogTracker {
/**
* @param implementation An {@link HRegionInterface} instance; you'll likely
* want to pass a mocked HRS; can be null.
+ * @param client A mocked ClientProtocol instance, can be null
* @return Mock up a connection that returns a {@link Configuration} when
* {@link HConnection#getConfiguration()} is called, a 'location' when
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
@@ -429,7 +443,8 @@ public class TestCatalogTracker {
* when done with this mocked Connection.
* @throws IOException
*/
- private HConnection mockConnection(final HRegionInterface implementation)
+ private HConnection mockConnection(
+ final HRegionInterface implementation, final ClientProtocol client)
throws IOException {
HConnection connection =
HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
@@ -449,6 +464,11 @@ public class TestCatalogTracker {
Mockito.when(connection.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
thenReturn(implementation);
}
+ if (client != null) {
+ // If a call to getClient, return this implementation.
+ Mockito.when(connection.getClient(Mockito.anyString(), Mockito.anyInt())).
+ thenReturn(client);
+ }
return connection;
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Fri Apr 13 20:28:21 2012
@@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.client.HC
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -42,6 +44,9 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
/**
* Test MetaReader/Editor but without spinning up a cluster.
* We mock regionserver back and forth (we do spin up a zk cluster).
@@ -82,7 +87,8 @@ public class TestMetaReaderEditorNoClust
* @throws InterruptedException
*/
@Test
- public void testRideOverServerNotRunning() throws IOException, InterruptedException {
+ public void testRideOverServerNotRunning()
+ throws IOException, InterruptedException, ServiceException {
// Need a zk watcher.
ZooKeeperWatcher zkw = new ZooKeeperWatcher(UTIL.getConfiguration(),
this.getClass().getSimpleName(), ABORTABLE, true);
@@ -92,27 +98,16 @@ public class TestMetaReaderEditorNoClust
HConnection connection = null;
CatalogTracker ct = null;
try {
- // Mock an HRegionInterface. Our mock implementation will fail a few
+ // Mock an ClientProtocol. Our mock implementation will fail a few
// times when we go to open a scanner.
- final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
- // When openScanner called throw IOE 'Server not running' a few times
+ final ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
+ // When scan called throw IOE 'Server not running' a few times
// before we return a scanner id. Whats WEIRD is that these
// exceptions do not show in the log because they are caught and only
// printed if we FAIL. We eventually succeed after retry so these don't
// show. We will know if they happened or not because we will ask
- // mockito at the end of this test to verify that openscanner was indeed
+ // mockito at the end of this test to verify that scan was indeed
// called the wanted number of times.
- final long scannerid = 123L;
- Mockito.when(implementation.openScanner((byte [])Mockito.any(),
- (Scan)Mockito.any())).
- thenThrow(new IOException("Server not running (1 of 3)")).
- thenThrow(new IOException("Server not running (2 of 3)")).
- thenThrow(new IOException("Server not running (3 of 3)")).
- thenReturn(scannerid);
- // Make it so a verifiable answer comes back when next is called. Return
- // the verifiable answer and then a null so we stop scanning. Our
- // verifiable answer is something that looks like a row in META with
- // a server and startcode that is that of the above defined servername.
List<KeyValue> kvs = new ArrayList<KeyValue>();
final byte [] rowToVerify = Bytes.toBytes("rowToVerify");
kvs.add(new KeyValue(rowToVerify,
@@ -124,10 +119,19 @@ public class TestMetaReaderEditorNoClust
kvs.add(new KeyValue(rowToVerify,
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode())));
- final Result [] result = new Result [] {new Result(kvs)};
- Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
- thenReturn(result).
- thenReturn(null);
+ final Result [] results = new Result [] {new Result(kvs)};
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ for (Result result: results) {
+ builder.addResult(ProtobufUtil.toResult(result));
+ }
+ Mockito.when(implementation.scan(
+ (RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
+ thenThrow(new ServiceException("Server not running (1 of 3)")).
+ thenThrow(new ServiceException("Server not running (2 of 3)")).
+ thenThrow(new ServiceException("Server not running (3 of 3)")).
+ thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
+ .thenReturn(builder.build()).thenReturn(
+ ScanResponse.newBuilder().setMoreResults(false).build());
// Associate a spied-upon HConnection with UTIL.getConfiguration. Need
// to shove this in here first so it gets picked up all over; e.g. by
@@ -150,7 +154,7 @@ public class TestMetaReaderEditorNoClust
// Now shove our HRI implementation into the spied-upon connection.
Mockito.doReturn(implementation).
- when(connection).getHRegionConnection(Mockito.anyString(), Mockito.anyInt());
+ when(connection).getClient(Mockito.anyString(), Mockito.anyInt());
// Now start up the catalogtracker with our doctored Connection.
ct = new CatalogTracker(zkw, null, connection, ABORTABLE, 0);
@@ -160,10 +164,10 @@ public class TestMetaReaderEditorNoClust
assertTrue(hris.size() == 1);
assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
- // Finally verify that openscanner was called four times -- three times
- // with exception and then on 4th attempt we succeed.
- Mockito.verify(implementation, Mockito.times(4)).
- openScanner((byte [])Mockito.any(), (Scan)Mockito.any());
+ // Finally verify that scan was called four times -- three times
+ // with exception and then on 4th, 5th and 6th attempt we succeed
+ Mockito.verify(implementation, Mockito.times(6)).
+ scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
} finally {
if (ct != null) ct.stop();
HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Fri Apr 13 20:28:21 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.ZooKeeper
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.mockito.Mockito;
/**
@@ -92,7 +93,8 @@ public class HConnectionTestingUtility {
* @throws IOException
*/
public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
- final HRegionInterface implementation, final ServerName sn, final HRegionInfo hri)
+ final HRegionInterface implementation, final ClientProtocol client,
+ final ServerName sn, final HRegionInfo hri)
throws IOException {
HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
Mockito.doNothing().when(c).close();
@@ -108,6 +110,11 @@ public class HConnectionTestingUtility {
Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
thenReturn(implementation);
}
+ if (client != null) {
+ // If a call to getClient, return this client.
+ Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())).
+ thenReturn(client);
+ }
return c;
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java Fri Apr 13 20:28:21 2012
@@ -99,6 +99,7 @@ import org.junit.experimental.categories
import com.google.common.collect.Lists;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
@Category(SmallTests.class)
public class TestHbaseObjectWritable extends TestCase {
@@ -523,6 +524,7 @@ public class TestHbaseObjectWritable ext
assertEquals(80,HbaseObjectWritable.getClassCode(Message.class).intValue());
assertEquals(81,HbaseObjectWritable.getClassCode(Array.class).intValue());
+ assertEquals(82,HbaseObjectWritable.getClassCode(RpcController.class).intValue());
}
/**
@@ -531,7 +533,7 @@ public class TestHbaseObjectWritable ext
* note on the test above.
*/
public void testGetNextObjectCode(){
- assertEquals(82,HbaseObjectWritable.getNextClassCode());
+ assertEquals(83,HbaseObjectWritable.getNextClassCode());
}
@org.junit.Rule
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Fri Apr 13 20:28:21 2012
@@ -50,6 +50,23 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -64,6 +81,9 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
/**
* A mock RegionServer implementation.
* Use this when you can't bend Mockito to your liking (e.g. return null result
@@ -72,7 +92,7 @@ import org.apache.zookeeper.KeeperExcept
* {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data
* store that the get pulls from.
*/
-class MockRegionServer implements HRegionInterface, RegionServerServices {
+class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServerServices {
private final ServerName sn;
private final ZooKeeperWatcher zkw;
private final Configuration conf;
@@ -245,9 +265,8 @@ class MockRegionServer implements HRegio
@Override
public Result get(byte[] regionName, Get get) throws IOException {
- Map<byte [], Result> m = this.gets.get(regionName);
- if (m == null) return null;
- return m.get(get.getRow());
+ // TODO Auto-generated method stub
+ return null;
}
@Override
@@ -597,4 +616,87 @@ class MockRegionServer implements HRegio
public void mutateRow(byte[] regionName, RowMutations rm) throws IOException {
// TODO Auto-generated method stub
}
+
+ @Override
+ public GetResponse get(RpcController controller, GetRequest request)
+ throws ServiceException {
+ byte[] regionName = request.getRegion().getValue().toByteArray();
+ Map<byte [], Result> m = this.gets.get(regionName);
+ GetResponse.Builder builder = GetResponse.newBuilder();
+ if (m != null) {
+ byte[] row = request.getGet().getRow().toByteArray();
+ builder.setResult(ProtobufUtil.toResult(m.get(row)));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public MutateResponse mutate(RpcController controller, MutateRequest request)
+ throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ScanResponse scan(RpcController controller, ScanRequest request)
+ throws ServiceException {
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ try {
+ if (request.hasScan()) {
+ byte[] regionName = request.getRegion().getValue().toByteArray();
+ builder.setScannerId(openScanner(regionName, null));
+ builder.setMoreResults(true);
+ }
+ else {
+ long scannerId = request.getScannerId();
+ Result result = next(scannerId);
+ if (result != null) {
+ builder.addResult(ProtobufUtil.toResult(result));
+ builder.setMoreResults(true);
+ }
+ else {
+ builder.setMoreResults(false);
+ close(scannerId);
+ }
+ }
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public LockRowResponse lockRow(RpcController controller,
+ LockRowRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public UnlockRowResponse unlockRow(RpcController controller,
+ UnlockRowRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
+ BulkLoadHFileRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ExecCoprocessorResponse execCoprocessor(RpcController controller,
+ ExecCoprocessorRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse multi(
+ RpcController controller, MultiRequest request) throws ServiceException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
\ No newline at end of file