You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/10 17:22:40 UTC
[3/5] hbase git commit: REVERT of revert of "HBASE-16308 Contain
protobuf references Gather up the pb references into a few locations only
rather than have pb references distributed all about the code base." This is
a revert of a revert; i.e. we are addi
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 882e21b..d2423b3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,12 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -43,7 +37,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -74,6 +67,15 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
+import com.google.common.annotations.VisibleForTesting;
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
+// Internally, we use shaded protobuf. The imports below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+// SEE ABOVE NOTE!
+
/**
* An implementation of {@link Table}. Used to communicate with a single HBase table.
* Lightweight. Get as needed and just close when done.
@@ -416,23 +418,16 @@ public class HTable implements Table {
if (get.getConsistency() == Consistency.STRONG) {
// Good old call.
- final Get getReq = get;
+ final Get configuredGet = get;
RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), get.getRow()) {
+ this.rpcControllerFactory, getName(), get.getRow()) {
@Override
- public Result call(int callTimeout) throws IOException {
- ClientProtos.GetRequest request =
- RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- ClientProtos.GetResponse response = getStub().get(controller, request);
- if (response == null) return null;
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ protected Result rpcCall() throws Exception {
+ ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+ getLocation().getRegionInfo().getRegionName(), configuredGet);
+ ClientProtos.GetResponse response = getStub().get(getRpcController(), request);
+ if (response == null) return null;
+ return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
}
};
return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
@@ -448,7 +443,6 @@ public class HTable implements Table {
return callable.call(operationTimeout);
}
-
/**
* {@inheritDoc}
*/
@@ -459,16 +453,14 @@ public class HTable implements Table {
}
try {
Object[] r1 = new Object[gets.size()];
- batch((List) gets, r1);
-
- // translate.
+ batch((List<? extends Row>)gets, r1);
+ // Translate.
Result [] results = new Result[r1.length];
- int i=0;
- for (Object o : r1) {
- // batch ensures if there is a failure we get an exception instead
- results[i++] = (Result) o;
+ int i = 0;
+ for (Object obj: r1) {
+ // Batch ensures if there is a failure we get an exception instead
+ results[i++] = (Result)obj;
}
-
return results;
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -516,21 +508,13 @@ public class HTable implements Table {
public void delete(final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
- tableName, delete.getRow()) {
+ this.rpcControllerFactory, getName(), delete.getRow()) {
@Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
-
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ protected Boolean rpcCall() throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), delete);
+ MutateResponse response = getStub().mutate(getRpcController(), request);
+ return Boolean.valueOf(response.getProcessed());
}
};
rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -586,41 +570,28 @@ public class HTable implements Table {
*/
@Override
public void mutateRow(final RowMutations rm) throws IOException {
- final RetryingTimeTracker tracker = new RetryingTimeTracker();
- PayloadCarryingServerCallable<MultiResponse> callable =
- new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+ CancellableRegionServerCallable<MultiResponse> callable =
+ new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
rpcControllerFactory) {
- @Override
- public MultiResponse call(int callTimeout) throws IOException {
- tracker.start();
- controller.setPriority(tableName);
- int remainingTime = tracker.getRemainingTime(callTimeout);
- if (remainingTime == 0) {
- throw new DoNotRetryIOException("Timeout for mutate row");
- }
- controller.setCallTimeout(remainingTime);
- try {
- RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
- getLocation().getRegionInfo().getRegionName(), rm);
- regionMutationBuilder.setAtomic(true);
- MultiRequest request =
- MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- ClientProtos.MultiResponse response = getStub().multi(controller, request);
- ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if (ex instanceof IOException) {
- throw (IOException) ex;
- }
- throw new IOException("Failed to mutate row: " +
- Bytes.toStringBinary(rm.getRow()), ex);
- }
- return ResponseConverter.getResults(request, response, controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ @Override
+ protected MultiResponse rpcCall() throws Exception {
+ RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+ getLocation().getRegionInfo().getRegionName(), rm);
+ regionMutationBuilder.setAtomic(true);
+ MultiRequest request =
+ MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+ ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if (ex instanceof IOException) {
+ throw (IOException) ex;
}
+ throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
}
- };
+ return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
+ }
+ };
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
null, null, callable, operationTimeout);
ars.waitUntilDone();
@@ -629,38 +600,32 @@ public class HTable implements Table {
}
}
+ private static void checkHasFamilies(final Mutation mutation) throws IOException {
+ if (mutation.numFamilies() == 0) {
+ throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
+ }
+ }
+
/**
* {@inheritDoc}
*/
@Override
public Result append(final Append append) throws IOException {
- if (append.numFamilies() == 0) {
- throw new IOException(
- "Invalid arguments to append, no columns specified");
- }
-
- NonceGenerator ng = this.connection.getNonceGenerator();
- final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
- RegionServerCallable<Result> callable =
- new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
- @Override
- public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ checkHasFamilies(append);
+ NoncedRegionServerCallable<Result> callable =
+ new NoncedRegionServerCallable<Result>(this.connection,
+ this.rpcControllerFactory, getName(), append.getRow()) {
+ @Override
+ protected Result call(PayloadCarryingRpcController controller) throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
+ MutateResponse response = getStub().mutate(controller, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+ }
+ };
+ return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
+ callWithRetries(callable, this.operationTimeout);
}
/**
@@ -668,27 +633,17 @@ public class HTable implements Table {
*/
@Override
public Result increment(final Increment increment) throws IOException {
- if (!increment.hasFamilies()) {
- throw new IOException(
- "Invalid arguments to increment, no columns specified");
- }
- NonceGenerator ng = this.connection.getNonceGenerator();
- final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
- RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
- getName(), increment.getRow()) {
+ checkHasFamilies(increment);
+ NoncedRegionServerCallable<Result> callable =
+ new NoncedRegionServerCallable<Result>(this.connection,
+ this.rpcControllerFactory, getName(), increment.getRow()) {
@Override
- public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ protected Result call(PayloadCarryingRpcController controller) throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
+ MutateResponse response = getStub().mutate(controller, request);
+ // Should this check for null like append does?
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
}
};
return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -725,30 +680,21 @@ public class HTable implements Table {
"Invalid arguments to incrementColumnValue", npe);
}
- NonceGenerator ng = this.connection.getNonceGenerator();
- final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
- RegionServerCallable<Long> callable =
- new RegionServerCallable<Long>(connection, getName(), row) {
- @Override
- public Long call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildIncrementRequest(
- getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, amount, durability, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- Result result =
- ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ NoncedRegionServerCallable<Long> callable =
+ new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
+ row) {
+ @Override
+ protected Long call(PayloadCarryingRpcController controller) throws Exception {
+ MutateRequest request = RequestConverter.buildIncrementRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family,
+ qualifier, amount, durability, getNonceGroup(), getNonce());
+ MutateResponse response = getStub().mutate(controller, request);
+ Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+ return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+ }
+ };
+ return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout).
+ callWithRetries(callable, this.operationTimeout);
}
/**
@@ -760,25 +706,19 @@ public class HTable implements Table {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), CompareType.EQUAL, put);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+ getName(), row) {
+ @Override
+ protected Boolean rpcCall() throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, put);
+ MutateResponse response = getStub().mutate(getRpcController(), request);
+ return Boolean.valueOf(response.getProcessed());
+ }
+ };
+ return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+ callWithRetries(callable, this.operationTimeout);
}
/**
@@ -790,56 +730,43 @@ public class HTable implements Table {
final Put put)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, put);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+ getName(), row) {
+ @Override
+ protected Boolean rpcCall() throws Exception {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), compareType, put);
+ MutateResponse response = getStub().mutate(getRpcController(), request);
+ return Boolean.valueOf(response.getProcessed());
+ }
+ };
+ return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+ callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndDelete(final byte [] row,
- final byte [] family, final byte [] qualifier, final byte [] value,
- final Delete delete)
+ public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
+ final byte [] value, final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), CompareType.EQUAL, delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+ getName(), row) {
+ @Override
+ protected Boolean rpcCall() throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, delete);
+ MutateResponse response = getStub().mutate(getRpcController(), request);
+ return Boolean.valueOf(response.getProcessed());
+ }
+ };
+ return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+ callWithRetries(callable, this.operationTimeout);
}
/**
@@ -851,25 +778,19 @@ public class HTable implements Table {
final Delete delete)
throws IOException {
RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
+ new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+ getName(), row) {
+ @Override
+ protected Boolean rpcCall() throws Exception {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), compareType, delete);
+ MutateResponse response = getStub().mutate(getRpcController(), request);
+ return Boolean.valueOf(response.getProcessed());
+ }
+ };
+ return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -880,40 +801,29 @@ public class HTable implements Table {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
- final RetryingTimeTracker tracker = new RetryingTimeTracker();
- PayloadCarryingServerCallable<MultiResponse> callable =
- new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+ CancellableRegionServerCallable<MultiResponse> callable =
+ new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory) {
@Override
- public MultiResponse call(int callTimeout) throws IOException {
- tracker.start();
- controller.setPriority(tableName);
- int remainingTime = tracker.getRemainingTime(callTimeout);
- if (remainingTime == 0) {
- throw new DoNotRetryIOException("Timeout for mutate row");
- }
- controller.setCallTimeout(remainingTime);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MultiRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, rm);
- ClientProtos.MultiResponse response = getStub().multi(controller, request);
- ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
- if (res.hasException()) {
- Throwable ex = ProtobufUtil.toException(res.getException());
- if(ex instanceof IOException) {
- throw (IOException)ex;
- }
- throw new IOException("Failed to checkAndMutate row: "+
- Bytes.toStringBinary(rm.getRow()), ex);
+ protected MultiResponse rpcCall() throws Exception {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MultiRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), compareType, rm);
+ ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+ ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+ if (res.hasException()) {
+ Throwable ex = ProtobufUtil.toException(res.getException());
+ if (ex instanceof IOException) {
+ throw (IOException)ex;
}
- return ResponseConverter.getResults(request, response, controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ throw new IOException("Failed to checkAndMutate row: "+
+ Bytes.toStringBinary(rm.getRow()), ex);
}
+ return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
};
+
/**
* Currently, we use one array to store 'processed' flag which is returned by server.
* It is excessive to send such a large array, but that is required by the framework right now
@@ -973,7 +883,6 @@ public class HTable implements Table {
}
/**
- * {@inheritDoc}
* @throws IOException
*/
void flushCommits() throws IOException {
@@ -1150,19 +1059,18 @@ public class HTable implements Table {
for (final byte[] r : keys) {
final RegionCoprocessorRpcChannel channel =
new RegionCoprocessorRpcChannel(connection, tableName, r);
- Future<R> future = pool.submit(
- new Callable<R>() {
- @Override
- public R call() throws Exception {
- T instance = ProtobufUtil.newServiceStub(service, channel);
- R result = callable.call(instance);
- byte[] region = channel.getLastRegion();
- if (callback != null) {
- callback.update(region, r, result);
- }
- return result;
- }
- });
+ Future<R> future = pool.submit(new Callable<R>() {
+ @Override
+ public R call() throws Exception {
+ T instance = ProtobufUtil.newServiceStub(service, channel);
+ R result = callable.call(instance);
+ byte[] region = channel.getLastRegion();
+ if (callback != null) {
+ callback.update(region, r, result);
+ }
+ return result;
+ }
+ });
futures.put(r, future);
}
for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1236,9 +1144,6 @@ public class HTable implements Table {
return tableName + ";" + connection;
}
- /**
- * {@inheritDoc}
- */
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1247,14 +1152,13 @@ public class HTable implements Table {
Bytes.BYTES_COMPARATOR));
batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
new Callback<R>() {
-
- @Override
- public void update(byte[] region, byte[] row, R result) {
- if (region != null) {
- results.put(region, result);
- }
- }
- });
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ if (region != null) {
+ results.put(region, result);
+ }
+ }
+ });
return results;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index 66d3c21..8c4da68 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,16 +21,34 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
/**
- * A RetryingCallable for master operations.
+ * A RetryingCallable for Master RPC operations.
+ * Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of
+ * #call(int). See {@link HBaseAdmin} for examples of how this is used. To get at the
+ * rpcController that has been created and configured to make this rpc call, use getRpcController().
+ * We are trying to contain all protobuf references including references to rpcController so we
+ * don't pollute codebase with protobuf references; keep the protobuf references contained and only
+ * present in a few classes rather than all about the code base.
+ * <p>Like {@link RegionServerCallable}, except that here we can safely use a
+ * PayloadCarryingRpcController all the time. That is not possible in {@link RegionServerCallable}
+ * because it has to deal with Coprocessor Endpoints.
* @param <V> return type
*/
abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
- protected ClusterConnection connection;
+ protected final ClusterConnection connection;
protected MasterKeepAliveConnection master;
+ private final PayloadCarryingRpcController rpcController;
- public MasterCallable(final Connection connection) {
+ MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) {
this.connection = (ClusterConnection) connection;
+ this.rpcController = rpcConnectionFactory.newController();
}
@Override
@@ -43,6 +61,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
// The above prepare could fail but this would still be called though masterAdmin is null
if (this.master != null) {
this.master.close();
+ this.master = null;
}
}
@@ -59,4 +78,65 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
public long sleep(long pause, int tries) {
return ConnectionUtils.getPauseTime(pause, tries);
}
+
+ /**
+ * Override that changes the {@link Callable#call()} Exception from {@link Exception} to
+ * {@link IOException}. It also does setup of the rpcController and calls through to the rpcCall()
+ * method which subclasses are expected to implement. If the rpcController is non-null, we set
+ * the call timeout on it before making the call.
+ */
+ @Override
+ // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+ // and so we contain references to protobuf. We can't set priority on the rpcController as
+ // we do in RegionServerCallable because we don't always have a Table when we call.
+ public V call(int callTimeout) throws IOException {
+ try {
+ if (this.rpcController != null) {
+ this.rpcController.setCallTimeout(callTimeout);
+ }
+ return rpcCall();
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ /**
+ * Run the RPC call. Implement this method. To get at the rpcController that has been created
+ * and configured to make this rpc call, use getRpcController(). We are trying to contain
+ * rpcController references so we don't pollute codebase with protobuf references; keep the
+ * protobuf references contained and only present in a few classes rather than all about the
+ * code base.
+ * @throws Exception
+ */
+ protected abstract V rpcCall() throws Exception;
+
+ PayloadCarryingRpcController getRpcController() {
+ return this.rpcController;
+ }
+
+ void setPriority(final int priority) {
+ if (this.rpcController != null) {
+ this.rpcController.setPriority(priority);
+ }
+ }
+
+ void setPriority(final TableName tableName) {
+ if (this.rpcController != null) {
+ this.rpcController.setPriority(tableName);
+ }
+ }
+
+ /**
+ * @param regionName RegionName. If hbase:meta, we'll set high priority.
+ */
+ void setPriority(final byte [] regionName) {
+ if (isMetaRegion(regionName)) {
+ setPriority(TableName.META_TABLE_NAME);
+ }
+ }
+
+ private static boolean isMetaRegion(final byte[] regionName) {
+ return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+ || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index e445b78..47693f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
* against the master on the MasterProtos.MasterService.BlockingInterface; but not by
* final user code. Hence it's package protected.
*/
-interface MasterKeepAliveConnection
-extends MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface {
// Do this instead of implement Closeable because closeable returning IOE is PITA.
void close();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index e764ceb..1ce4aab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -41,15 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
/**
* Callable that handles the <code>multi</code> method call going against a single
- * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
- * {@link RegionServerCallable} that goes against multiple regions.
+ * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
+ * RegionServerCallable that goes against multiple regions).
* @param <R>
*/
-class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
+@InterfaceAudience.Private
+class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
private final MultiAction<R> multiAction;
private final boolean cellBlock;
@@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
}
@Override
- public MultiResponse call(int callTimeout) throws IOException {
+ protected MultiResponse rpcCall() throws Exception {
int countOfActions = this.multiAction.size();
if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
-
if (this.cellBlock) {
- // Presize. Presume at least a KV per Action. There are likely more.
+ // Pre-size. Presume at least a KV per Action. There are likely more.
if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
// They have already been handled above. Guess at count of cells
@@ -114,20 +113,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
}
- // Controller optionally carries cell data over the proxy/service boundary and also
- // optionally ferries cell response data back out again.
- if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- ClientProtos.MultiResponse responseProto;
- ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
- try {
- responseProto = getStub().multi(controller, requestProto);
- } catch (ServiceException e) {
- throw ProtobufUtil.getRemoteException(e);
+ if (cells != null) {
+ setRpcControllerCellScanner(CellUtil.createCellScanner(cells));
}
+ ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
+ ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
if (responseProto == null) return null; // Occurs on cancel
- return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+ return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner());
}
/**
@@ -151,4 +143,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
ServerName getServerName() {
return location.getServerName();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
new file mode 100644
index 0000000..21e77bd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Implementations make an rpc call against a RegionService via a protobuf Service.
+ * Implement #call(PayloadCarryingRpcController) and then call {@link #call(int)} to
+ * trigger the rpc. The {@link #call(int)} eventually invokes your
+ * #call(PayloadCarryingRpcController) meanwhile saving you having to write a bunch of
+ * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are
+ * retried on fail.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
+ * the regioninfo part of location when building requests. The only reason it works for
+ * multi-region requests (e.g. batch) is that they happen to not use the region parts.
+ * This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
+ * RegionCallable and actual RegionServerCallable with ServerName).
+ * @param <T> the class that the ServerCallable handles
+ */
+@InterfaceAudience.Private
+public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> {
+ private ClientService.BlockingInterface stub;
+ private final PayloadCarryingRpcController rpcController;
+ private final long nonce;
+
+ /**
+ * @param connection Connection to use.
+ * @param tableName Table name to which <code>row</code> belongs.
+ * @param row The row we want in <code>tableName</code>.
+ */
+ public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+ TableName tableName, byte [] row) {
+ this(connection, rpcControllerFactory.newController(), tableName, row);
+ }
+
+ public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController,
+ TableName tableName, byte [] row) {
+ super(connection, tableName, row);
+ this.rpcController = rpcController;
+ if (this.rpcController != null) {
+ this.rpcController.setPriority(tableName);
+ }
+ this.nonce = getConnection().getNonceGenerator().newNonce();
+ }
+
+ void setClientByServiceName(ServerName service) throws IOException {
+ this.setStub(getConnection().getClient(service));
+ }
+
+ /**
+ * @return Client Rpc protobuf communication stub
+ */
+ protected ClientService.BlockingInterface getStub() {
+ return this.stub;
+ }
+
+ /**
+ * Set the client protobuf communication stub
+ * @param stub to set
+ */
+ void setStub(final ClientService.BlockingInterface stub) {
+ this.stub = stub;
+ }
+
+ /**
+ * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+ * setup of an rpcController and calls through to the unimplemented
+ * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+ */
+ @Override
+ public T call(int callTimeout) throws IOException {
+ if (this.rpcController != null) {
+ this.rpcController.setCallTimeout(callTimeout);
+ }
+ try {
+ return call(this.rpcController);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ /**
+ * Run RPC call.
+ * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+ * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+ * class.
+ * @throws Exception
+ */
+ protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+
+ public PayloadCarryingRpcController getRpcController() {
+ return this.rpcController;
+ }
+
+ long getNonceGroup() {
+ return getConnection().getNonceGenerator().getNonceGroup();
+ }
+
+ long getNonce() {
+ return this.nonce;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
deleted file mode 100644
index d94f069..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
-/**
- * This class is used to unify HTable calls with AsyncProcess Framework.
- * HTable can use AsyncProcess directly though this class.
- */
-@InterfaceAudience.Private
-public abstract class PayloadCarryingServerCallable<T>
- extends RegionServerCallable<T> implements Cancellable {
- protected PayloadCarryingRpcController controller;
-
- public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcControllerFactory rpcControllerFactory) {
- super(connection, tableName, row);
- this.controller = rpcControllerFactory.newController();
- }
-
- @Override
- public void cancel() {
- controller.startCancel();
- }
-
- @Override
- public boolean isCancelled() {
- return controller.isCanceled();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 54c93a0..4e347dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
+ * Similar to RegionServerCallable but for the AdminService interface. This service callable
* assumes a Table and row and thus does region locating similar to RegionServerCallable.
+ * Works against Admin stub rather than Client stub.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
justification="stub used by ipc")
@InterfaceAudience.Private
public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
-
- protected final ClusterConnection connection;
-
- protected final RpcControllerFactory rpcControllerFactory;
-
protected AdminService.BlockingInterface stub;
+ protected final RpcControllerFactory rpcControllerFactory;
+ private PayloadCarryingRpcController controller = null;
+ protected final ClusterConnection connection;
protected HRegionLocation location;
-
protected final TableName tableName;
protected final byte[] row;
protected final int replicaId;
-
protected final static int MIN_WAIT_DEAD_SERVER = 10000;
public RegionAdminServiceCallable(ClusterConnection connection,
@@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
-
if (reload || location == null) {
location = getLocation(!reload);
}
-
if (location == null) {
// With this exception, there will be a retry.
throw new HBaseIOException(getExceptionMessage());
}
-
this.setStub(connection.getAdmin(location.getServerName()));
}
@@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
if (rl == null) {
throw new RetriesExhaustedException("Can't get the locations");
}
-
return rl;
}
-}
+
+ /**
+ * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+ * setup of an rpcController and calls through to the unimplemented
+ * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+ */
+ @Override
+ // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+ // and so we contain references to protobuf. We can't set priority on the rpcController as
+ // we do in RegionServerCallable because we don't always have a Table when we call.
+ public T call(int callTimeout) throws IOException {
+ this.controller = rpcControllerFactory.newController();
+ this.controller.setPriority(this.tableName);
+ this.controller.setCallTimeout(callTimeout);
+ try {
+ return call(this.controller);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
+ return this.controller;
+ }
+
+ /**
+ * Run RPC call.
+ * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+ * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+ * class.
+ * @throws Exception
+ */
+ protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index d878bae..3771c50 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,34 +20,62 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import com.google.protobuf.RpcController;
+
/**
- * Implementations call a RegionServer and implement {@link #call(int)}.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
+ * Implementations make an rpc call against a RegionService via a protobuf Service.
+ * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying
+ * via {@link #getRpcController()}.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
* the regioninfo part of location when building requests. The only reason it works for
* multi-region requests (e.g. batch) is that they happen to not use the region parts.
* This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
* RegionCallable and actual RegionServerCallable with ServerName.
+ *
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
- RetryingCallable<T> {
-
+public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
private ClientService.BlockingInterface stub;
+ /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is
+ * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is
+ * not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since
+ * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof
+ * checks in the below.
+ */
+ private final RpcController rpcController;
+
/**
* @param connection Connection to use.
* @param tableName Table name to which <code>row</code> belongs.
* @param row The row we want in <code>tableName</code>.
*/
- public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+ public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+ TableName tableName, byte [] row) {
+ this(connection, rpcControllerFactory.newController(), tableName, row);
+ }
+
+ public RegionServerCallable(Connection connection, RpcController rpcController,
+ TableName tableName, byte [] row) {
super(connection, tableName, row);
+ this.rpcController = rpcController;
+ // If it is an instance of PayloadCarryingRpcController, we can set priority on the
+ // controller based off the tableName. RpcController may be null in tests when mocking so allow
+ // for null controller.
+ if (this.rpcController != null && this.rpcController instanceof PayloadCarryingRpcController) {
+ ((PayloadCarryingRpcController)this.rpcController).setPriority(tableName);
+ }
}
void setClientByServiceName(ServerName service) throws IOException {
@@ -69,4 +96,55 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
void setStub(final ClientService.BlockingInterface stub) {
this.stub = stub;
}
-}
+
+ /**
+ * Override that changes call Exception from {@link Exception} to {@link IOException}. It also
+ * does setup of an rpcController and calls through to the unimplemented
+ * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController,
+ * we will set a timeout on it.
+ */
+ @Override
+ public T call(int callTimeout) throws IOException {
+ try {
+ if (this.rpcController != null &&
+ this.rpcController instanceof PayloadCarryingRpcController) {
+ ((PayloadCarryingRpcController)this.rpcController).setCallTimeout(callTimeout);
+ // Do a reset of the CellScanner in case we are carrying any Cells since last time through.
+ setRpcControllerCellScanner(null);
+ }
+ return rpcCall();
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ /**
+ * Run the RPC call. Implement this method. To get at the rpcController that has been created
+ * and configured to make this rpc call, use getRpcController(). We are trying to contain
+ * rpcController references so we don't pollute codebase with protobuf references; keep the
+ * protobuf references contained and only present in a few classes rather than all about the
+ * code base.
+ * @throws Exception
+ */
+ protected abstract T rpcCall() throws Exception;
+
+ protected RpcController getRpcController() {
+ return this.rpcController;
+ }
+
+ /**
+ * Get the RpcController CellScanner.
+ * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except
+ * when we are processing Coprocessor Endpoint, then this method returns a reference to the
+ * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable
+ * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in
+ * a Coprocessor Endpoint context. Should never happen.
+ */
+ protected CellScanner getRpcControllerCellScanner() {
+ return ((PayloadCarryingRpcController)this.rpcController).cellScanner();
+ }
+
+ protected void setRpcControllerCellScanner(CellScanner cellScanner) {
+ ((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
index 2377a0d..afbcc9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
@@ -36,4 +36,4 @@ public interface RetryingCallable<T> extends RetryingCallableBase {
* @throws Exception if unable to compute a result
*/
T call(int callTimeout) throws Exception;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..b9438e6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* Tracks the amount of time remaining for an operation.
*/
class RetryingTimeTracker {
-
private long globalStartTime = -1;
public void start() {
@@ -38,16 +37,19 @@ class RetryingTimeTracker {
if (callTimeout == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
- int remainingTime = (int) (
- callTimeout -
- (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+ long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime;
+ long remainingTime = callTimeout - remaining;
if (remainingTime < 1) {
// If there is no time left, we're trying anyway. It's too late.
// 0 means no timeout, and it's not the intent here. So we secure both cases by
// resetting to the minimum.
remainingTime = 1;
}
- return remainingTime;
+ if (remainingTime > Integer.MAX_VALUE) {
+ throw new RuntimeException("remainingTime=" + remainingTime +
+ " which is > Integer.MAX_VALUE");
+ }
+ return (int)remainingTime;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 0c2d345..a5bebd0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable {
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
- ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
- this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
+ ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
+ this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id);
r.setCaching(this.getCaching());
return r;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
new file mode 100644
index 0000000..68a4aa2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
+/**
+ * A RetryingCallable for RPC connection operations.
+ * @param <V> return type
+ */
+abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable {
+ @Override
+ public void prepare(boolean reload) throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void throwable(Throwable t, boolean retrying) {
+ }
+
+ @Override
+ public String getExceptionMessageAdditionalDetail() {
+ return "";
+ }
+
+ @Override
+ public long sleep(long pause, int tries) {
+ return ConnectionUtils.getPauseTime(pause, tries);
+ }
+
+ @Override
+ // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+ // and so we contain references to protobuf.
+ public V call(int callTimeout) throws IOException {
+ try {
+ return rpcCall(callTimeout);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ }
+
+ protected abstract V rpcCall(int callTimeout) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..2b2e4c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
-/**
- *
- */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface RpcRetryingCaller<T> {
@@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> {
*/
T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException;
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1c723c5..f92aeae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory {
private final int rpcTimeout;
private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt;
+ /* These below data members are UNUSED!!!*/
private final boolean enableBackPressure;
private ServerStatisticTracker stats;
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 65dbb10..8d63295 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,8 +29,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.protobuf.ServiceException;
-
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
public class RpcRetryingCallerWithReadReplicas {
- private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
-
protected final ExecutorService pool;
protected final ClusterConnection cConnection;
protected final Configuration conf;
@@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas {
private final PayloadCarryingRpcController controller;
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
- super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+ super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
this.id = id;
this.location = location;
@@ -141,28 +135,22 @@ public class RpcRetryingCallerWithReadReplicas {
}
@Override
- public Result call(int callTimeout) throws Exception {
+ protected Result rpcCall() throws Exception {
if (controller.isCanceled()) return null;
-
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
-
byte[] reg = location.getRegionInfo().getRegionName();
-
ClientProtos.GetRequest request =
RequestConverter.buildGetRequest(reg, get);
- controller.setCallTimeout(callTimeout);
-
- try {
- ClientProtos.GetResponse response = getStub().get(controller, request);
- if (response == null) {
- return null;
- }
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ // Presumes that the controller we were passed is a PayloadCarryingRpcController!
+ PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+ pcrc.setCallTimeout(callTimeout);
+ ClientProtos.GetResponse response = getStub().get(controller, request);
+ if (response == null) {
+ return null;
}
+ return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner());
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 72d69ec..0ee54d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -52,9 +51,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
/**
* Scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
@@ -74,7 +70,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean renew = false;
private Scan scan;
private int caching = 1;
- protected final ClusterConnection cConnection;
protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
@@ -99,8 +94,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
// indicate if it is a remote server call
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
- protected RpcControllerFactory controllerFactory;
- protected PayloadCarryingRpcController controller;
+ protected final RpcControllerFactory rpcControllerFactory;
/**
* @param connection which connection
@@ -125,19 +119,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
- super(connection, tableName, scan.getStartRow());
+ super(connection, rpcControllerFactory, tableName, scan.getStartRow());
this.id = id;
- this.cConnection = connection;
this.scan = scan;
this.scanMetrics = scanMetrics;
Configuration conf = connection.getConfiguration();
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
- this.controllerFactory = rpcControllerFactory;
- }
-
- PayloadCarryingRpcController getController() {
- return controller;
+ this.rpcControllerFactory = rpcControllerFactory;
}
/**
@@ -185,25 +174,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
}
}
-
- @Override
- public Result [] call(int callTimeout) throws IOException {
+ protected Result [] rpcCall() throws Exception {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
-
- if (controller == null) {
- controller = controllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- }
-
- if (closed) {
- if (scannerId != -1) {
+ if (this.closed) {
+ if (this.scannerId != -1) {
close();
}
} else {
- if (scannerId == -1L) {
+ if (this.scannerId == -1L) {
this.scannerId = openScanner();
} else {
Result [] rrs = null;
@@ -212,61 +192,54 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
setHeartbeatMessage(false);
try {
incRPCcallsMetrics();
- request =
- RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+ request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null, renew);
ScanResponse response = null;
- try {
- response = getStub().scan(controller, request);
- // Client and RS maintain a nextCallSeq number during the scan. Every next() call
- // from client to server will increment this number in both sides. Client passes this
- // number along with the request and at RS side both the incoming nextCallSeq and its
- // nextCallSeq will be matched. In case of a timeout this increment at the client side
- // should not happen. If at the server side fetching of next batch of data was over,
- // there will be mismatch in the nextCallSeq number. Server will throw
- // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
- // as the last successfully retrieved row.
- // See HBASE-5974
- nextCallSeq++;
- long timestamp = System.currentTimeMillis();
- setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
- // Results are returned via controller
- CellScanner cellScanner = controller.cellScanner();
- rrs = ResponseConverter.getResults(cellScanner, response);
- if (logScannerActivity) {
- long now = System.currentTimeMillis();
- if (now - timestamp > logCutOffLatency) {
- int rows = rrs == null ? 0 : rrs.length;
- LOG.info("Took " + (now-timestamp) + "ms to fetch "
+ response = getStub().scan(getRpcController(), request);
+ // Client and RS maintain a nextCallSeq number during the scan. Every next() call
+ // from client to server will increment this number in both sides. Client passes this
+ // number along with the request and at RS side both the incoming nextCallSeq and its
+ // nextCallSeq will be matched. In case of a timeout this increment at the client side
+ // should not happen. If at the server side fetching of next batch of data was over,
+ // there will be mismatch in the nextCallSeq number. Server will throw
+ // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
+ // as the last successfully retrieved row.
+ // See HBASE-5974
+ nextCallSeq++;
+ long timestamp = System.currentTimeMillis();
+ setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+ rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ if (now - timestamp > logCutOffLatency) {
+ int rows = rrs == null ? 0 : rrs.length;
+ LOG.info("Took " + (now-timestamp) + "ms to fetch "
+ rows + " rows from scanner=" + scannerId);
- }
}
- updateServerSideMetrics(response);
- // moreResults is only used for the case where a filter exhausts all elements
- if (response.hasMoreResults() && !response.getMoreResults()) {
- scannerId = -1L;
- closed = true;
- // Implied that no results were returned back, either.
- return null;
- }
- // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
- // to size or quantity of results in the response.
- if (response.hasMoreResultsInRegion()) {
- // Set what the RS said
- setHasMoreResultsContext(true);
- setServerHasMoreResults(response.getMoreResultsInRegion());
- } else {
- // Server didn't respond whether it has more results or not.
- setHasMoreResultsContext(false);
- }
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ }
+ updateServerSideMetrics(response);
+ // moreResults is only used for the case where a filter exhausts all elements
+ if (response.hasMoreResults() && !response.getMoreResults()) {
+ this.scannerId = -1L;
+ this.closed = true;
+ // Implied that no results were returned back, either.
+ return null;
+ }
+ // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
+ // to size or quantity of results in the response.
+ if (response.hasMoreResultsInRegion()) {
+ // Set what the RS said
+ setHasMoreResultsContext(true);
+ setServerHasMoreResults(response.getMoreResultsInRegion());
+ } else {
+ // Server didn't respond whether it has more results or not.
+ setHasMoreResultsContext(false);
}
updateResultsMetrics(rrs);
} catch (IOException e) {
if (logScannerActivity) {
- LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
- + " to " + getLocation(), e);
+ LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
+ getLocation(), e);
}
IOException ioe = e;
if (e instanceof RemoteException) {
@@ -275,9 +248,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
try {
HRegionLocation location =
- getConnection().relocateRegion(getTableName(), scan.getStartRow());
- LOG.info("Scanner=" + scannerId
- + " expired, current region location is " + location.toString());
+ getConnection().relocateRegion(getTableName(), scan.getStartRow());
+ LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+ location.toString());
} catch (Throwable t) {
LOG.info("Failed to relocate region", t);
}
@@ -375,9 +348,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try {
- getStub().scan(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ getStub().scan(getRpcController(), request);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
}
} catch (IOException e) {
LOG.warn("Ignore, probably already closed", e);
@@ -387,20 +360,18 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected long openScanner() throws IOException {
incRPCcallsMetrics();
- ScanRequest request =
- RequestConverter.buildScanRequest(
- getLocation().getRegionInfo().getRegionName(),
- this.scan, 0, false);
+ ScanRequest request = RequestConverter.buildScanRequest(
+ getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
try {
- ScanResponse response = getStub().scan(controller, request);
+ ScanResponse response = getStub().scan(getRpcController(), request);
long id = response.getScannerId();
if (logScannerActivity) {
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
+ " on region " + getLocation().toString());
}
return id;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
}
}
@@ -443,11 +414,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
return caching;
}
- @Override
- public ClusterConnection getConnection() {
- return cConnection;
- }
-
/**
* Set the number of rows that will be fetched on next
* @param caching the number of rows for caching
@@ -458,7 +424,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
public ScannerCallable getScannerCallableForReplica(int id) {
ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
- this.getScan(), this.scanMetrics, controllerFactory, id);
+ this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
s.setCaching(this.caching);
return s;
}
@@ -488,4 +454,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
this.serverHasMoreResultsContext = serverHasMoreResultsContext;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index c3a3834..096841b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -267,7 +267,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
/**
* When a scanner switches in the middle of scanning (the 'next' call fails
* for example), the upper layer {@link ClientScanner} needs to know
- * @return
*/
public boolean switchedToADifferentReplica() {
return replicaSwitched.get();
@@ -398,8 +397,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
public void cancel() {
cancelled = true;
caller.cancel();
- if (callable.getController() != null) {
- callable.getController().startCancel();
+ if (callable.getRpcController() != null) {
+ callable.getRpcController().startCancel();
}
someRPCcancelled = true;
}