You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/09 23:49:01 UTC
[10/18] hbase git commit: HBASE-13204 Procedure v2 - client
create/delete table sync
HBASE-13204 Procedure v2 - client create/delete table sync
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/02c15a2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/02c15a2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/02c15a2f
Branch: refs/heads/hbase-12439
Commit: 02c15a2fed5a9196e3d3d86b68bf4d3dc88ec0e9
Parents: aa934f8
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Apr 9 21:01:20 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Apr 9 22:44:59 2015 +0100
----------------------------------------------------------------------
.../hbase/client/ConnectionImplementation.java | 6 +
.../apache/hadoop/hbase/client/HBaseAdmin.java | 608 ++++-
.../hbase/client/TestProcedureFuture.java | 186 ++
.../hbase/protobuf/generated/MasterProtos.java | 2576 +++++++++++++++---
hbase-protocol/src/main/protobuf/Master.proto | 24 +
.../org/apache/hadoop/hbase/master/HMaster.java | 12 +-
.../hadoop/hbase/master/MasterRpcServices.java | 51 +-
.../hadoop/hbase/master/MasterServices.java | 4 +-
.../hadoop/hbase/master/TestCatalogJanitor.java | 7 +-
9 files changed, 2920 insertions(+), 554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/02c15a2f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8442a77..bc2d51a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1598,6 +1598,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
@Override
+ public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
+ MasterProtos.GetProcedureResultRequest request) throws ServiceException {
+ return stub.getProcedureResult(controller, request);
+ }
+
+ @Override
public MasterProtos.IsMasterRunningResponse isMasterRunning(
RpcController controller, MasterProtos.IsMasterRunningRequest request)
throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/02c15a2f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 21a9139..1697c03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -31,6 +31,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel;
@@ -89,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
@@ -101,6 +108,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResp
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -142,6 +151,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -186,6 +196,7 @@ public class HBaseAdmin implements Admin {
// numRetries is for 'normal' stuff... Multiply by this factor when
// want to wait a long time.
private final int retryLongerMultiplier;
+ private final int syncWaitTimeout;
private boolean aborted;
private boolean cleanupConnectionOnClose = false; // close the connection in close()
private boolean closed = false;
@@ -242,6 +253,8 @@ public class HBaseAdmin implements Admin {
"hbase.client.retries.longer.multiplier", 10);
this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ this.syncWaitTimeout = this.conf.getInt(
+ "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
}
@@ -541,92 +554,23 @@ public class HBaseAdmin implements Admin {
*/
@Override
public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
- throws IOException {
+ throws IOException {
+ Future<Void> future = createTableAsyncV2(desc, splitKeys);
try {
- createTableAsync(desc, splitKeys);
- } catch (SocketTimeoutException ste) {
- LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
- }
- int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
- int prevRegCount = 0;
- boolean tableWasEnabled = false;
- for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
- ++tries) {
- if (tableWasEnabled) {
- // Wait all table regions comes online
- final AtomicInteger actualRegCount = new AtomicInteger(0);
- MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
- @Override
- public boolean visit(Result rowResult) throws IOException {
- RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
- if (list == null) {
- LOG.warn("No serialized HRegionInfo in " + rowResult);
- return true;
- }
- HRegionLocation l = list.getRegionLocation();
- if (l == null) {
- return true;
- }
- if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
- return false;
- }
- if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
- HRegionLocation[] locations = list.getRegionLocations();
- for (HRegionLocation location : locations) {
- if (location == null) continue;
- ServerName serverName = location.getServerName();
- // Make sure that regions are assigned to server
- if (serverName != null && serverName.getHostAndPort() != null) {
- actualRegCount.incrementAndGet();
- }
- }
- return true;
- }
- };
- MetaTableAccessor.scanMetaForTableRegions(connection, visitor, desc.getTableName());
- if (actualRegCount.get() < numRegs) {
- if (tries == this.numRetries * this.retryLongerMultiplier - 1) {
- throw new RegionOfflineException("Only " + actualRegCount.get() +
- " of " + numRegs + " regions are online; retries exhausted.");
- }
- try { // Sleep
- Thread.sleep(getPauseTime(tries));
- } catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted when opening" +
- " regions; " + actualRegCount.get() + " of " + numRegs +
- " regions processed so far");
- }
- if (actualRegCount.get() > prevRegCount) { // Making progress
- prevRegCount = actualRegCount.get();
- tries = -1;
- }
- } else {
- return;
- }
+ // TODO: how long should we wait? spin forever?
+ future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted when waiting" +
+ " for table to be enabled; meta scan was done");
+ } catch (TimeoutException e) {
+ throw new TimeoutIOException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException)e.getCause();
} else {
- try {
- tableWasEnabled = isTableAvailable(desc.getTableName());
- } catch (TableNotFoundException tnfe) {
- LOG.debug(
- "Table " + desc.getTableName() + " was not enabled, sleeping, still " + numRetries
- + " retries left");
- }
- if (tableWasEnabled) {
- // no we will scan meta to ensure all regions are online
- tries = -1;
- } else {
- try { // Sleep
- Thread.sleep(getPauseTime(tries));
- } catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted when waiting" +
- " for table to be enabled; meta scan was done");
- }
- }
+ throw new IOException(e.getCause());
}
}
- throw new TableNotEnabledException(
- "Retries exhausted while still waiting for table: "
- + desc.getTableName() + " to be enabled");
}
/**
@@ -646,22 +590,42 @@ public class HBaseAdmin implements Admin {
* @throws IOException
*/
@Override
- public void createTableAsync(
- final HTableDescriptor desc, final byte [][] splitKeys)
- throws IOException {
- if(desc.getTableName() == null) {
+ public void createTableAsync(final HTableDescriptor desc, final byte [][] splitKeys)
+ throws IOException {
+ createTableAsyncV2(desc, splitKeys);
+ }
+
+ /**
+ * Creates a new table but does not block and wait for it to come online.
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
+ * It may throw ExecutionException if there was an error while executing the operation
+ * or TimeoutException in case the wait timeout was not long enough to allow the
+ * operation to complete.
+ *
+ * @param desc table descriptor for table
+ * @param splitKeys keys to check if the table has been created with all split keys
+ * @throws IllegalArgumentException Bad table name, if the split keys
+ * are repeated and if the split key has empty byte array.
+ * @throws IOException if a remote or network exception occurs
+ * @return the result of the async creation. You can use Future.get(long, TimeUnit)
+ * to wait on the operation to complete.
+ */
+ // TODO: This should be called Async but it will break binary compatibility
+ private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys)
+ throws IOException {
+ if (desc.getTableName() == null) {
throw new IllegalArgumentException("TableName cannot be null");
}
- if(splitKeys != null && splitKeys.length > 0) {
+ if (splitKeys != null && splitKeys.length > 0) {
Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
// Verify there are no duplicate split keys
- byte [] lastKey = null;
- for(byte [] splitKey : splitKeys) {
+ byte[] lastKey = null;
+ for (byte[] splitKey : splitKeys) {
if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
throw new IllegalArgumentException(
"Empty split key must not be passed in the split keys.");
}
- if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
+ if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
throw new IllegalArgumentException("All split keys must be unique, " +
"found duplicate: " + Bytes.toStringBinary(splitKey) +
", " + Bytes.toStringBinary(lastKey));
@@ -670,14 +634,127 @@ public class HBaseAdmin implements Admin {
}
}
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ CreateTableResponse response = executeCallable(
+ new MasterCallable<CreateTableResponse>(getConnection()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
+ public CreateTableResponse call(int callTimeout) throws ServiceException {
CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
- master.createTable(null, request);
- return null;
+ return master.createTable(null, request);
}
});
+ return new CreateTableFuture(this, desc, splitKeys, response);
+ }
+
+ private static class CreateTableFuture extends ProcedureFuture<Void> {
+ private final HTableDescriptor desc;
+ private final byte[][] splitKeys;
+
+ public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc,
+ final byte[][] splitKeys, final CreateTableResponse response) {
+ super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
+ this.splitKeys = splitKeys;
+ this.desc = desc;
+ }
+
+ @Override
+ protected Void waitOperationResult(final long deadlineTs)
+ throws IOException, TimeoutException {
+ waitForTableEnabled(deadlineTs);
+ waitForAllRegionsOnline(deadlineTs);
+ return null;
+ }
+
+ @Override
+ protected Void postOperationResult(final Void result, final long deadlineTs)
+ throws IOException, TimeoutException {
+ LOG.info("Created " + desc.getTableName());
+ return result;
+ }
+
+ private void waitForTableEnabled(final long deadlineTs)
+ throws IOException, TimeoutException {
+ waitForState(deadlineTs, new WaitForStateCallable() {
+ @Override
+ public boolean checkState(int tries) throws IOException {
+ try {
+ if (getAdmin().isTableAvailable(desc.getTableName())) {
+ return true;
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.debug("Table "+ desc.getTableName() +" was not enabled, sleeping. tries="+ tries);
+ }
+ return false;
+ }
+
+ @Override
+ public void throwInterruptedException() throws InterruptedIOException {
+ throw new InterruptedIOException("Interrupted when waiting for table " +
+ desc.getTableName() + " to be enabled");
+ }
+
+ @Override
+ public void throwTimeoutException(long elapsedTime) throws TimeoutException {
+ throw new TimeoutException("Table " + desc.getTableName() +
+ " not enabled after " + elapsedTime + "msec");
+ }
+ });
+ }
+
+ private void waitForAllRegionsOnline(final long deadlineTs)
+ throws IOException, TimeoutException {
+ final AtomicInteger actualRegCount = new AtomicInteger(0);
+ final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+ @Override
+ public boolean visit(Result rowResult) throws IOException {
+ RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
+ if (list == null) {
+ LOG.warn("No serialized HRegionInfo in " + rowResult);
+ return true;
+ }
+ HRegionLocation l = list.getRegionLocation();
+ if (l == null) {
+ return true;
+ }
+ if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
+ return false;
+ }
+ if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
+ HRegionLocation[] locations = list.getRegionLocations();
+ for (HRegionLocation location : locations) {
+ if (location == null) continue;
+ ServerName serverName = location.getServerName();
+ // Make sure that regions are assigned to server
+ if (serverName != null && serverName.getHostAndPort() != null) {
+ actualRegCount.incrementAndGet();
+ }
+ }
+ return true;
+ }
+ };
+
+ int tries = 0;
+ IOException serverEx = null;
+ int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
+ while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+ actualRegCount.set(0);
+ MetaTableAccessor.scanMetaForTableRegions(
+ getAdmin().getConnection(), visitor, desc.getTableName());
+ if (actualRegCount.get() == numRegs) {
+ // all the regions are online
+ return;
+ }
+
+ try {
+ Thread.sleep(getAdmin().getPauseTime(tries++));
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted when opening" +
+ " regions; " + actualRegCount.get() + " of " + numRegs +
+ " regions processed so far");
+ }
+ }
+ throw new TimeoutException("Only " + actualRegCount.get() +
+ " of " + numRegs + " regions are online; retries exhausted.");
+ }
}
public void deleteTable(final String tableName) throws IOException {
@@ -697,48 +774,93 @@ public class HBaseAdmin implements Admin {
*/
@Override
public void deleteTable(final TableName tableName) throws IOException {
- boolean tableExists = true;
+ Future<Void> future = deleteTableAsyncV2(tableName);
+ try {
+ future.get(syncWaitTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
+ } catch (TimeoutException e) {
+ throw new TimeoutIOException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException)e.getCause();
+ } else {
+ throw new IOException(e.getCause());
+ }
+ }
+ }
- executeCallable(new MasterCallable<Void>(getConnection()) {
+ /**
+ * Deletes the table but does not block and wait for it to be completely removed.
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
+ * It may throw ExecutionException if there was an error while executing the operation
+ * or TimeoutException in case the wait timeout was not long enough to allow the
+ * operation to complete.
+ *
+ * @param tableName name of table to delete; the delete request is sent to the master
+ * before this method returns, only the wait for its completion is left to the Future
+ * @throws IOException if a remote or network exception occurs
+ * @return the result of the async delete. You can use Future.get(long, TimeUnit)
+ * to wait on the operation to complete.
+ */
+ // TODO: This should be called Async but it will break binary compatibility
+ private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException {
+ DeleteTableResponse response = executeCallable(
+ new MasterCallable<DeleteTableResponse>(getConnection()) {
@Override
- public Void call(int callTimeout) throws ServiceException {
+ public DeleteTableResponse call(int callTimeout) throws ServiceException {
DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
- master.deleteTable(null,req);
- return null;
+ return master.deleteTable(null,req);
}
});
+ return new DeleteTableFuture(this, tableName, response);
+ }
- int failures = 0;
- for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
- try {
- tableExists = tableExists(tableName);
- if (!tableExists)
- break;
- } catch (IOException ex) {
- failures++;
- if(failures >= numRetries - 1) { // no more tries left
- if (ex instanceof RemoteException) {
- throw ((RemoteException) ex).unwrapRemoteException();
- } else {
- throw ex;
- }
- }
- }
- try {
- Thread.sleep(getPauseTime(tries));
- } catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted when waiting" +
- " for table to be deleted");
- }
+ private static class DeleteTableFuture extends ProcedureFuture<Void> {
+ private final TableName tableName;
+
+ public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName,
+ final DeleteTableResponse response) {
+ super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null);
+ this.tableName = tableName;
+ }
+
+ @Override
+ protected Void waitOperationResult(final long deadlineTs)
+ throws IOException, TimeoutException {
+ waitTableNotFound(deadlineTs);
+ return null;
+ }
+
+ @Override
+ protected Void postOperationResult(final Void result, final long deadlineTs)
+ throws IOException, TimeoutException {
+ // Delete cached information to prevent clients from using old locations
+ getAdmin().getConnection().clearRegionCache(tableName);
+ LOG.info("Deleted " + tableName);
+ return result;
}
- if (tableExists) {
- throw new IOException("Retries exhausted, it took too long to wait"+
- " for the table " + tableName + " to be deleted.");
+ private void waitTableNotFound(final long deadlineTs)
+ throws IOException, TimeoutException {
+ waitForState(deadlineTs, new WaitForStateCallable() {
+ @Override
+ public boolean checkState(int tries) throws IOException {
+ return !getAdmin().tableExists(tableName);
+ }
+
+ @Override
+ public void throwInterruptedException() throws InterruptedIOException {
+ throw new InterruptedIOException("Interrupted when waiting for table to be deleted");
+ }
+
+ @Override
+ public void throwTimeoutException(long elapsedTime) throws TimeoutException {
+ throw new TimeoutException("Table " + tableName + " not yet deleted after " +
+ elapsedTime + "msec");
+ }
+ });
}
- // Delete cached information to prevent clients from using old locations
- this.connection.clearRegionCache(tableName);
- LOG.info("Deleted " + tableName);
}
/**
@@ -3834,4 +3956,236 @@ public class HBaseAdmin implements Admin {
}
});
}
+
+ /**
+ * Future that waits on a procedure result.
+ * Returned by the async version of the Admin calls,
+ * and used internally by the sync calls to wait on the result of the procedure.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ protected static class ProcedureFuture<V> implements Future<V> {
+ private ExecutionException exception = null;
+ private boolean procResultFound = false;
+ private boolean done = false;
+ private V result = null;
+
+ private final HBaseAdmin admin;
+ private final Long procId;
+
+ public ProcedureFuture(final HBaseAdmin admin, final Long procId) {
+ this.admin = admin;
+ this.procId = procId;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // TODO: Abort not implemented yet
+ return false;
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ // TODO: should we ever spin forever?
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (!done) {
+ long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout);
+ try {
+ try {
+ // if the master supports procedures, try to wait for the procedure result
+ if (procId != null) {
+ result = waitProcedureResult(procId, deadlineTs);
+ }
+ // if we don't have a proc result, try the compatibility wait
+ if (!procResultFound) {
+ result = waitOperationResult(deadlineTs);
+ }
+ result = postOperationResult(result, deadlineTs);
+ done = true;
+ } catch (IOException e) {
+ result = postOpeartionFailure(e, deadlineTs);
+ done = true;
+ }
+ } catch (IOException e) {
+ exception = new ExecutionException(e);
+ done = true;
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ protected HBaseAdmin getAdmin() {
+ return admin;
+ }
+
+ private V waitProcedureResult(long procId, long deadlineTs)
+ throws IOException, TimeoutException, InterruptedException {
+ GetProcedureResultRequest request = GetProcedureResultRequest.newBuilder()
+ .setProcId(procId)
+ .build();
+
+ int tries = 0;
+ IOException serviceEx = null;
+ while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+ GetProcedureResultResponse response = null;
+ try {
+ // Try to fetch the result
+ response = getProcedureResult(request);
+ } catch (IOException e) {
+ serviceEx = unwrapException(e);
+
+ // the master may be down
+ LOG.warn("failed to get the procedure result procId=" + procId, serviceEx);
+
+ // Not much to do, if we have a DoNotRetryIOException
+ if (serviceEx instanceof DoNotRetryIOException) {
+ // TODO: looks like there is no way to unwrap this exception and get the proper
+ // UnsupportedOperationException aside from looking at the message.
+ // anyway, if we fail here we just failover to the compatibility side
+ // and that is always a valid solution.
+ LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx);
+ procResultFound = false;
+ return null;
+ }
+ }
+
+ // If the procedure is no longer running, we should have a result
+ if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) {
+ procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND;
+ return convertResult(response);
+ }
+
+ try {
+ Thread.sleep(getAdmin().getPauseTime(tries++));
+ } catch (InterruptedException e) {
+ throw new InterruptedException(
+ "Interrupted while waiting for the result of proc " + procId);
+ }
+ }
+ if (serviceEx != null) {
+ throw serviceEx;
+ } else {
+ throw new TimeoutException("The procedure " + procId + " is still running");
+ }
+ }
+
+ private static IOException unwrapException(IOException e) {
+ if (e instanceof RemoteException) {
+ return ((RemoteException)e).unwrapRemoteException();
+ }
+ return e;
+ }
+
+ protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request)
+ throws IOException {
+ return admin.executeCallable(new MasterCallable<GetProcedureResultResponse>(
+ admin.getConnection()) {
+ @Override
+ public GetProcedureResultResponse call(int callTimeout) throws ServiceException {
+ return master.getProcedureResult(null, request);
+ }
+ });
+ }
+
+ /**
+ * Convert the procedure result response to a specified type.
+ * @param response the procedure result object to parse
+ * @return the result data of the procedure.
+ */
+ protected V convertResult(final GetProcedureResultResponse response) throws IOException {
+ if (response.hasException()) {
+ throw ForeignExceptionUtil.toIOException(response.getException());
+ }
+ return null;
+ }
+
+ /**
+ * Fallback implementation in case the procedure is not supported by the server.
+ * It should try to wait until the operation is completed.
+ * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+ * @return the result data of the operation
+ */
+ protected V waitOperationResult(final long deadlineTs)
+ throws IOException, TimeoutException {
+ return null;
+ }
+
+ /**
+ * Called after the operation is completed and the result fetched.
+ * This allows extra steps to be performed after the procedure is completed.
+ * It also allows transformations to be applied to the result that will be returned by get().
+ * @param result the result of the procedure
+ * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+ * @return the result of the procedure, which may be the same as the passed one
+ */
+ protected V postOperationResult(final V result, final long deadlineTs)
+ throws IOException, TimeoutException {
+ return result;
+ }
+
+ /**
+ * Called after the operation is terminated with a failure.
+ * This allows extra steps to be performed after the procedure is terminated.
+ * It also allows a replacement result to be supplied, which will be returned by get().
+ * The default implementation will rethrow the exception
+ * @param exception the exception that terminated the operation
+ * @param deadlineTs the timestamp after which this method should throw a TimeoutException
+ * @return the result to be returned by get() in place of the failure
+ */
+ protected V postOpeartionFailure(final IOException exception, final long deadlineTs)
+ throws IOException, TimeoutException {
+ throw exception;
+ }
+
+ protected interface WaitForStateCallable {
+ boolean checkState(int tries) throws IOException;
+ void throwInterruptedException() throws InterruptedIOException;
+ void throwTimeoutException(long elapsed) throws TimeoutException;
+ }
+
+ protected void waitForState(final long deadlineTs, final WaitForStateCallable callable)
+ throws IOException, TimeoutException {
+ int tries = 0;
+ IOException serverEx = null;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ while (EnvironmentEdgeManager.currentTime() < deadlineTs) {
+ serverEx = null;
+ try {
+ if (callable.checkState(tries)) {
+ return;
+ }
+ } catch (IOException e) {
+ serverEx = e;
+ }
+ try {
+ Thread.sleep(getAdmin().getPauseTime(tries++));
+ } catch (InterruptedException e) {
+ callable.throwInterruptedException();
+ }
+ }
+ if (serverEx != null) {
+ throw unwrapException(serverEx);
+ } else {
+ callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/02c15a2f/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
new file mode 100644
index 0000000..da3ffe9
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({ClientTests.class, SmallTests.class})
+public class TestProcedureFuture {
+ private static class TestFuture extends HBaseAdmin.ProcedureFuture<Void> {
+ private boolean postOperationResultCalled = false; // set by postOperationResult()
+ private boolean waitOperationResultCalled = false; // set by waitOperationResult()
+ private boolean getProcedureResultCalled = false; // set by getProcedureResult()
+ private boolean convertResultCalled = false; // set by convertResult()
+
+ public TestFuture(final HBaseAdmin admin, final Long procId) { // procId may be null
+ super(admin, procId);
+ }
+
+ public boolean wasPostOperationResultCalled() {
+ return postOperationResultCalled;
+ }
+
+ public boolean wasWaitOperationResultCalled() {
+ return waitOperationResultCalled;
+ }
+
+ public boolean wasGetProcedureResultCalled() {
+ return getProcedureResultCalled;
+ }
+
+ public boolean wasConvertResultCalled() {
+ return convertResultCalled;
+ }
+
+ @Override
+ protected GetProcedureResultResponse getProcedureResult(
+ final GetProcedureResultRequest request) throws IOException {
+ getProcedureResultCalled = true; // record the call for the test assertions
+ return GetProcedureResultResponse.newBuilder()
+ .setState(GetProcedureResultResponse.State.FINISHED) // stub: always finished
+ .build();
+ }
+
+ @Override
+ protected Void convertResult(final GetProcedureResultResponse response) throws IOException {
+ convertResultCalled = true; // record the call for the test assertions
+ return null;
+ }
+
+ @Override
+ protected Void waitOperationResult(final long deadlineTs)
+ throws IOException, TimeoutException {
+ waitOperationResultCalled = true; // record the call; returns without waiting
+ return null;
+ }
+
+ @Override
+ protected Void postOperationResult(final Void result, final long deadlineTs)
+ throws IOException, TimeoutException {
+ postOperationResultCalled = true; // record the call for the test assertions
+ return result;
+ }
+ }
+
+ /**
+ * When the master returns a response with a procId,
+ * the future fetches the procedure result through getProcedureResult()
+ * and skips the waitOperationResult() call.
+ */
+ @Test(timeout=60000)
+ public void testWithProcId() throws Exception {
+ HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+ TestFuture f = new TestFuture(admin, 100L); // non-null procId
+ f.get(1, TimeUnit.MINUTES);
+
+ assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
+ assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
+ assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
+ assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+ }
+
+ /**
+ * Verify that the future keeps polling getProcedureResult() while the procedure
+ * is reported as RUNNING. The stub reports FINISHED only on its 10th call.
+ */
+ @Test(timeout=60000)
+ public void testWithProcIdAndSpinning() throws Exception {
+ final AtomicInteger spinCount = new AtomicInteger(0); // number of getProcedureResult() calls
+ HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+ TestFuture f = new TestFuture(admin, 100L) {
+ @Override
+ protected GetProcedureResultResponse getProcedureResult(
+ final GetProcedureResultRequest request) throws IOException {
+ boolean done = spinCount.incrementAndGet() >= 10; // RUNNING for the first 9 calls
+ return GetProcedureResultResponse.newBuilder()
+ .setState(done ? GetProcedureResultResponse.State.FINISHED :
+ GetProcedureResultResponse.State.RUNNING)
+ .build();
+ }
+ };
+ f.get(1, TimeUnit.MINUTES);
+
+ assertEquals(10, spinCount.get()); // polling stopped right after the first FINISHED
+ assertTrue("expected convertResult() to be called", f.wasConvertResultCalled());
+ assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled());
+ assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+ }
+
+ /**
+ * When the master returns a response without a procId,
+ * the future skips the getProcedureResult() call
+ * and relies on waitOperationResult() instead.
+ */
+ @Test(timeout=60000)
+ public void testWithoutProcId() throws Exception {
+ HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+ TestFuture f = new TestFuture(admin, null); // null procId
+ f.get(1, TimeUnit.MINUTES);
+
+ assertFalse("unexpected getProcedureResult() called", f.wasGetProcedureResultCalled());
+ assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
+ assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
+ assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+ }
+
+ /**
+ * When a new client with procedure support asks an old master without procedure support
+ * for the procedure result, it gets a DoNotRetryIOException (wrapping an
+ * UnsupportedOperationException).
+ * The future should trap that and fall back to waitOperationResult().
+ *
+ * This happens when the operation call is served by a "new master" but, while we are waiting
+ * for the operation to complete, we fail over to an "old master".
+ */
+ @Test(timeout=60000)
+ public void testOnServerWithNoProcedureSupport() throws Exception {
+ HBaseAdmin admin = Mockito.mock(HBaseAdmin.class);
+ TestFuture f = new TestFuture(admin, 100L) {
+ @Override
+ protected GetProcedureResultResponse getProcedureResult(
+ final GetProcedureResultRequest request) throws IOException {
+ super.getProcedureResult(request); // result discarded: only records that the call happened
+ throw new DoNotRetryIOException(new UnsupportedOperationException("getProcedureResult"));
+ }
+ };
+ f.get(1, TimeUnit.MINUTES);
+
+ assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled());
+ assertFalse("unexpected convertResult() called", f.wasConvertResultCalled());
+ assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled());
+ assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled());
+ }
+}
\ No newline at end of file