You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/08/17 18:34:45 UTC
[39/50] [abbrv] hbase git commit: REVERT of revert of "HBASE-16308
Contain protobuf references Gather up the pb references into a few locations
only rather than have pb references distributed all about the code base."
This is a revert of a revert; i.e. w
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 29650ef..48a614f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,10 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -32,6 +28,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -69,7 +66,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
@@ -183,6 +179,9 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
/**
* HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
* this is an HBase-internal class as defined in
@@ -211,10 +210,6 @@ public class HBaseAdmin implements Admin {
private volatile Configuration conf;
private final long pause;
private final int numRetries;
- // Some operations can take a long time such as disable of big table.
- // numRetries is for 'normal' stuff... Multiply by this factor when
- // want to wait a long time.
- private final int retryLongerMultiplier;
private final int syncWaitTimeout;
private boolean aborted;
private int operationTimeout;
@@ -239,8 +234,6 @@ public class HBaseAdmin implements Admin {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.retryLongerMultiplier = this.conf.getInt(
- "hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
@@ -262,7 +255,7 @@ public class HBaseAdmin implements Admin {
}
@Override
- public boolean isAborted(){
+ public boolean isAborted() {
return this.aborted;
}
@@ -274,24 +267,19 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Boolean> abortProcedureAsync(
- final long procId,
- final boolean mayInterruptIfRunning) throws IOException {
- Boolean abortProcResponse = executeCallable(
- new MasterCallable<AbortProcedureResponse>(getConnection()) {
- @Override
- public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- AbortProcedureRequest abortProcRequest =
- AbortProcedureRequest.newBuilder().setProcId(procId).build();
- return master.abortProcedure(controller, abortProcRequest);
- }
- }).getIsProcedureAborted();
-
- AbortProcedureFuture abortProcFuture =
- new AbortProcedureFuture(this, procId, abortProcResponse);
- return abortProcFuture;
+ public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
+ throws IOException {
+ Boolean abortProcResponse =
+ executeCallable(new MasterCallable<AbortProcedureResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected AbortProcedureResponse rpcCall() throws Exception {
+ AbortProcedureRequest abortProcRequest =
+ AbortProcedureRequest.newBuilder().setProcId(procId).build();
+ return master.abortProcedure(getRpcController(), abortProcRequest);
+ }
+ }).getIsProcedureAborted();
+ return new AbortProcedureFuture(this, procId, abortProcResponse);
}
private static class AbortProcedureFuture extends ProcedureFuture<Boolean> {
@@ -324,9 +312,9 @@ public class HBaseAdmin implements Admin {
@Override
public boolean tableExists(final TableName tableName) throws IOException {
- return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override
- public Boolean call(int callTimeout) throws ServiceException, IOException {
+ protected Boolean rpcCall(int callTimeout) throws Exception {
return MetaTableAccessor.tableExists(connection, tableName);
}
});
@@ -350,14 +338,14 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
throws IOException {
- return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public HTableDescriptor[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected HTableDescriptor[] rpcCall() throws Exception {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
- return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+ return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(getRpcController(),
+ req));
}
});
}
@@ -386,14 +374,13 @@ public class HBaseAdmin implements Admin {
@Override
public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
throws IOException {
- return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
+ return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public TableName[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected TableName[] rpcCall() throws Exception {
GetTableNamesRequest req =
RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
- return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
+ return ProtobufUtil.getTableNameArray(master.getTableNames(getRpcController(), req)
.getTableNamesList());
}
});
@@ -414,27 +401,24 @@ public class HBaseAdmin implements Admin {
static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
int operationTimeout, int rpcTimeout) throws IOException {
- if (tableName == null) return null;
- HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
- @Override
- public HTableDescriptor call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- GetTableDescriptorsResponse htds;
- GetTableDescriptorsRequest req =
- RequestConverter.buildGetTableDescriptorsRequest(tableName);
- htds = master.getTableDescriptors(controller, req);
-
- if (!htds.getTableSchemaList().isEmpty()) {
- return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
- }
- return null;
+ if (tableName == null) return null;
+ HTableDescriptor htd =
+ executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
+ @Override
+ protected HTableDescriptor rpcCall() throws Exception {
+ GetTableDescriptorsRequest req =
+ RequestConverter.buildGetTableDescriptorsRequest(tableName);
+ GetTableDescriptorsResponse htds = master.getTableDescriptors(getRpcController(), req);
+ if (!htds.getTableSchemaList().isEmpty()) {
+ return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
}
- }, rpcCallerFactory, operationTimeout, rpcTimeout);
- if (htd != null) {
- return htd;
+ return null;
}
- throw new TableNotFoundException(tableName.getNameAsString());
+ }, rpcCallerFactory, operationTimeout, rpcTimeout);
+ if (htd != null) {
+ return htd;
+ }
+ throw new TableNotFoundException(tableName.getNameAsString());
}
private long getPauseTime(int tries) {
@@ -502,15 +486,13 @@ public class HBaseAdmin implements Admin {
}
CreateTableResponse response = executeCallable(
- new MasterCallable<CreateTableResponse>(getConnection()) {
+ new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public CreateTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(desc.getTableName());
+ protected CreateTableResponse rpcCall() throws Exception {
+ setPriority(desc.getTableName());
CreateTableRequest request = RequestConverter.buildCreateTableRequest(
desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
- return master.createTable(controller, request);
+ return master.createTable(getRpcController(), request);
}
});
return new CreateTableFuture(this, desc, splitKeys, response);
@@ -554,15 +536,13 @@ public class HBaseAdmin implements Admin {
@Override
public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
DeleteTableResponse response = executeCallable(
- new MasterCallable<DeleteTableResponse>(getConnection()) {
+ new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public DeleteTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
+ protected DeleteTableResponse rpcCall() throws Exception {
+ setPriority(tableName);
DeleteTableRequest req =
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
- return master.deleteTable(controller,req);
+ return master.deleteTable(getRpcController(), req);
}
});
return new DeleteTableFuture(this, tableName, response);
@@ -636,16 +616,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
throws IOException {
TruncateTableResponse response =
- executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
+ executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public TruncateTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
+ protected TruncateTableResponse rpcCall() throws Exception {
+ setPriority(tableName);
LOG.info("Started truncating " + tableName);
TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
- return master.truncateTable(controller, req);
+ return master.truncateTable(getRpcController(), req);
}
});
return new TruncateTableFuture(this, tableName, preserveSplits, response);
@@ -701,17 +680,14 @@ public class HBaseAdmin implements Admin {
public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
EnableTableResponse response = executeCallable(
- new MasterCallable<EnableTableResponse>(getConnection()) {
+ new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public EnableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected EnableTableResponse rpcCall() throws Exception {
+ setPriority(tableName);
LOG.info("Started enable of " + tableName);
EnableTableRequest req =
RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
- return master.enableTable(controller,req);
+ return master.enableTable(getRpcController(),req);
}
});
return new EnableTableFuture(this, tableName, response);
@@ -767,18 +743,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
DisableTableResponse response = executeCallable(
- new MasterCallable<DisableTableResponse>(getConnection()) {
+ new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public DisableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected DisableTableResponse rpcCall() throws Exception {
+ setPriority(tableName);
LOG.info("Started disable of " + tableName);
DisableTableRequest req =
RequestConverter.buildDisableTableRequest(
tableName, ng.getNonceGroup(), ng.newNonce());
- return master.disableTable(controller, req);
+ return master.disableTable(getRpcController(), req);
}
});
return new DisableTableFuture(this, tableName, response);
@@ -827,12 +800,13 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isTableEnabled(final TableName tableName) throws IOException {
checkTableExists(tableName);
- return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override
- public Boolean call(int callTimeout) throws ServiceException, IOException {
- TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
- if (tableState == null)
+ protected Boolean rpcCall(int callTimeout) throws Exception {
+ TableState tableState = MetaTableAccessor.getTableState(getConnection(), tableName);
+ if (tableState == null) {
throw new TableNotFoundException(tableName);
+ }
return tableState.inStates(TableState.State.ENABLED);
}
});
@@ -856,16 +830,14 @@ public class HBaseAdmin implements Admin {
@Override
public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
- return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
+ return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected Pair<Integer, Integer> rpcCall() throws Exception {
+ setPriority(tableName);
GetSchemaAlterStatusRequest req = RequestConverter
.buildGetSchemaAlterStatusRequest(tableName);
- GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
+ GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(getRpcController(), req);
Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
ret.getTotalRegions());
return pair;
@@ -894,17 +866,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> addColumnFamily(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
AddColumnResponse response =
- executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public AddColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected AddColumnResponse rpcCall() throws Exception {
+ setPriority(tableName);
AddColumnRequest req =
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce());
- return master.addColumn(controller, req);
+ return master.addColumn(getRpcController(), req);
}
});
return new AddColumnFamilyFuture(this, tableName, response);
@@ -939,17 +909,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
throws IOException {
DeleteColumnResponse response =
- executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public DeleteColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected DeleteColumnResponse rpcCall() throws Exception {
+ setPriority(tableName);
DeleteColumnRequest req =
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.deleteColumn(controller, req);
+ master.deleteColumn(getRpcController(), req);
return null;
}
});
@@ -985,17 +953,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> modifyColumnFamily(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
ModifyColumnResponse response =
- executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public ModifyColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected ModifyColumnResponse rpcCall() throws Exception {
+ setPriority(tableName);
ModifyColumnRequest req =
RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.modifyColumn(controller, req);
+ master.modifyColumn(getRpcController(), req);
return null;
}
});
@@ -1043,34 +1009,34 @@ public class HBaseAdmin implements Admin {
@Override
public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
- final String serverName) throws IOException {
+ final String serverName)
+ throws IOException {
if (null == serverName || ("").equals(serverName.trim())) {
- throw new IllegalArgumentException(
- "The servername cannot be null or empty.");
+ throw new IllegalArgumentException("The servername cannot be null or empty.");
}
ServerName sn = ServerName.valueOf(serverName);
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
// Close the region without updating zk state.
CloseRegionRequest request =
RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
+ // TODO: There is no timeout on this controller. Set one!
+ PayloadCarryingRpcController controller = this.rpcControllerFactory.newController();
try {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- // TODO: this does not do retries, it should. Set priority and timeout in controller
CloseRegionResponse response = admin.closeRegion(controller, request);
- boolean isRegionClosed = response.getClosed();
- if (false == isRegionClosed) {
+ boolean closed = response.getClosed();
+ if (false == closed) {
LOG.error("Not able to close the region " + encodedRegionName + ".");
}
- return isRegionClosed;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ return closed;
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
}
}
@Override
public void closeRegion(final ServerName sn, final HRegionInfo hri) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
// Close the region without updating zk state.
@@ -1080,6 +1046,7 @@ public class HBaseAdmin implements Admin {
@Override
public List<HRegionInfo> getOnlineRegions(final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
return ProtobufUtil.getOnlineRegions(controller, admin);
}
@@ -1104,20 +1071,21 @@ public class HBaseAdmin implements Admin {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
}
- HRegionInfo hRegionInfo = regionServerPair.getFirst();
+ final HRegionInfo hRegionInfo = regionServerPair.getFirst();
ServerName serverName = regionServerPair.getSecond();
-
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
- FlushRegionRequest request =
- RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- admin.flushRegion(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+ Callable<Void> callable = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // TODO: There is no timeout on this controller. Set one!
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ FlushRegionRequest request =
+ RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
+ admin.flushRegion(controller, request);
+ return null;
+ }
+ };
+ ProtobufUtil.call(callable);
}
/**
@@ -1268,67 +1236,46 @@ public class HBaseAdmin implements Admin {
private void compact(final ServerName sn, final HRegionInfo hri,
final boolean major, final byte [] family)
throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- CompactRegionRequest request =
- RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- admin.compactRegion(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ Callable<Void> callable = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ // TODO: There is no timeout on this controller. Set one!
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ CompactRegionRequest request =
+ RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
+ admin.compactRegion(controller, request);
+ return null;
+ }
+ };
+ ProtobufUtil.call(callable);
}
@Override
public void move(final byte [] encodedRegionName, final byte [] destServerName)
- throws IOException {
-
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ throws IOException {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(encodedRegionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
-
- try {
- MoveRegionRequest request =
- RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
- master.moveRegion(controller, request);
- } catch (DeserializationException de) {
- LOG.error("Could not parse destination server name: " + de);
- throw new ServiceException(new DoNotRetryIOException(de));
- }
+ protected Void rpcCall() throws Exception {
+ setPriority(encodedRegionName);
+ MoveRegionRequest request =
+ RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
+ master.moveRegion(getRpcController(), request);
return null;
}
});
}
- private boolean isMetaRegion(final byte[] regionName) {
- return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
- || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
- }
-
@Override
- public void assign(final byte[] regionName) throws MasterNotRunningException,
+ public void assign(final byte [] regionName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException {
- final byte[] toBeAssigned = getRegionName(regionName);
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
-
+ protected Void rpcCall() throws Exception {
+ setPriority(regionName);
AssignRegionRequest request =
- RequestConverter.buildAssignRegionRequest(toBeAssigned);
- master.assignRegion(controller,request);
+ RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
+ master.assignRegion(getRpcController(), request);
return null;
}
});
@@ -1338,18 +1285,13 @@ public class HBaseAdmin implements Admin {
public void unassign(final byte [] regionName, final boolean force)
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
final byte[] toBeUnassigned = getRegionName(regionName);
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
+ protected Void rpcCall() throws Exception {
+ setPriority(regionName);
UnassignRegionRequest request =
- RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
- master.unassignRegion(controller, request);
+ RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
+ master.unassignRegion(getRpcController(), request);
return null;
}
});
@@ -1358,16 +1300,12 @@ public class HBaseAdmin implements Admin {
@Override
public void offline(final byte [] regionName)
throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
- master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
+ protected Void rpcCall() throws Exception {
+ setPriority(regionName);
+ master.offlineRegion(getRpcController(),
+ RequestConverter.buildOfflineRegionRequest(regionName));
return null;
}
});
@@ -1376,56 +1314,44 @@ public class HBaseAdmin implements Admin {
@Override
public boolean setBalancerRunning(final boolean on, final boolean synchronous)
throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
+ protected Boolean rpcCall() throws Exception {
SetBalancerRunningRequest req =
RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
- return master.setBalancerRunning(controller, req).getPrevBalanceValue();
+ return master.setBalancerRunning(getRpcController(), req).getPrevBalanceValue();
}
});
}
@Override
public boolean balancer() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.balance(controller,
- RequestConverter.buildBalanceRequest(false)).getBalancerRan();
+ protected Boolean rpcCall() throws Exception {
+ return master.balance(getRpcController(),
+ RequestConverter.buildBalanceRequest(false)).getBalancerRan();
}
});
}
@Override
public boolean balancer(final boolean force) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.balance(controller,
- RequestConverter.buildBalanceRequest(force)).getBalancerRan();
+ protected Boolean rpcCall() throws Exception {
+ return master.balance(getRpcController(),
+ RequestConverter.buildBalanceRequest(force)).getBalancerRan();
}
});
}
@Override
public boolean isBalancerEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isBalancerEnabled(controller,
+ protected Boolean rpcCall() throws Exception {
+ return master.isBalancerEnabled(getRpcController(),
RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
}
});
@@ -1433,27 +1359,21 @@ public class HBaseAdmin implements Admin {
@Override
public boolean normalize() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.normalize(controller,
- RequestConverter.buildNormalizeRequest()).getNormalizerRan();
+ protected Boolean rpcCall() throws Exception {
+ return master.normalize(getRpcController(),
+ RequestConverter.buildNormalizeRequest()).getNormalizerRan();
}
});
}
@Override
public boolean isNormalizerEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isNormalizerEnabled(controller,
+ protected Boolean rpcCall() throws Exception {
+ return master.isNormalizerEnabled(getRpcController(),
RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
}
});
@@ -1461,28 +1381,22 @@ public class HBaseAdmin implements Admin {
@Override
public boolean setNormalizerRunning(final boolean on) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
+ protected Boolean rpcCall() throws Exception {
SetNormalizerRunningRequest req =
RequestConverter.buildSetNormalizerRunningRequest(on);
- return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
+ return master.setNormalizerRunning(getRpcController(), req).getPrevNormalizerValue();
}
});
}
@Override
public boolean enableCatalogJanitor(final boolean enable) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.enableCatalogJanitor(controller,
+ protected Boolean rpcCall() throws Exception {
+ return master.enableCatalogJanitor(getRpcController(),
RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
}
});
@@ -1490,13 +1404,10 @@ public class HBaseAdmin implements Admin {
@Override
public int runCatalogScan() throws IOException {
- return executeCallable(new MasterCallable<Integer>(getConnection()) {
+ return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
@Override
- public Integer call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.runCatalogScan(controller,
+ protected Integer rpcCall() throws Exception {
+ return master.runCatalogScan(getRpcController(),
RequestConverter.buildCatalogScanRequest()).getScanResult();
}
});
@@ -1504,13 +1415,10 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isCatalogJanitorEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isCatalogJanitorEnabled(controller,
+ protected Boolean rpcCall() throws Exception {
+ return master.isCatalogJanitorEnabled(getRpcController(),
RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
}
});
@@ -1616,25 +1524,18 @@ public class HBaseAdmin implements Admin {
}
DispatchMergingRegionsResponse response =
- executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
+ executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- try {
- DispatchMergingRegionsRequest request = RequestConverter
- .buildDispatchMergingRegionsRequest(
+ protected DispatchMergingRegionsResponse rpcCall() throws Exception {
+ DispatchMergingRegionsRequest request = RequestConverter
+ .buildDispatchMergingRegionsRequest(
encodedNameOfRegionA,
encodedNameOfRegionB,
forcible,
ng.getNonceGroup(),
ng.newNonce());
- return master.dispatchMergingRegions(controller, request);
- } catch (DeserializationException de) {
- LOG.error("Could not parse destination server name: " + de);
- throw new ServiceException(new DoNotRetryIOException(de));
- }
+ return master.dispatchMergingRegions(getRpcController(), request);
}
});
return new DispatchMergingRegionsFuture(this, tableName, response);
@@ -1731,6 +1632,7 @@ public class HBaseAdmin implements Admin {
Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
throw new IOException("should not give a splitkey which equals to startkey!");
}
+ // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
controller.setPriority(hri.getTable());
@@ -1746,21 +1648,16 @@ public class HBaseAdmin implements Admin {
throw new IllegalArgumentException("the specified table name '" + tableName +
"' doesn't match with the HTD one: " + htd.getTableName());
}
-
ModifyTableResponse response = executeCallable(
- new MasterCallable<ModifyTableResponse>(getConnection()) {
+ new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public ModifyTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected ModifyTableResponse rpcCall() throws Exception {
+ setPriority(tableName);
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
tableName, htd, ng.getNonceGroup(), ng.newNonce());
- return master.modifyTable(controller, request);
+ return master.modifyTable(getRpcController(), request);
}
});
-
return new ModifyTableFuture(this, tableName, response);
}
@@ -1875,9 +1772,9 @@ public class HBaseAdmin implements Admin {
*/
private TableName checkTableExists(final TableName tableName)
throws IOException {
- return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<TableName>() {
@Override
- public TableName call(int callTimeout) throws ServiceException, IOException {
+ protected TableName rpcCall(int callTimeout) throws Exception {
if (!MetaTableAccessor.tableExists(connection, tableName)) {
throw new TableNotFoundException(tableName);
}
@@ -1888,13 +1785,11 @@ public class HBaseAdmin implements Admin {
@Override
public synchronized void shutdown() throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(HConstants.HIGH_QOS);
- master.shutdown(controller, ShutdownRequest.newBuilder().build());
+ protected Void rpcCall() throws Exception {
+ setPriority(HConstants.HIGH_QOS);
+ master.shutdown(getRpcController(), ShutdownRequest.newBuilder().build());
return null;
}
});
@@ -1902,13 +1797,11 @@ public class HBaseAdmin implements Admin {
@Override
public synchronized void stopMaster() throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(HConstants.HIGH_QOS);
- master.stopMaster(controller, StopMasterRequest.newBuilder().build());
+ protected Void rpcCall() throws Exception {
+ setPriority(HConstants.HIGH_QOS);
+ master.stopMaster(getRpcController(), StopMasterRequest.newBuilder().build());
return null;
}
});
@@ -1919,43 +1812,41 @@ public class HBaseAdmin implements Admin {
throws IOException {
String hostname = Addressing.parseHostname(hostnamePort);
int port = Addressing.parsePort(hostnamePort);
- AdminService.BlockingInterface admin =
+ final AdminService.BlockingInterface admin =
this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
- StopServerRequest request = RequestConverter.buildStopServerRequest(
- "Called by admin client " + this.connection.toString());
+ // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
controller.setPriority(HConstants.HIGH_QOS);
+ StopServerRequest request = RequestConverter.buildStopServerRequest(
+ "Called by admin client " + this.connection.toString());
try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
admin.stopServer(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ } catch (Exception e) {
+ throw ProtobufUtil.handleRemoteException(e);
}
}
@Override
public boolean isMasterInMaintenanceMode() throws IOException {
- return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
+ this.rpcControllerFactory) {
@Override
- public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isMasterInMaintenanceMode(
- controller, IsInMaintenanceModeRequest.newBuilder().build());
+ protected IsInMaintenanceModeResponse rpcCall() throws Exception {
+ return master.isMasterInMaintenanceMode(getRpcController(),
+ IsInMaintenanceModeRequest.newBuilder().build());
}
}).getInMaintenanceMode();
}
@Override
public ClusterStatus getClusterStatus() throws IOException {
- return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
+ return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
+ this.rpcControllerFactory) {
@Override
- public ClusterStatus call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected ClusterStatus rpcCall() throws Exception {
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
- return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
+ return ProtobufUtil.convert(master.getClusterStatus(getRpcController(), req).
+ getClusterStatus());
}
});
}
@@ -1996,19 +1887,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
throws IOException {
CreateNamespaceResponse response =
- executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
- @Override
- public CreateNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.createNamespace(controller,
- CreateNamespaceRequest.newBuilder()
- .setNamespaceDescriptor(ProtobufUtil
- .toProtoNamespaceDescriptor(descriptor)).build()
- );
- }
- });
+ executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected CreateNamespaceResponse rpcCall() throws Exception {
+ return master.createNamespace(getRpcController(),
+ CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
+ toProtoNamespaceDescriptor(descriptor)).build());
+ }
+ });
return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
@Override
public String getOperationType() {
@@ -2027,16 +1914,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
throws IOException {
ModifyNamespaceResponse response =
- executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
- @Override
- public ModifyNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
- setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
- }
- });
+ executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected ModifyNamespaceResponse rpcCall() throws Exception {
+ // TODO: set priority based on NS?
+ return master.modifyNamespace(getRpcController(), ModifyNamespaceRequest.newBuilder().
+ setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
+ }
+ });
return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
@Override
public String getOperationType() {
@@ -2055,16 +1941,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> deleteNamespaceAsync(final String name)
throws IOException {
DeleteNamespaceResponse response =
- executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
- @Override
- public DeleteNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
- setNamespaceName(name).build());
- }
- });
+ executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected DeleteNamespaceResponse rpcCall() throws Exception {
+ // TODO: set priority based on NS?
+ return master.deleteNamespace(getRpcController(), DeleteNamespaceRequest.newBuilder().
+ setNamespaceName(name).build());
+ }
+ });
return new NamespaceFuture(this, name, response.getProcId()) {
@Override
public String getOperationType() {
@@ -2075,100 +1960,90 @@ public class HBaseAdmin implements Admin {
@Override
public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
- @Override
- public NamespaceDescriptor call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return ProtobufUtil.toNamespaceDescriptor(
- master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
- setNamespaceName(name).build()).getNamespaceDescriptor());
- }
- });
+ return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected NamespaceDescriptor rpcCall() throws Exception {
+ return ProtobufUtil.toNamespaceDescriptor(
+ master.getNamespaceDescriptor(getRpcController(),
+ GetNamespaceDescriptorRequest.newBuilder().
+ setNamespaceName(name).build()).getNamespaceDescriptor());
+ }
+ });
}
@Override
public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
- return
- executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
- @Override
- public NamespaceDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<HBaseProtos.NamespaceDescriptor> list =
- master.listNamespaceDescriptors(controller,
- ListNamespaceDescriptorsRequest.newBuilder().build())
- .getNamespaceDescriptorList();
- NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
- for(int i = 0; i < list.size(); i++) {
- res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
- }
- return res;
- }
- });
+ return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected NamespaceDescriptor[] rpcCall() throws Exception {
+ List<HBaseProtos.NamespaceDescriptor> list =
+ master.listNamespaceDescriptors(getRpcController(),
+ ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
+ NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
+ for(int i = 0; i < list.size(); i++) {
+ res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
+ }
+ return res;
+ }
+ });
}
@Override
public ProcedureInfo[] listProcedures() throws IOException {
- return
- executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
- @Override
- public ProcedureInfo[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<ProcedureProtos.Procedure> procList = master.listProcedures(
- controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
- ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
- for (int i = 0; i < procList.size(); i++) {
- procInfoList[i] = ProcedureUtil.convert(procList.get(i));
- }
- return procInfoList;
- }
- });
+ return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected ProcedureInfo[] rpcCall() throws Exception {
+ List<ProcedureProtos.Procedure> procList = master.listProcedures(
+ getRpcController(), ListProceduresRequest.newBuilder().build()).getProcedureList();
+ ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
+ for (int i = 0; i < procList.size(); i++) {
+ procInfoList[i] = ProcedureUtil.convert(procList.get(i));
+ }
+ return procInfoList;
+ }
+ });
}
@Override
public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
- @Override
- public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<TableSchema> list =
- master.listTableDescriptorsByNamespace(controller,
- ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
- .build()).getTableSchemaList();
- HTableDescriptor[] res = new HTableDescriptor[list.size()];
- for(int i=0; i < list.size(); i++) {
-
- res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
- }
- return res;
- }
- });
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected HTableDescriptor[] rpcCall() throws Exception {
+ List<TableSchema> list =
+ master.listTableDescriptorsByNamespace(getRpcController(),
+ ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
+ .build()).getTableSchemaList();
+ HTableDescriptor[] res = new HTableDescriptor[list.size()];
+ for(int i=0; i < list.size(); i++) {
+
+ res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
+ }
+ return res;
+ }
+ });
}
@Override
public TableName[] listTableNamesByNamespace(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<TableName[]>(getConnection()) {
- @Override
- public TableName[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<HBaseProtos.TableName> tableNames =
- master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
+ return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected TableName[] rpcCall() throws Exception {
+ List<HBaseProtos.TableName> tableNames =
+ master.listTableNamesByNamespace(getRpcController(), ListTableNamesByNamespaceRequest.
newBuilder().setNamespaceName(name).build())
- .getTableNameList();
- TableName[] result = new TableName[tableNames.size()];
- for (int i = 0; i < tableNames.size(); i++) {
- result[i] = ProtobufUtil.toTableName(tableNames.get(i));
- }
- return result;
- }
- });
+ .getTableNameList();
+ TableName[] result = new TableName[tableNames.size()];
+ for (int i = 0; i < tableNames.size(); i++) {
+ result[i] = ProtobufUtil.toTableName(tableNames.get(i));
+ }
+ return result;
+ }
+ });
}
/**
@@ -2176,10 +2051,26 @@ public class HBaseAdmin implements Admin {
* @param conf system configuration
* @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
+ * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have
+ * protobuf as part of our public API. Use {@link #available(Configuration)}
*/
// Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
+ // MOB uses it too.
+ // NOTE: hbase-2.0.0 removes ServiceException from the throw.
+ @Deprecated
public static void checkHBaseAvailable(Configuration conf)
- throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
+ throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
+ com.google.protobuf.ServiceException {
+ available(conf);
+ }
+
+ /**
+ * Is HBase available? Throw an exception if not.
+ * @param conf system configuration
+ * @throws ZooKeeperConnectionException if unable to connect to zookeeper]
+ */
+ public static void available(final Configuration conf)
+ throws ZooKeeperConnectionException, InterruptedIOException {
Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2191,7 +2082,6 @@ public class HBaseAdmin implements Admin {
(ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
getKeepAliveZooKeeperWatcher();) {
-
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
connection.isMasterRunning();
@@ -2231,14 +2121,14 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
throws IOException {
- return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected HTableDescriptor[] rpcCall() throws Exception {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
- return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+ return ProtobufUtil.
+ getHTableDescriptorArray(master.getTableDescriptors(getRpcController(), req));
}
});
}
@@ -2276,15 +2166,14 @@ public class HBaseAdmin implements Admin {
private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
FailedLogCloseException {
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+ // TODO: There is no timeout on this controller. Set one!
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
return admin.rollWALWriter(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ } catch (ServiceException e) {
+ throw ProtobufUtil.handleRemoteException(e);
}
}
@@ -2321,8 +2210,7 @@ public class HBaseAdmin implements Admin {
}
byte[][] regionsToFlush = new byte[regionCount][];
for (int i = 0; i < regionCount; i++) {
- ByteString region = response.getRegionToFlush(i);
- regionsToFlush[i] = region.toByteArray();
+ regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
}
return regionsToFlush;
}
@@ -2352,28 +2240,29 @@ public class HBaseAdmin implements Admin {
@Override
public CompactionState getCompactionStateForRegion(final byte[] regionName)
throws IOException {
+ final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
+ if (regionServerPair == null) {
+ throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
+ }
+ if (regionServerPair.getSecond() == null) {
+ throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
+ }
+ ServerName sn = regionServerPair.getSecond();
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ // TODO: There is no timeout on this controller. Set one!
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ regionServerPair.getFirst().getRegionName(), true);
+ GetRegionInfoResponse response;
try {
- Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
- if (regionServerPair == null) {
- throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
- }
- if (regionServerPair.getSecond() == null) {
- throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
- }
- ServerName sn = regionServerPair.getSecond();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- regionServerPair.getFirst().getRegionName(), true);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
- if (response.getCompactionState() != null) {
- return ProtobufUtil.createCompactionState(response.getCompactionState());
- }
- return null;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ response = admin.getRegionInfo(controller, request);
+ } catch (ServiceException e) {
+ throw ProtobufUtil.handleRemoteException(e);
+ }
+ if (response.getCompactionState() != null) {
+ return ProtobufUtil.createCompactionState(response.getCompactionState());
}
+ return null;
}
@Override
@@ -2425,12 +2314,11 @@ public class HBaseAdmin implements Admin {
throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
}
LOG.debug("Getting current status of snapshot from master...");
- done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+ done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isSnapshotDone(controller, request);
+ protected IsSnapshotDoneResponse rpcCall() throws Exception {
+ return master.isSnapshotDone(getRpcController(), request);
}
});
}
@@ -2476,12 +2364,11 @@ public class HBaseAdmin implements Admin {
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
.build();
// run the snapshot on the master
- return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public SnapshotResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.snapshot(controller, request);
+ protected SnapshotResponse rpcCall() throws Exception {
+ return master.snapshot(getRpcController(), request);
}
});
}
@@ -2490,12 +2377,11 @@ public class HBaseAdmin implements Admin {
public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
throws IOException, HBaseSnapshotException, UnknownSnapshotException {
final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
- return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isSnapshotDone(controller,
+ protected IsSnapshotDoneResponse rpcCall() throws Exception {
+ return master.isSnapshotDone(getRpcController(),
IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
}
}).getDone();
@@ -2674,12 +2560,10 @@ public class HBaseAdmin implements Admin {
.setProcedure(builder.build()).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
- getConnection()) {
+ getConnection(), getRpcControllerFactory()) {
@Override
- public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.execProcedureWithRet(controller, request);
+ protected ExecProcedureResponse rpcCall() throws Exception {
+ return master.execProcedureWithRet(getRpcController(), request);
}
});
@@ -2701,12 +2585,10 @@ public class HBaseAdmin implements Admin {
.setProcedure(builder.build()).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
- getConnection()) {
+ getConnection(), getRpcControllerFactory()) {
@Override
- public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.execProcedure(controller, request);
+ protected ExecProcedureResponse rpcCall() throws Exception {
+ return master.execProcedure(getRpcController(), request);
}
});
@@ -2750,12 +2632,10 @@ public class HBaseAdmin implements Admin {
}
final ProcedureDescription desc = builder.build();
return executeCallable(
- new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
+ new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isProcedureDone(controller, IsProcedureDoneRequest
+ protected IsProcedureDoneResponse rpcCall() throws Exception {
+ return master.isProcedureDone(getRpcController(), IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build());
}
}).getDone();
@@ -2781,17 +2661,15 @@ public class HBaseAdmin implements Admin {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
RestoreSnapshotResponse response = executeCallable(
- new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
+ new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
+ protected RestoreSnapshotResponse rpcCall() throws Exception {
final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
.setSnapshot(snapshot)
.setNonceGroup(ng.getNonceGroup())
.setNonce(ng.newNonce())
.build();
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.restoreSnapshot(controller, request);
+ return master.restoreSnapshot(getRpcController(), request);
}
});
@@ -2828,13 +2706,13 @@ public class HBaseAdmin implements Admin {
@Override
public List<SnapshotDescription> listSnapshots() throws IOException {
- return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
+ return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected List<SnapshotDescription> rpcCall() throws Exception {
List<HBaseProtos.SnapshotDescription> snapshotsList = master
- .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
+ .getCompletedSnapshots(getRpcController(),
+ GetCompletedSnapshotsRequest.newBuilder().build())
.getSnapshotsList();
List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
@@ -2897,14 +2775,11 @@ public class HBaseAdmin implements Admin {
// make sure the snapshot is possibly valid
TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
// do the delete
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- master.deleteSnapshot(controller,
- DeleteSnapshotRequest.newBuilder().
- setSnapshot(
+ protected Void rpcCall() throws Exception {
+ master.deleteSnapshot(getRpcController(),
+ DeleteSnapshotRequest.newBuilder().setSnapshot(
HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
.build()
);
@@ -2933,12 +2808,10 @@ public class HBaseAdmin implements Admin {
}
private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
+ protected Void rpcCall() throws Exception {
+ this.master.deleteSnapshot(getRpcController(), DeleteSnapshotRequest.newBuilder()
.setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
return null;
}
@@ -2967,12 +2840,10 @@ public class HBaseAdmin implements Admin {
@Override
public void setQuota(final QuotaSettings quota) throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
+ protected Void rpcCall() throws Exception {
+ this.master.setQuota(getRpcController(), QuotaSettings.buildSetQuotaRequestProto(quota));
return null;
}
});
@@ -2989,8 +2860,8 @@ public class HBaseAdmin implements Admin {
}
static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
- RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
- int rpcTimeout) throws IOException {
+ RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
+ throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
try {
return caller.callWithRetries(callable, operationTimeout);
@@ -3008,7 +2879,6 @@ public class HBaseAdmin implements Admin {
* Simple {@link Abortable}, throwing RuntimeException on abort.
*/
private static class ThrowableAbortable implements Abortable {
-
@Override
public void abort(String why, Throwable e) {
throw new RuntimeException(why, e);
@@ -3026,13 +2896,16 @@ public class HBaseAdmin implements Admin {
}
@Override
- public void updateConfiguration(ServerName server) throws IOException {
- try {
- this.connection.getAdmin(server).updateConfiguration(null,
- UpdateConfigurationRequest.getDefaultInstance());
- } catch (ServiceException e) {
- throw ProtobufUtil.getRemoteException(e);
- }
+ public void updateConfiguration(final ServerName server) throws IOException {
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
+ Callable<Void> callable = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
+ return null;
+ }
+ };
+ ProtobufUtil.call(callable);
}
@Override
@@ -3045,8 +2918,7 @@ public class HBaseAdmin implements Admin {
@Override
public int getMasterInfoPort() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
- ConnectionImplementation connection =
- (ConnectionImplementation)this.connection;
+ ConnectionImplementation connection = (ConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try {
return MasterAddressTracker.getMasterInfoPort(zkw);
@@ -3057,8 +2929,7 @@ public class HBaseAdmin implements Admin {
private ServerName getMasterAddress() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
- ConnectionImplementation connection =
- (ConnectionImplementation)this.connection;
+ ConnectionImplementation connection = (ConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try {
return MasterAddressTracker.getMasterAddress(zkw);
@@ -3069,33 +2940,27 @@ public class HBaseAdmin implements Admin {
@Override
public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
- return executeCallable(new MasterCallable<Long>(getConnection()) {
+ return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
@Override
- public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected Long rpcCall() throws Exception {
MajorCompactionTimestampRequest req =
MajorCompactionTimestampRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
- return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
+ return master.getLastMajorCompactionTimestamp(getRpcController(), req).
+ getCompactionTimestamp();
}
});
}
@Override
public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
- return executeCallable(new MasterCallable<Long>(getConnection()) {
+ return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
@Override
- public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected Long rpcCall() throws Exception {
MajorCompactionTimestampForRegionRequest req =
- MajorCompactionTimestampForRegionRequest
- .newBuilder()
- .setRegion(
- RequestConverter
+ MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
- return master.getLastMajorCompactionTimestampForRegion(controller, req)
+ return master.getLastMajorCompactionTimestampForRegion(getRpcController(), req)
.getCompactionTimestamp();
}
});
@@ -3134,32 +2999,36 @@ public class HBaseAdmin implements Admin {
@Override
public void majorCompact(final TableName tableName, CompactType compactType)
throws IOException, InterruptedException {
- compact(tableName, null, true, compactType);
+ compact(tableName, null, true, compactType);
}
/**
* {@inheritDoc}
*/
@Override
- public CompactionState getCompactionState(TableName tableName,
+ public CompactionState getCompactionState(final TableName tableName,
CompactType compactType) throws IOException {
AdminProtos.GetRegionInfoResponse.CompactionState state =
AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
checkTableExists(tableName);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ // TODO: There is no timeout on this controller. Set one!
+ final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
switch (compactType) {
case MOB:
- try {
- ServerName master = getMasterAddress();
- HRegionInfo info = getMobRegionInfo(tableName);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- info.getRegionName(), true);
- GetRegionInfoResponse response = this.connection.getAdmin(master)
- .getRegionInfo(controller, request);
- state = response.getCompactionState();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final AdminProtos.AdminService.BlockingInterface masterAdmin =
+ this.connection.getAdmin(getMasterAddress());
+ Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
+ new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
+ @Override
+ public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
+ HRegionInfo info = getMobRegionInfo(tableName);
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ info.getRegionName(), true);
+ GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
+ return response.getCompactionState();
+ }
+ };
+ state = ProtobufUtil.call(callable);
break;
case NORMAL:
default:
@@ -3173,15 +3042,23 @@ public class HBaseAdmin implements Admin {
} else {
pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
}
- for (Pair<HRegionInfo, ServerName> pair : pairs) {
+ for (Pair<HRegionInfo, ServerName> pair: pairs) {
if (pair.getFirst().isOffline()) continue;
if (pair.getSecond() == null) continue;
+ final ServerName sn = pair.getSecond();
+ final byte [] regionName = pair.getFirst().getRegionName();
+ final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
try {
- ServerName sn = pair.getSecond();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- pair.getFirst().getRegionName(), true);
- GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+ Callable<GetRegionInfoResponse> regionInfoCallable =
+ new Callable<GetRegionInfoResponse>() {
+ @Override
+ public GetRegionInfoResponse call() throws Exception {
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ regionName, true);
+ return snAdmin.getRegionInfo(rpcController, request);
+ }
+ };
+ GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
switch (response.getCompactionState()) {
case MAJOR_AND_MINOR:
return CompactionState.MAJOR_AND_MINOR;
@@ -3217,8 +3094,6 @@ public class HBaseAdmin implements Admin {
}
}
}
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
} finally {
if (zookeeper != null) {
zookeeper.close();
@@ -3283,12 +3158,10 @@ public class HBaseAdmin implements Admin {
protected AbortProcedureResponse abortProcedureResult(
final AbortProcedureRequest request) throws IOException {
return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
- admin.getConnection()) {
+ admin.getConnection(), admin.getRpcControllerFactory()) {
@Override
- public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
- controller.setCallTimeout(callTimeout);
- return master.abortProcedure(controller, request);
+ protected AbortProcedureResponse rpcCall() throws Exception {
+ return master.abortProcedure(getRpcController(), request);
}
});
}
@@ -3401,10 +3274,10 @@ public class HBaseAdmin implements Admin {
protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
throws IOException {
return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
- admin.getConnection()) {
+ admin.getConnection(), admin.getRpcControllerFactory()) {
@Override
- public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
- return master.getProcedureResult(null, request);
+ protected GetProcedureResultResponse rpcCall() throws Exception {
+ return master.getProcedureResult(getRpcController(), request);
}
});
}
@@ -3699,14 +3572,13 @@ public class HBaseAdmin implements Admin {
@Override
public List<SecurityCapability> getSecurityCapabilities() throws IOException {
try {
- return executeCallable(new MasterCallable<List<SecurityCapability>>(getConnection()) {
+
<TRUNCATED>