You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/08/05 17:14:15 UTC
[3/4] hbase git commit: HBASE-16308 Contain protobuf references
Gather up the pb references into a few locations only rather than have pb
references distributed all about the code base.
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 29650ef..fa18bd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,10 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
-
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -32,6 +28,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -69,7 +66,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
@@ -183,6 +179,8 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that
* this is an HBase-internal class as defined in
@@ -211,10 +209,6 @@ public class HBaseAdmin implements Admin {
private volatile Configuration conf;
private final long pause;
private final int numRetries;
- // Some operations can take a long time such as disable of big table.
- // numRetries is for 'normal' stuff... Multiply by this factor when
- // want to wait a long time.
- private final int retryLongerMultiplier;
private final int syncWaitTimeout;
private boolean aborted;
private int operationTimeout;
@@ -239,8 +233,6 @@ public class HBaseAdmin implements Admin {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.retryLongerMultiplier = this.conf.getInt(
- "hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
@@ -262,7 +254,7 @@ public class HBaseAdmin implements Admin {
}
@Override
- public boolean isAborted(){
+ public boolean isAborted() {
return this.aborted;
}
@@ -274,18 +266,16 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Boolean> abortProcedureAsync(
- final long procId,
- final boolean mayInterruptIfRunning) throws IOException {
+ public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning)
+ throws IOException {
Boolean abortProcResponse = executeCallable(
- new MasterCallable<AbortProcedureResponse>(getConnection()) {
+ new MasterCallable<AbortProcedureResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected AbortProcedureResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
AbortProcedureRequest abortProcRequest =
AbortProcedureRequest.newBuilder().setProcId(procId).build();
- return master.abortProcedure(controller, abortProcRequest);
+ return master.abortProcedure(rpcController, abortProcRequest);
}
}).getIsProcedureAborted();
@@ -324,9 +314,9 @@ public class HBaseAdmin implements Admin {
@Override
public boolean tableExists(final TableName tableName) throws IOException {
- return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override
- public Boolean call(int callTimeout) throws ServiceException, IOException {
+ protected Boolean rpcCall(int callTimeout) throws Exception {
return MetaTableAccessor.tableExists(connection, tableName);
}
});
@@ -350,14 +340,15 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables)
throws IOException {
- return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public HTableDescriptor[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables);
- return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+ return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(rpcController,
+ req));
}
});
}
@@ -386,14 +377,13 @@ public class HBaseAdmin implements Admin {
@Override
public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables)
throws IOException {
- return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
+ return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public TableName[] call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected TableName[] call(PayloadCarryingRpcController rpcController) throws Exception {
GetTableNamesRequest req =
RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables);
- return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req)
+ return ProtobufUtil.getTableNameArray(master.getTableNames(rpcController, req)
.getTableNamesList());
}
});
@@ -414,27 +404,25 @@ public class HBaseAdmin implements Admin {
static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection,
RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory,
int operationTimeout, int rpcTimeout) throws IOException {
- if (tableName == null) return null;
- HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) {
- @Override
- public HTableDescriptor call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- GetTableDescriptorsResponse htds;
- GetTableDescriptorsRequest req =
- RequestConverter.buildGetTableDescriptorsRequest(tableName);
- htds = master.getTableDescriptors(controller, req);
-
- if (!htds.getTableSchemaList().isEmpty()) {
- return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
- }
- return null;
+ if (tableName == null) return null;
+ HTableDescriptor htd =
+ executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) {
+ @Override
+ protected HTableDescriptor call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ GetTableDescriptorsRequest req =
+ RequestConverter.buildGetTableDescriptorsRequest(tableName);
+ GetTableDescriptorsResponse htds = master.getTableDescriptors(rpcController, req);
+ if (!htds.getTableSchemaList().isEmpty()) {
+ return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
}
- }, rpcCallerFactory, operationTimeout, rpcTimeout);
- if (htd != null) {
- return htd;
+ return null;
}
- throw new TableNotFoundException(tableName.getNameAsString());
+ }, rpcCallerFactory, operationTimeout, rpcTimeout);
+ if (htd != null) {
+ return htd;
+ }
+ throw new TableNotFoundException(tableName.getNameAsString());
}
private long getPauseTime(int tries) {
@@ -502,15 +490,14 @@ public class HBaseAdmin implements Admin {
}
CreateTableResponse response = executeCallable(
- new MasterCallable<CreateTableResponse>(getConnection()) {
+ new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public CreateTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(desc.getTableName());
+ protected CreateTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(desc.getTableName());
CreateTableRequest request = RequestConverter.buildCreateTableRequest(
desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
- return master.createTable(controller, request);
+ return master.createTable(rpcController, request);
}
});
return new CreateTableFuture(this, desc, splitKeys, response);
@@ -554,15 +541,14 @@ public class HBaseAdmin implements Admin {
@Override
public Future<Void> deleteTableAsync(final TableName tableName) throws IOException {
DeleteTableResponse response = executeCallable(
- new MasterCallable<DeleteTableResponse>(getConnection()) {
+ new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public DeleteTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
+ protected DeleteTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
DeleteTableRequest req =
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
- return master.deleteTable(controller,req);
+ return master.deleteTable(rpcController,req);
}
});
return new DeleteTableFuture(this, tableName, response);
@@ -636,16 +622,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits)
throws IOException {
TruncateTableResponse response =
- executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) {
+ executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public TruncateTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
+ protected TruncateTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
LOG.info("Started truncating " + tableName);
TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
- return master.truncateTable(controller, req);
+ return master.truncateTable(rpcController, req);
}
});
return new TruncateTableFuture(this, tableName, preserveSplits, response);
@@ -701,17 +687,15 @@ public class HBaseAdmin implements Admin {
public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
EnableTableResponse response = executeCallable(
- new MasterCallable<EnableTableResponse>(getConnection()) {
+ new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public EnableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected EnableTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
LOG.info("Started enable of " + tableName);
EnableTableRequest req =
RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce());
- return master.enableTable(controller,req);
+ return master.enableTable(rpcController,req);
}
});
return new EnableTableFuture(this, tableName, response);
@@ -767,18 +751,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> disableTableAsync(final TableName tableName) throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
DisableTableResponse response = executeCallable(
- new MasterCallable<DisableTableResponse>(getConnection()) {
+ new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public DisableTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected DisableTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
LOG.info("Started disable of " + tableName);
DisableTableRequest req =
RequestConverter.buildDisableTableRequest(
tableName, ng.getNonceGroup(), ng.newNonce());
- return master.disableTable(controller, req);
+ return master.disableTable(rpcController, req);
}
});
return new DisableTableFuture(this, tableName, response);
@@ -827,9 +809,9 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isTableEnabled(final TableName tableName) throws IOException {
checkTableExists(tableName);
- return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override
- public Boolean call(int callTimeout) throws ServiceException, IOException {
+ protected Boolean rpcCall(int callTimeout) throws Exception {
TableState tableState = MetaTableAccessor.getTableState(connection, tableName);
if (tableState == null)
throw new TableNotFoundException(tableName);
@@ -856,16 +838,15 @@ public class HBaseAdmin implements Admin {
@Override
public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException {
- return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
+ return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public Pair<Integer, Integer> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected Pair<Integer, Integer> call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
GetSchemaAlterStatusRequest req = RequestConverter
.buildGetSchemaAlterStatusRequest(tableName);
- GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req);
+ GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(rpcController, req);
Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
ret.getTotalRegions());
return pair;
@@ -894,17 +875,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> addColumnFamily(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
AddColumnResponse response =
- executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public AddColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected AddColumnResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
AddColumnRequest req =
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce());
- return master.addColumn(controller, req);
+ return master.addColumn(rpcController, req);
}
});
return new AddColumnFamilyFuture(this, tableName, response);
@@ -939,17 +919,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
throws IOException {
DeleteColumnResponse response =
- executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public DeleteColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected DeleteColumnResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
DeleteColumnRequest req =
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.deleteColumn(controller, req);
+ master.deleteColumn(rpcController, req);
return null;
}
});
@@ -985,17 +964,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> modifyColumnFamily(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
ModifyColumnResponse response =
- executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) {
+ executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public ModifyColumnResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected ModifyColumnResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
ModifyColumnRequest req =
RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.modifyColumn(controller, req);
+ master.modifyColumn(rpcController, req);
return null;
}
});
@@ -1044,28 +1022,26 @@ public class HBaseAdmin implements Admin {
@Override
public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
final String serverName) throws IOException {
- if (null == serverName || ("").equals(serverName.trim())) {
- throw new IllegalArgumentException(
- "The servername cannot be null or empty.");
+ if (null == serverName || ("").equals(serverName.trim())) {
+ throw new IllegalArgumentException("The servername cannot be null or empty.");
}
- ServerName sn = ServerName.valueOf(serverName);
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- // Close the region without updating zk state.
- CloseRegionRequest request =
- RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
- try {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- CloseRegionResponse response = admin.closeRegion(controller, request);
- boolean isRegionClosed = response.getClosed();
- if (false == isRegionClosed) {
- LOG.error("Not able to close the region " + encodedRegionName + ".");
+ final ServerName sn = ServerName.valueOf(serverName);
+ final AdminService.BlockingInterface admin = connection.getAdmin(sn);
+ final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
+ @Override
+ protected Boolean rpcCall(int callTimeout) throws Exception {
+ controller.setCallTimeout(callTimeout);
+ CloseRegionRequest request =
+ RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
+ CloseRegionResponse response = admin.closeRegion(controller, request);
+ boolean closed = response.getClosed();
+ if (false == closed) {
+ LOG.error("Not able to close the region " + encodedRegionName + ".");
+ }
+ return closed;
}
- return isRegionClosed;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ });
}
@Override
@@ -1104,20 +1080,20 @@ public class HBaseAdmin implements Admin {
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
}
- HRegionInfo hRegionInfo = regionServerPair.getFirst();
+ final HRegionInfo hRegionInfo = regionServerPair.getFirst();
ServerName serverName = regionServerPair.getSecond();
-
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
- FlushRegionRequest request =
- RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- admin.flushRegion(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+ executeCallable(new RpcRetryingCallable<Void>() {
+ @Override
+ protected Void rpcCall(int callTimeout) throws Exception {
+ controller.setCallTimeout(callTimeout);
+ FlushRegionRequest request =
+ RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
+ admin.flushRegion(controller, request);
+ return null;
+ }
+ });
}
/**
@@ -1268,67 +1244,45 @@ public class HBaseAdmin implements Admin {
private void compact(final ServerName sn, final HRegionInfo hri,
final boolean major, final byte [] family)
throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- CompactRegionRequest request =
- RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- admin.compactRegion(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ executeCallable(new RpcRetryingCallable<Void>() {
+ @Override
+ protected Void rpcCall(int callTimeout) throws Exception {
+ controller.setCallTimeout(callTimeout);
+ CompactRegionRequest request =
+ RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family);
+ admin.compactRegion(controller, request);
+ return null;
+ }
+ });
}
@Override
public void move(final byte [] encodedRegionName, final byte [] destServerName)
- throws IOException {
-
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ throws IOException {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(encodedRegionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
-
- try {
- MoveRegionRequest request =
- RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
- master.moveRegion(controller, request);
- } catch (DeserializationException de) {
- LOG.error("Could not parse destination server name: " + de);
- throw new ServiceException(new DoNotRetryIOException(de));
- }
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(encodedRegionName);
+ MoveRegionRequest request =
+ RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName);
+ master.moveRegion(rpcController, request);
return null;
}
});
}
- private boolean isMetaRegion(final byte[] regionName) {
- return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
- || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
- }
-
@Override
- public void assign(final byte[] regionName) throws MasterNotRunningException,
+ public void assign(final byte [] regionName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException {
- final byte[] toBeAssigned = getRegionName(regionName);
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
-
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(regionName);
AssignRegionRequest request =
- RequestConverter.buildAssignRegionRequest(toBeAssigned);
- master.assignRegion(controller,request);
+ RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
+ master.assignRegion(rpcController, request);
return null;
}
});
@@ -1338,18 +1292,13 @@ public class HBaseAdmin implements Admin {
public void unassign(final byte [] regionName, final boolean force)
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
final byte[] toBeUnassigned = getRegionName(regionName);
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(regionName);
UnassignRegionRequest request =
- RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
- master.unassignRegion(controller, request);
+ RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
+ master.unassignRegion(rpcController, request);
return null;
}
});
@@ -1358,16 +1307,11 @@ public class HBaseAdmin implements Admin {
@Override
public void offline(final byte [] regionName)
throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // Hard to know the table name, at least check if meta
- if (isMetaRegion(regionName)) {
- controller.setPriority(TableName.META_TABLE_NAME);
- }
- master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName));
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(regionName);
+ master.offlineRegion(rpcController, RequestConverter.buildOfflineRegionRequest(regionName));
return null;
}
});
@@ -1376,56 +1320,44 @@ public class HBaseAdmin implements Admin {
@Override
public boolean setBalancerRunning(final boolean on, final boolean synchronous)
throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
SetBalancerRunningRequest req =
RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
- return master.setBalancerRunning(controller, req).getPrevBalanceValue();
+ return master.setBalancerRunning(rpcController, req).getPrevBalanceValue();
}
});
}
@Override
public boolean balancer() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.balance(controller,
- RequestConverter.buildBalanceRequest(false)).getBalancerRan();
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.balance(rpcController,
+ RequestConverter.buildBalanceRequest(false)).getBalancerRan();
}
});
}
@Override
public boolean balancer(final boolean force) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.balance(controller,
- RequestConverter.buildBalanceRequest(force)).getBalancerRan();
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.balance(rpcController,
+ RequestConverter.buildBalanceRequest(force)).getBalancerRan();
}
});
}
@Override
public boolean isBalancerEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isBalancerEnabled(controller,
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.isBalancerEnabled(rpcController,
RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
}
});
@@ -1433,13 +1365,10 @@ public class HBaseAdmin implements Admin {
@Override
public boolean normalize() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.normalize(controller,
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.normalize(rpcController,
RequestConverter.buildNormalizeRequest()).getNormalizerRan();
}
});
@@ -1447,13 +1376,10 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isNormalizerEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isNormalizerEnabled(controller,
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.isNormalizerEnabled(rpcController,
RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
}
});
@@ -1461,28 +1387,22 @@ public class HBaseAdmin implements Admin {
@Override
public boolean setNormalizerRunning(final boolean on) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
SetNormalizerRunningRequest req =
RequestConverter.buildSetNormalizerRunningRequest(on);
- return master.setNormalizerRunning(controller, req).getPrevNormalizerValue();
+ return master.setNormalizerRunning(rpcController, req).getPrevNormalizerValue();
}
});
}
@Override
public boolean enableCatalogJanitor(final boolean enable) throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.enableCatalogJanitor(controller,
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.enableCatalogJanitor(rpcController,
RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
}
});
@@ -1490,13 +1410,10 @@ public class HBaseAdmin implements Admin {
@Override
public int runCatalogScan() throws IOException {
- return executeCallable(new MasterCallable<Integer>(getConnection()) {
+ return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) {
@Override
- public Integer call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.runCatalogScan(controller,
+ protected Integer call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.runCatalogScan(rpcController,
RequestConverter.buildCatalogScanRequest()).getScanResult();
}
});
@@ -1504,13 +1421,10 @@ public class HBaseAdmin implements Admin {
@Override
public boolean isCatalogJanitorEnabled() throws IOException {
- return executeCallable(new MasterCallable<Boolean>(getConnection()) {
+ return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
@Override
- public Boolean call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- return master.isCatalogJanitorEnabled(controller,
+ protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception {
+ return master.isCatalogJanitorEnabled(rpcController,
RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
}
});
@@ -1616,25 +1530,19 @@ public class HBaseAdmin implements Admin {
}
DispatchMergingRegionsResponse response =
- executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
+ executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
-
- try {
- DispatchMergingRegionsRequest request = RequestConverter
- .buildDispatchMergingRegionsRequest(
+ protected DispatchMergingRegionsResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ DispatchMergingRegionsRequest request = RequestConverter
+ .buildDispatchMergingRegionsRequest(
encodedNameOfRegionA,
encodedNameOfRegionB,
forcible,
ng.getNonceGroup(),
ng.newNonce());
- return master.dispatchMergingRegions(controller, request);
- } catch (DeserializationException de) {
- LOG.error("Could not parse destination server name: " + de);
- throw new ServiceException(new DoNotRetryIOException(de));
- }
+ return master.dispatchMergingRegions(rpcController, request);
}
});
return new DispatchMergingRegionsFuture(this, tableName, response);
@@ -1746,21 +1654,17 @@ public class HBaseAdmin implements Admin {
throw new IllegalArgumentException("the specified table name '" + tableName +
"' doesn't match with the HTD one: " + htd.getTableName());
}
-
ModifyTableResponse response = executeCallable(
- new MasterCallable<ModifyTableResponse>(getConnection()) {
+ new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public ModifyTableResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(tableName);
-
+ protected ModifyTableResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ rpcController.setPriority(tableName);
ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
tableName, htd, ng.getNonceGroup(), ng.newNonce());
- return master.modifyTable(controller, request);
+ return master.modifyTable(rpcController, request);
}
});
-
return new ModifyTableFuture(this, tableName, response);
}
@@ -1875,9 +1779,9 @@ public class HBaseAdmin implements Admin {
*/
private TableName checkTableExists(final TableName tableName)
throws IOException {
- return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
+ return executeCallable(new RpcRetryingCallable<TableName>() {
@Override
- public TableName call(int callTimeout) throws ServiceException, IOException {
+ protected TableName rpcCall(int callTimeout) throws Exception {
if (!MetaTableAccessor.tableExists(connection, tableName)) {
throw new TableNotFoundException(tableName);
}
@@ -1888,13 +1792,11 @@ public class HBaseAdmin implements Admin {
@Override
public synchronized void shutdown() throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(HConstants.HIGH_QOS);
- master.shutdown(controller, ShutdownRequest.newBuilder().build());
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(HConstants.HIGH_QOS);
+ master.shutdown(rpcController, ShutdownRequest.newBuilder().build());
return null;
}
});
@@ -1902,13 +1804,11 @@ public class HBaseAdmin implements Admin {
@Override
public synchronized void stopMaster() throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- controller.setPriority(HConstants.HIGH_QOS);
- master.stopMaster(controller, StopMasterRequest.newBuilder().build());
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(HConstants.HIGH_QOS);
+ master.stopMaster(rpcController, StopMasterRequest.newBuilder().build());
return null;
}
});
@@ -1919,43 +1819,41 @@ public class HBaseAdmin implements Admin {
throws IOException {
String hostname = Addressing.parseHostname(hostnamePort);
int port = Addressing.parsePort(hostnamePort);
- AdminService.BlockingInterface admin =
+ final AdminService.BlockingInterface admin =
this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
- StopServerRequest request = RequestConverter.buildStopServerRequest(
- "Called by admin client " + this.connection.toString());
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- controller.setPriority(HConstants.HIGH_QOS);
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- admin.stopServer(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
+ @Override
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ rpcController.setPriority(HConstants.HIGH_QOS);
+ StopServerRequest request = RequestConverter.buildStopServerRequest(
+ "Called by admin client " + this.connection.toString());
+ admin.stopServer(rpcController, request);
+ return null;
+ }
+ });
}
@Override
public boolean isMasterInMaintenanceMode() throws IOException {
- return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
+ this.rpcControllerFactory) {
@Override
- public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isMasterInMaintenanceMode(
- controller, IsInMaintenanceModeRequest.newBuilder().build());
+ protected IsInMaintenanceModeResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.isMasterInMaintenanceMode(rpcController,
+ IsInMaintenanceModeRequest.newBuilder().build());
}
}).getInMaintenanceMode();
}
@Override
public ClusterStatus getClusterStatus() throws IOException {
- return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
+ return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
+ this.rpcControllerFactory) {
@Override
- public ClusterStatus call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected ClusterStatus call(PayloadCarryingRpcController rpcController) throws Exception {
GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest();
- return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus());
+ return ProtobufUtil.convert(master.getClusterStatus(rpcController, req).getClusterStatus());
}
});
}
@@ -1996,19 +1894,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor)
throws IOException {
CreateNamespaceResponse response =
- executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) {
- @Override
- public CreateNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.createNamespace(controller,
- CreateNamespaceRequest.newBuilder()
- .setNamespaceDescriptor(ProtobufUtil
- .toProtoNamespaceDescriptor(descriptor)).build()
- );
- }
- });
+ executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected CreateNamespaceResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.createNamespace(rpcController,
+ CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
+ toProtoNamespaceDescriptor(descriptor)).build());
+ }
+ });
return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
@Override
public String getOperationType() {
@@ -2027,16 +1922,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor)
throws IOException {
ModifyNamespaceResponse response =
- executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) {
- @Override
- public ModifyNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder().
- setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
- }
- });
+ executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected ModifyNamespaceResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ // TODO: set priority based on NS?
+ return master.modifyNamespace(rpcController, ModifyNamespaceRequest.newBuilder().
+ setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
+ }
+ });
return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) {
@Override
public String getOperationType() {
@@ -2055,16 +1950,16 @@ public class HBaseAdmin implements Admin {
public Future<Void> deleteNamespaceAsync(final String name)
throws IOException {
DeleteNamespaceResponse response =
- executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) {
- @Override
- public DeleteNamespaceResponse call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- // TODO: set priority based on NS?
- return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder().
- setNamespaceName(name).build());
- }
- });
+ executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected DeleteNamespaceResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ // TODO: set priority based on NS?
+ return master.deleteNamespace(rpcController, DeleteNamespaceRequest.newBuilder().
+ setNamespaceName(name).build());
+ }
+ });
return new NamespaceFuture(this, name, response.getProcId()) {
@Override
public String getOperationType() {
@@ -2075,100 +1970,94 @@ public class HBaseAdmin implements Admin {
@Override
public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
- @Override
- public NamespaceDescriptor call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return ProtobufUtil.toNamespaceDescriptor(
- master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder().
+ return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected NamespaceDescriptor call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return ProtobufUtil.toNamespaceDescriptor(
+ master.getNamespaceDescriptor(rpcController, GetNamespaceDescriptorRequest.newBuilder().
setNamespaceName(name).build()).getNamespaceDescriptor());
- }
- });
+ }
+ });
}
@Override
public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
- return
- executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
- @Override
- public NamespaceDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<HBaseProtos.NamespaceDescriptor> list =
- master.listNamespaceDescriptors(controller,
- ListNamespaceDescriptorsRequest.newBuilder().build())
- .getNamespaceDescriptorList();
- NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
- for(int i = 0; i < list.size(); i++) {
- res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
- }
- return res;
- }
- });
+ return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected NamespaceDescriptor[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ List<HBaseProtos.NamespaceDescriptor> list =
+ master.listNamespaceDescriptors(rpcController,
+ ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
+ NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
+ for(int i = 0; i < list.size(); i++) {
+ res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
+ }
+ return res;
+ }
+ });
}
@Override
public ProcedureInfo[] listProcedures() throws IOException {
- return
- executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
- @Override
- public ProcedureInfo[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<ProcedureProtos.Procedure> procList = master.listProcedures(
- controller, ListProceduresRequest.newBuilder().build()).getProcedureList();
- ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
- for (int i = 0; i < procList.size(); i++) {
- procInfoList[i] = ProcedureUtil.convert(procList.get(i));
- }
- return procInfoList;
- }
- });
+ return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected ProcedureInfo[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ List<ProcedureProtos.Procedure> procList = master.listProcedures(
+ rpcController, ListProceduresRequest.newBuilder().build()).getProcedureList();
+ ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
+ for (int i = 0; i < procList.size(); i++) {
+ procInfoList[i] = ProcedureUtil.convert(procList.get(i));
+ }
+ return procInfoList;
+ }
+ });
}
@Override
public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
- @Override
- public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<TableSchema> list =
- master.listTableDescriptorsByNamespace(controller,
- ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
- .build()).getTableSchemaList();
- HTableDescriptor[] res = new HTableDescriptor[list.size()];
- for(int i=0; i < list.size(); i++) {
-
- res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
- }
- return res;
- }
- });
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ List<TableSchema> list =
+ master.listTableDescriptorsByNamespace(rpcController,
+ ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
+ .build()).getTableSchemaList();
+ HTableDescriptor[] res = new HTableDescriptor[list.size()];
+ for(int i=0; i < list.size(); i++) {
+
+ res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
+ }
+ return res;
+ }
+ });
}
@Override
public TableName[] listTableNamesByNamespace(final String name) throws IOException {
- return
- executeCallable(new MasterCallable<TableName[]>(getConnection()) {
- @Override
- public TableName[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- List<HBaseProtos.TableName> tableNames =
- master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest.
+ return executeCallable(new MasterCallable<TableName[]>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected TableName[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ List<HBaseProtos.TableName> tableNames =
+ master.listTableNamesByNamespace(rpcController, ListTableNamesByNamespaceRequest.
newBuilder().setNamespaceName(name).build())
- .getTableNameList();
- TableName[] result = new TableName[tableNames.size()];
- for (int i = 0; i < tableNames.size(); i++) {
- result[i] = ProtobufUtil.toTableName(tableNames.get(i));
- }
- return result;
- }
- });
+ .getTableNameList();
+ TableName[] result = new TableName[tableNames.size()];
+ for (int i = 0; i < tableNames.size(); i++) {
+ result[i] = ProtobufUtil.toTableName(tableNames.get(i));
+ }
+ return result;
+ }
+ });
}
/**
@@ -2176,10 +2065,26 @@ public class HBaseAdmin implements Admin {
* @param conf system configuration
* @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to zookeeper
+ * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have
+ * protobuf as part of our public API. Use {@link #available(Configuration)}
*/
// Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not.
+ // MOB uses it too.
+ // NOTE: hbase-2.0.0 removes ServiceException from the throw.
+ @Deprecated
public static void checkHBaseAvailable(Configuration conf)
- throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
+ throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
+ com.google.protobuf.ServiceException {
+ available(conf);
+ }
+
+ /**
+ * Is HBase available? Throw an exception if not.
+ * @param conf system configuration
+ * @throws ZooKeeperConnectionException if unable to connect to zookeeper]
+ */
+ public static void available(final Configuration conf)
+ throws ZooKeeperConnectionException, InterruptedIOException {
Configuration copyOfConf = HBaseConfiguration.create(conf);
// We set it to make it fail as soon as possible if HBase is not available
copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2191,7 +2096,6 @@ public class HBaseAdmin implements Admin {
(ClusterConnection) ConnectionFactory.createConnection(copyOfConf);
ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection).
getKeepAliveZooKeeperWatcher();) {
-
// This is NASTY. FIX!!!! Dependent on internal implementation! TODO
zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false);
connection.isMasterRunning();
@@ -2231,14 +2135,15 @@ public class HBaseAdmin implements Admin {
@Override
public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames)
throws IOException {
- return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
+ return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public HTableDescriptor[] call(int callTimeout) throws Exception {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController)
+ throws Exception {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
- return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req));
+ return ProtobufUtil.
+ getHTableDescriptorArray(master.getTableDescriptors(rpcController, req));
}
});
}
@@ -2276,16 +2181,16 @@ public class HBaseAdmin implements Admin {
private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException,
FailedLogCloseException {
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-
- try {
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- return admin.rollWALWriter(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ Callable<RollWALWriterResponse> callable = new Callable<RollWALWriterResponse>() {
+ @Override
+ public RollWALWriterResponse call() throws Exception {
+ RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ return admin.rollWALWriter(controller, request);
+ }
+ };
+ return ProtobufUtil.call(callable);
}
/**
@@ -2321,8 +2226,7 @@ public class HBaseAdmin implements Admin {
}
byte[][] regionsToFlush = new byte[regionCount][];
for (int i = 0; i < regionCount; i++) {
- ByteString region = response.getRegionToFlush(i);
- regionsToFlush[i] = region.toByteArray();
+ regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
}
return regionsToFlush;
}
@@ -2352,28 +2256,31 @@ public class HBaseAdmin implements Admin {
@Override
public CompactionState getCompactionStateForRegion(final byte[] regionName)
throws IOException {
- try {
- Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
- if (regionServerPair == null) {
- throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
- }
- if (regionServerPair.getSecond() == null) {
- throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
- }
- ServerName sn = regionServerPair.getSecond();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- regionServerPair.getFirst().getRegionName(), true);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- // TODO: this does not do retries, it should. Set priority and timeout in controller
- GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
- if (response.getCompactionState() != null) {
- return ProtobufUtil.createCompactionState(response.getCompactionState());
- }
- return null;
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
+ if (regionServerPair == null) {
+ throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
+ }
+ if (regionServerPair.getSecond() == null) {
+ throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
}
+ ServerName sn = regionServerPair.getSecond();
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+ Callable<CompactionState> callable = new Callable<CompactionState>() {
+ @Override
+ public CompactionState call() throws Exception {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ regionServerPair.getFirst().getRegionName(), true);
+
+ // TODO: this does not do retries, it should. Set priority and timeout in controller
+ GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+ if (response.getCompactionState() != null) {
+ return ProtobufUtil.createCompactionState(response.getCompactionState());
+ }
+ return null;
+ }
+ };
+ return ProtobufUtil.call(callable);
}
@Override
@@ -2425,12 +2332,12 @@ public class HBaseAdmin implements Admin {
throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
}
LOG.debug("Getting current status of snapshot from master...");
- done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+ done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isSnapshotDone(controller, request);
+ protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.isSnapshotDone(rpcController, request);
}
});
}
@@ -2476,12 +2383,12 @@ public class HBaseAdmin implements Admin {
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
.build();
// run the snapshot on the master
- return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public SnapshotResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.snapshot(controller, request);
+ protected SnapshotResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.snapshot(rpcController, request);
}
});
}
@@ -2490,12 +2397,12 @@ public class HBaseAdmin implements Admin {
public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
throws IOException, HBaseSnapshotException, UnknownSnapshotException {
final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc);
- return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
+ return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isSnapshotDone(controller,
+ protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.isSnapshotDone(rpcController,
IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
}
}).getDone();
@@ -2674,12 +2581,11 @@ public class HBaseAdmin implements Admin {
.setProcedure(builder.build()).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
- getConnection()) {
+ getConnection(), getRpcControllerFactory()) {
@Override
- public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.execProcedureWithRet(controller, request);
+ protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.execProcedureWithRet(rpcController, request);
}
});
@@ -2701,12 +2607,11 @@ public class HBaseAdmin implements Admin {
.setProcedure(builder.build()).build();
// run the procedure on the master
ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
- getConnection()) {
+ getConnection(), getRpcControllerFactory()) {
@Override
- public ExecProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.execProcedure(controller, request);
+ protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.execProcedure(rpcController, request);
}
});
@@ -2750,12 +2655,11 @@ public class HBaseAdmin implements Admin {
}
final ProcedureDescription desc = builder.build();
return executeCallable(
- new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
+ new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public IsProcedureDoneResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.isProcedureDone(controller, IsProcedureDoneRequest
+ protected IsProcedureDoneResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
+ return master.isProcedureDone(rpcController, IsProcedureDoneRequest
.newBuilder().setProcedure(desc).build());
}
}).getDone();
@@ -2781,17 +2685,16 @@ public class HBaseAdmin implements Admin {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
RestoreSnapshotResponse response = executeCallable(
- new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
+ new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) {
@Override
- public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
+ protected RestoreSnapshotResponse call(PayloadCarryingRpcController rpcController)
+ throws Exception {
final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
.setSnapshot(snapshot)
.setNonceGroup(ng.getNonceGroup())
.setNonce(ng.newNonce())
.build();
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- return master.restoreSnapshot(controller, request);
+ return master.restoreSnapshot(rpcController, request);
}
});
@@ -2828,13 +2731,13 @@ public class HBaseAdmin implements Admin {
@Override
public List<SnapshotDescription> listSnapshots() throws IOException {
- return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
+ return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(),
+ getRpcControllerFactory()) {
@Override
- public List<SnapshotDescription> call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected List<SnapshotDescription> call(PayloadCarryingRpcController rpcController)
+ throws Exception {
List<HBaseProtos.SnapshotDescription> snapshotsList = master
- .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build())
+ .getCompletedSnapshots(rpcController, GetCompletedSnapshotsRequest.newBuilder().build())
.getSnapshotsList();
List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size());
for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
@@ -2897,14 +2800,11 @@ public class HBaseAdmin implements Admin {
// make sure the snapshot is possibly valid
TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
// do the delete
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- master.deleteSnapshot(controller,
- DeleteSnapshotRequest.newBuilder().
- setSnapshot(
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ master.deleteSnapshot(rpcController,
+ DeleteSnapshotRequest.newBuilder().setSnapshot(
HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
.build()
);
@@ -2933,12 +2833,10 @@ public class HBaseAdmin implements Admin {
}
private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
- this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder()
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
+ this.master.deleteSnapshot(rpcController, DeleteSnapshotRequest.newBuilder()
.setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
return null;
}
@@ -2967,11 +2865,10 @@ public class HBaseAdmin implements Admin {
@Override
public void setQuota(final QuotaSettings quota) throws IOException {
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
+ protected Void call(PayloadCarryingRpcController rpcController) throws Exception {
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota));
return null;
}
@@ -2989,8 +2886,8 @@ public class HBaseAdmin implements Admin {
}
static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable,
- RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
- int rpcTimeout) throws IOException {
+ RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout)
+ throws IOException {
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
try {
return caller.callWithRetries(callable, operationTimeout);
@@ -3008,7 +2905,6 @@ public class HBaseAdmin implements Admin {
* Simple {@link Abortable}, throwing RuntimeException on abort.
*/
private static class ThrowableAbortable implements Abortable {
-
@Override
public void abort(String why, Throwable e) {
throw new RuntimeException(why, e);
@@ -3026,13 +2922,16 @@ public class HBaseAdmin implements Admin {
}
@Override
- public void updateConfiguration(ServerName server) throws IOException {
- try {
- this.connection.getAdmin(server).updateConfiguration(null,
- UpdateConfigurationRequest.getDefaultInstance());
- } catch (ServiceException e) {
- throw ProtobufUtil.getRemoteException(e);
- }
+ public void updateConfiguration(final ServerName server) throws IOException {
+ final AdminService.BlockingInterface admin = this.connection.getAdmin(server);
+ Callable<Void> callable = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance());
+ return null;
+ }
+ };
+ ProtobufUtil.call(callable);
}
@Override
@@ -3045,8 +2944,7 @@ public class HBaseAdmin implements Admin {
@Override
public int getMasterInfoPort() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
- ConnectionImplementation connection =
- (ConnectionImplementation)this.connection;
+ ConnectionImplementation connection = (ConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try {
return MasterAddressTracker.getMasterInfoPort(zkw);
@@ -3057,8 +2955,7 @@ public class HBaseAdmin implements Admin {
private ServerName getMasterAddress() throws IOException {
// TODO: Fix! Reaching into internal implementation!!!!
- ConnectionImplementation connection =
- (ConnectionImplementation)this.connection;
+ ConnectionImplementation connection = (ConnectionImplementation)this.connection;
ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher();
try {
return MasterAddressTracker.getMasterAddress(zkw);
@@ -3069,33 +2966,26 @@ public class HBaseAdmin implements Admin {
@Override
public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException {
- return executeCallable(new MasterCallable<Long>(getConnection()) {
+ return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
@Override
- public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected Long call(PayloadCarryingRpcController rpcController) throws Exception {
MajorCompactionTimestampRequest req =
MajorCompactionTimestampRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
- return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp();
+ return master.getLastMajorCompactionTimestamp(rpcController, req).getCompactionTimestamp();
}
});
}
@Override
public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException {
- return executeCallable(new MasterCallable<Long>(getConnection()) {
+ return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) {
@Override
- public Long call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setCallTimeout(callTimeout);
+ protected Long call(PayloadCarryingRpcController rpcController) throws Exception {
MajorCompactionTimestampForRegionRequest req =
- MajorCompactionTimestampForRegionRequest
- .newBuilder()
- .setRegion(
- RequestConverter
+ MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build();
- return master.getLastMajorCompactionTimestampForRegion(controller, req)
+ return master.getLastMajorCompactionTimestampForRegion(rpcController, req)
.getCompactionTimestamp();
}
});
@@ -3134,32 +3024,35 @@ public class HBaseAdmin implements Admin {
@Override
public void majorCompact(final TableName tableName, CompactType compactType)
throws IOException, InterruptedException {
- compact(tableName, null, true, compactType);
+ compact(tableName, null, true, compactType);
}
/**
* {@inheritDoc}
*/
@Override
- public CompactionState getCompactionState(TableName tableName,
+ public CompactionState getCompactionState(final TableName tableName,
CompactType compactType) throws IOException {
AdminProtos.GetRegionInfoResponse.CompactionState state =
AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
checkTableExists(tableName);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
switch (compactType) {
case MOB:
- try {
- ServerName master = getMasterAddress();
- HRegionInfo info = getMobRegionInfo(tableName);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- info.getRegionName(), true);
- GetRegionInfoResponse response = this.connection.getAdmin(master)
- .getRegionInfo(controller, request);
- state = response.getCompactionState();
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
+ final AdminProtos.AdminService.BlockingInterface masterAdmin =
+ this.connection.getAdmin(getMasterAddress());
+ Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
+ new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
+ @Override
+ public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
+ HRegionInfo info = getMobRegionInfo(tableName);
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ info.getRegionName(), true);
+ GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
+ return response.getCompactionState();
+ }
+ };
+ state = ProtobufUtil.call(callable);
break;
case NORMAL:
default:
@@ -3173,15 +3066,23 @@ public class HBaseAdmin implements Admin {
} else {
pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName);
}
- for (Pair<HRegionInfo, ServerName> pair : pairs) {
+ for (Pair<HRegionInfo, ServerName> pair: pairs) {
if (pair.getFirst().isOffline()) continue;
if (pair.getSecond() == null) continue;
+ final ServerName sn = pair.getSecond();
+ final byte [] regionName = pair.getFirst().getRegionName();
+ final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn);
try {
- ServerName sn = pair.getSecond();
- AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
- GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
- pair.getFirst().getRegionName(), true);
- GetRegionInfoResponse response = admin.getRegionInfo(controller, request);
+ Callable<GetRegionInfoResponse> regionInfoCallable =
+ new Callable<GetRegionInfoResponse>() {
+ @Override
+ public GetRegionInfoResponse call() throws Exception {
+ GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(
+ regionName, true);
+ return snAdmin.getRegionInfo(rpcController, request);
+ }
+ };
+ GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable);
switch (response.getCompactionState()) {
case MAJOR_AND_MINOR:
return CompactionState.MAJOR_AND_MINOR;
@@ -3217,8 +3118,6 @@ public class HBaseAdmin implements Admin {
}
}
}
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
} finally {
if (zookeeper != null) {
zookeeper.close();
@@ -3283,12 +3182,11 @@ public class HBaseAdmin implements Admin {
protected AbortProcedureResponse abortProcedureResult(
final AbortProcedureRequest request) throws IOException {
return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
- admin.getConnection()) {
+ admin.getConnection(), admin.getRpcControllerFactory()) {
@Override
- public AbortProcedureResponse call(int callTimeout) throws ServiceException {
- PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController();
- controller.setCallTimeout(callTimeout);
- return master.abortProcedure(controller, request);
+ protected AbortProcedureResponse call(PayloadCarr
<TRUNCATED>