You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/06/12 03:45:30 UTC
[kudu] 01/02: Modernize AsyncKuduClient
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7767abcbcb092b8449a02d985d25c0e3b12023fa
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Mon Jun 10 15:44:31 2019 -0700
Modernize AsyncKuduClient
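Replaces anonymous inner classes with lambdas, uses the diamond operator where the type can be inferred, and tidies formatting and comments.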
There are no functional changes in this patch.
Change-Id: I046971bf84c48d380c950c988a64c2effcab6f5e
Reviewed-on: http://gerrit.cloudera.org:8080/13589
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 509 +++++++++------------
1 file changed, 216 insertions(+), 293 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index b8dac87..e00888f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -584,12 +584,7 @@ public class AsyncKuduClient implements AutoCloseable {
// Add a callback that converts the response into a KuduTable.
Deferred<KuduTable> kuduTableD = createTableD.addCallbackDeferring(
- new Callback<Deferred<KuduTable>, CreateTableResponse>() {
- @Override
- public Deferred<KuduTable> call(CreateTableResponse resp) throws Exception {
- return getTableSchema(name, resp.getTableId(), create);
- }
- });
+ resp -> getTableSchema(name, resp.getTableId(), create));
if (!builder.shouldWait()) {
return kuduTableD;
@@ -597,13 +592,10 @@ public class AsyncKuduClient implements AutoCloseable {
// If requested, add a callback that waits until all of the table's tablets
// have been created.
- return kuduTableD.addCallbackDeferring(new Callback<Deferred<KuduTable>, KuduTable>() {
- @Override
- public Deferred<KuduTable> call(KuduTable tableResp) throws Exception {
- TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
- .setTableId(ByteString.copyFromUtf8(tableResp.getTableId()));
- return getDelayedIsCreateTableDoneDeferred(table, create, tableResp);
- }
+ return kuduTableD.addCallbackDeferring(tableResp -> {
+ TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
+ .setTableId(ByteString.copyFromUtf8(tableResp.getTableId()));
+ return getDelayedIsCreateTableDoneDeferred(table, create, tableResp);
});
}
@@ -700,15 +692,11 @@ public class AsyncKuduClient implements AutoCloseable {
// If requested, add a callback that waits until all of the table's tablets
// have been altered.
- return responseD.addCallbackDeferring(
- new Callback<Deferred<AlterTableResponse>, AlterTableResponse>() {
- @Override
- public Deferred<AlterTableResponse> call(AlterTableResponse resp) throws Exception {
- TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
- .setTableId(ByteString.copyFromUtf8(resp.getTableId()));
- return getDelayedIsAlterTableDoneDeferred(table, alter, resp);
- }
- });
+ return responseD.addCallbackDeferring(resp -> {
+ TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
+ .setTableId(ByteString.copyFromUtf8(resp.getTableId()));
+ return getDelayedIsAlterTableDoneDeferred(table, alter, resp);
+ });
}
/**
@@ -773,32 +761,28 @@ public class AsyncKuduClient implements AutoCloseable {
timer,
defaultAdminOperationTimeoutMs,
/*requiresAuthzTokenSupport=*/false);
-
rpc.setParentRpc(parent);
- return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
- @Override
- public KuduTable call(GetTableSchemaResponse resp) throws Exception {
- // When opening a table, clear the existing cached non-covered range entries.
- // This avoids surprises where a new table instance won't be able to see the
- // current range partitions of a table for up to the ttl.
- TableLocationsCache cache = tableLocations.get(resp.getTableId());
- if (cache != null) {
- cache.clearNonCoveredRangeEntries();
- }
- SignedTokenPB authzToken = resp.getAuthzToken();
- if (authzToken != null) {
- authzTokenCache.put(resp.getTableId(), authzToken);
- }
-
- LOG.debug("Opened table {}", resp.getTableId());
- return new KuduTable(AsyncKuduClient.this,
- resp.getTableName(),
- resp.getTableId(),
- resp.getSchema(),
- resp.getPartitionSchema(),
- resp.getNumReplicas(),
- resp.getExtraConfig());
+ return sendRpcToTablet(rpc).addCallback(resp -> {
+ // When opening a table, clear the existing cached non-covered range entries.
+ // This avoids surprises where a new table instance won't be able to see the
+ // current range partitions of a table for up to the TTL.
+ TableLocationsCache cache = tableLocations.get(resp.getTableId());
+ if (cache != null) {
+ cache.clearNonCoveredRangeEntries();
}
+ SignedTokenPB authzToken = resp.getAuthzToken();
+ if (authzToken != null) {
+ authzTokenCache.put(resp.getTableId(), authzToken);
+ }
+
+ LOG.debug("Opened table {}", resp.getTableId());
+ return new KuduTable(AsyncKuduClient.this,
+ resp.getTableName(),
+ resp.getTableId(),
+ resp.getSchema(),
+ resp.getPartitionSchema(),
+ resp.getNumReplicas(),
+ resp.getExtraConfig());
});
}
@@ -834,26 +818,18 @@ public class AsyncKuduClient implements AutoCloseable {
throw new IllegalArgumentException("The table name cannot be null");
}
- Callback<Deferred<Boolean>, KuduTable> cb = new Callback<Deferred<Boolean>, KuduTable>() {
- @Override
- public Deferred<Boolean> call(KuduTable table) throws Exception {
- return Deferred.fromResult(true);
- }
- };
- Callback<Deferred<Boolean>, Exception> eb = new Callback<Deferred<Boolean>, Exception>() {
- @Override
- public Deferred<Boolean> call(Exception e) throws Exception {
- if (e instanceof NonRecoverableException) {
- Status status = ((NonRecoverableException) e).getStatus();
- if (status.isNotFound()) {
- return Deferred.fromResult(false);
+ return AsyncUtil.addCallbacksDeferring(
+ getTableSchema(name, null, null),
+ _table -> Deferred.fromResult(true),
+ (Callback<Deferred<Boolean>, Exception>) e -> {
+ if (e instanceof NonRecoverableException) {
+ Status status = ((NonRecoverableException) e).getStatus();
+ if (status.isNotFound()) {
+ return Deferred.fromResult(false);
+ }
}
- }
- return Deferred.fromError(e);
- }
- };
-
- return AsyncUtil.addCallbacksDeferring(getTableSchema(name, null, null), cb, eb);
+ return Deferred.fromError(e);
+ });
}
/**
@@ -904,8 +880,7 @@ public class AsyncKuduClient implements AutoCloseable {
// If we've already connected to the master, use the authentication
// credentials that we received when we connected.
if (hasConnectedToMaster) {
- fakeRpc.callback(
- securityContext.exportAuthenticationCredentials());
+ fakeRpc.callback(securityContext.exportAuthenticationCredentials());
return;
}
@@ -916,23 +891,15 @@ public class AsyncKuduClient implements AutoCloseable {
.addCallback(new MasterLookupCB(masterTable,
/* partitionKey */ null,
/* requestedBatchSize */ 1))
- .addCallback(new Callback<Void, Object>() {
- @Override
- public Void call(Object ignored) {
- // Just call ourselves again; we're guaranteed to have the
- // authentication credentials.
- assert hasConnectedToMaster;
- doExportAuthenticationCredentials(fakeRpc);
- return null;
- }
+ .addCallback((Callback<Void, Object>) ignored -> {
+ // Just call ourselves again; we're guaranteed to have the
+ // authentication credentials.
+ assert hasConnectedToMaster;
+ doExportAuthenticationCredentials(fakeRpc);
+ return null;
})
- .addErrback(new RetryTaskErrback<byte[]>(
- fakeRpc, new TimerTask() {
- @Override
- public void run(final Timeout ignored) {
- doExportAuthenticationCredentials(fakeRpc);
- }
- }));
+ .addErrback(new RetryTaskErrback<>(
+ fakeRpc, _ignored -> doExportAuthenticationCredentials(fakeRpc)));
}
@InterfaceAudience.LimitedPrivate("Test")
@@ -977,22 +944,14 @@ public class AsyncKuduClient implements AutoCloseable {
.addCallback(new MasterLookupCB(masterTable,
/* partitionKey */ null,
/* requestedBatchSize */ 1))
- .addCallback(new Callback<Void, Object>() {
- @Override
- public Void call(Object ignored) {
- // Just call ourselves again; we're guaranteed to have the HMS config.
- assert hasConnectedToMaster;
- doGetHiveMetastoreConfig(fakeRpc);
- return null;
- }
+ .addCallback((Callback<Void, Object>) ignored -> {
+ // Just call ourselves again; we're guaranteed to have the HMS config.
+ assert hasConnectedToMaster;
+ doGetHiveMetastoreConfig(fakeRpc);
+ return null;
})
- .addErrback(new RetryTaskErrback<HiveMetastoreConfig>(
- fakeRpc, new TimerTask() {
- @Override
- public void run(final Timeout ignored) {
- doGetHiveMetastoreConfig(fakeRpc);
- }
- }));
+ .addErrback(new RetryTaskErrback<>(
+ fakeRpc, ignored -> doGetHiveMetastoreConfig(fakeRpc)));
}
/**
@@ -1021,7 +980,7 @@ public class AsyncKuduClient implements AutoCloseable {
long sleepTime = getSleepTimeForRpcMillis(fakeRpc);
if (cannotRetryRequest(fakeRpc) ||
fakeRpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
- tooManyAttemptsOrTimeout(fakeRpc, ex); // invokes fakeRpc.Deferred
+ tooManyAttemptsOrTimeout(fakeRpc, ex); // Invokes fakeRpc.Deferred.
return null;
}
fakeRpc.addTrace(
@@ -1033,7 +992,7 @@ public class AsyncKuduClient implements AutoCloseable {
newTimeout(timer, retryTask, sleepTime);
return null;
- // fakeRpc.Deferred was not invoked; the user continues to wait until
+ // fakeRpc.Deferred was not invoked: the user continues to wait until
// retryTask succeeds or fails with a fatal error.
}
@@ -1391,15 +1350,11 @@ public class AsyncKuduClient implements AutoCloseable {
* @param <R> RPC's return type
* @return newly created errback
*/
- private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(
- final KuduRpc<R> rpc) {
- return new Callback<Exception, Exception>() {
- @Override
- public Exception call(Exception e) throws Exception {
- // TODO maybe we can retry it?
- rpc.errback(e);
- return e;
- }
+ private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(final KuduRpc<R> rpc) {
+ return e -> {
+ // TODO maybe we can retry it?
+ rpc.errback(e);
+ return e;
};
}
@@ -1433,8 +1388,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
@Override
- Pair<R, Object> deserialize(
- CallResponse callResponse, String tsUUID) throws KuduException {
+ Pair<R, Object> deserialize(CallResponse callResponse, String tsUUID) throws KuduException {
return null;
}
};
@@ -1541,24 +1495,21 @@ public class AsyncKuduClient implements AutoCloseable {
@Nonnull final KuduRpc<AlterTableResponse> rpc,
@Nonnull final TableIdentifierPB.Builder table,
@Nullable final AlterTableResponse alterResp) {
- return new Callback<Deferred<AlterTableResponse>, IsAlterTableDoneResponse>() {
- @Override
- public Deferred<AlterTableResponse> call(IsAlterTableDoneResponse resp) throws Exception {
- // Store the Deferred locally; callback() below will reset it and we'd
- // return a different, non-triggered Deferred.
- Deferred<AlterTableResponse> d = rpc.getDeferred();
- if (resp.isDone()) {
- rpc.callback(alterResp);
- } else {
- rpc.attempt++;
- delayedIsAlterTableDone(
- table,
- rpc,
- getDelayedIsAlterTableDoneCB(rpc, table, alterResp),
- getDelayedIsTableDoneEB(rpc));
- }
- return d;
+ return resp -> {
+ // Store the Deferred locally; callback() below will reset it and we'd
+ // return a different, non-triggered Deferred.
+ Deferred<AlterTableResponse> d = rpc.getDeferred();
+ if (resp.isDone()) {
+ rpc.callback(alterResp);
+ } else {
+ rpc.attempt++;
+ delayedIsAlterTableDone(
+ table,
+ rpc,
+ getDelayedIsAlterTableDoneCB(rpc, table, alterResp),
+ getDelayedIsTableDoneEB(rpc));
}
+ return d;
};
}
@@ -1577,24 +1528,21 @@ public class AsyncKuduClient implements AutoCloseable {
final KuduRpc<KuduTable> rpc,
final TableIdentifierPB.Builder table,
final KuduTable tableResp) {
- return new Callback<Deferred<KuduTable>, IsCreateTableDoneResponse>() {
- @Override
- public Deferred<KuduTable> call(IsCreateTableDoneResponse resp) throws Exception {
- // Store the Deferred locally; callback() below will reset it and we'd
- // return a different, non-triggered Deferred.
- Deferred<KuduTable> d = rpc.getDeferred();
- if (resp.isDone()) {
- rpc.callback(tableResp);
- } else {
- rpc.attempt++;
- delayedIsCreateTableDone(
- table,
- rpc,
- getDelayedIsCreateTableDoneCB(rpc, table, tableResp),
- getDelayedIsTableDoneEB(rpc));
- }
- return d;
+ return resp -> {
+ // Store the Deferred locally; callback() below will reset it and we'd
+ // return a different, non-triggered Deferred.
+ Deferred<KuduTable> d = rpc.getDeferred();
+ if (resp.isDone()) {
+ rpc.callback(tableResp);
+ } else {
+ rpc.attempt++;
+ delayedIsCreateTableDone(
+ table,
+ rpc,
+ getDelayedIsCreateTableDoneCB(rpc, table, tableResp),
+ getDelayedIsTableDoneEB(rpc));
}
+ return d;
};
}
@@ -1779,7 +1727,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
d.addCallback(new MasterLookupCB(table, partitionKey, fetchBatchSize));
if (hasPermit) {
- d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
+ d.addBoth(new ReleaseMasterLookupPermit<>());
}
return d;
}
@@ -1793,44 +1741,41 @@ public class AsyncKuduClient implements AutoCloseable {
// TODO(todd): stop using this 'masterTable' hack.
return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback(
- new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
- @Override
- public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
- if (resp.getConnectResponse().hasAuthnToken()) {
- // If the response has security info, adopt it.
- securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
- }
- List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
- if (!caCerts.isEmpty()) {
- try {
- securityContext.trustCertificates(caCerts);
- } catch (CertificateException e) {
- LOG.warn("Ignoring invalid CA cert from leader {}: {}",
- resp.getLeaderHostAndPort(),
- e.getMessage());
- }
- }
-
- HiveMetastoreConfig hiveMetastoreConfig = null;
- Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse();
- if (respPb.hasHmsConfig()) {
- Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig();
- hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(),
- metastoreConf.getHmsSaslEnabled(),
- metastoreConf.getHmsUuid());
- }
- synchronized (AsyncKuduClient.this) {
- AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig;
- location = respPb.getClientLocation();
- }
-
- hasConnectedToMaster = true;
-
- // Translate the located master into a TableLocations
- // since the rest of our locations caching code expects this type.
- return resp.getAsTableLocations();
- }
- });
+ resp -> {
+ if (resp.getConnectResponse().hasAuthnToken()) {
+ // If the response has security info, adopt it.
+ securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
+ }
+ List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
+ if (!caCerts.isEmpty()) {
+ try {
+ securityContext.trustCertificates(caCerts);
+ } catch (CertificateException e) {
+ LOG.warn("Ignoring invalid CA cert from leader {}: {}",
+ resp.getLeaderHostAndPort(),
+ e.getMessage());
+ }
+ }
+
+ HiveMetastoreConfig hiveMetastoreConfig = null;
+ Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse();
+ if (respPb.hasHmsConfig()) {
+ Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig();
+ hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(),
+ metastoreConf.getHmsSaslEnabled(),
+ metastoreConf.getHmsUuid());
+ }
+ synchronized (AsyncKuduClient.this) {
+ AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig;
+ location = respPb.getClientLocation();
+ }
+
+ hasConnectedToMaster = true;
+
+ // Translate the located master into a TableLocations
+ // since the rest of our locations caching code expects this type.
+ return resp.getAsTableLocations();
+ });
}
/**
@@ -2023,60 +1968,48 @@ public class AsyncKuduClient implements AutoCloseable {
final TimeoutTracker timeoutTracker = new TimeoutTracker();
timeoutTracker.setTimeout(deadline);
- Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB =
- new Callback<Deferred<List<KeyRange>>, List<LocatedTablet>>() {
- @Override
- public Deferred<List<KeyRange>> call(List<LocatedTablet> tablets) {
- if (splitSizeBytes <= 0) {
- final List<KeyRange> keyRanges = Lists.newArrayList();
- for (LocatedTablet tablet : tablets) {
- keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1));
- }
- return Deferred.fromResult(keyRanges);
- } else {
- List<Deferred<List<KeyRange>>> deferreds = new ArrayList<>();
- for (LocatedTablet tablet : tablets) {
- // Build a fake RPC to encapsulate and propagate the timeout.
- // There's no actual "RPC" to send.
- KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges",
- null,
- timeoutTracker.getMillisBeforeTimeout());
- deferreds.add(getTabletKeyRanges(table,
- startPrimaryKey,
- endPrimaryKey,
- tablet.getPartition().getPartitionKeyStart(),
- splitSizeBytes,
- fakeRpc).addCallbackDeferring(
- new Callback<Deferred<List<KeyRange>>, SplitKeyRangeResponse>() {
- @Override
- public Deferred<List<KeyRange>> call(SplitKeyRangeResponse resp) {
- final List<KeyRange> ranges = Lists.newArrayList();
- for (Common.KeyRangePB pb : resp.getKeyRanges()) {
- KeyRange newRange = new KeyRange(tablet,
- pb.getStartPrimaryKey().toByteArray(),
- pb.getStopPrimaryKey().toByteArray(),
- pb.getSizeBytesEstimates());
- ranges.add(newRange);
- LOG.debug("Add key range {}", newRange);
- }
- return Deferred.fromResult(ranges);
- }
- }));
- }
- // Must preserve the order.
- return Deferred.groupInOrder(deferreds).addCallbackDeferring(
- new Callback<Deferred<List<KeyRange>>, ArrayList<List<KeyRange>>>() {
- @Override
- public Deferred<List<KeyRange>> call(ArrayList<List<KeyRange>> rangeLists) {
- final List<KeyRange> ret = Lists.newArrayList();
- for (List<KeyRange> ranges : rangeLists) {
- ret.addAll(ranges);
- }
- return Deferred.fromResult(ret);
- }
- });
+ Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB = tablets -> {
+ if (splitSizeBytes <= 0) {
+ final List<KeyRange> keyRanges = Lists.newArrayList();
+ for (LocatedTablet tablet : tablets) {
+ keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1));
}
+ return Deferred.fromResult(keyRanges);
}
+ List<Deferred<List<KeyRange>>> deferreds = new java.util.ArrayList<>();
+ for (LocatedTablet tablet : tablets) {
+ // Build a fake RPC to encapsulate and propagate the timeout.
+ // There's no actual "RPC" to send.
+ KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges",
+ null,
+ timeoutTracker.getMillisBeforeTimeout());
+ deferreds.add(getTabletKeyRanges(table,
+ startPrimaryKey,
+ endPrimaryKey,
+ tablet.getPartition().getPartitionKeyStart(),
+ splitSizeBytes,
+ fakeRpc)
+ .addCallbackDeferring(resp -> {
+ final List<KeyRange> ranges = Lists.newArrayList();
+ for (Common.KeyRangePB pb : resp.getKeyRanges()) {
+ KeyRange newRange = new KeyRange(tablet,
+ pb.getStartPrimaryKey().toByteArray(),
+ pb.getStopPrimaryKey().toByteArray(),
+ pb.getSizeBytesEstimates());
+ ranges.add(newRange);
+ LOG.debug("Add key range {}", newRange);
+ }
+ return Deferred.fromResult(ranges);
+ }));
+ }
+ // Must preserve the order.
+ return Deferred.groupInOrder(deferreds).addCallbackDeferring(rangeLists -> {
+ final List<KeyRange> ret = Lists.newArrayList();
+ for (List<KeyRange> ranges : rangeLists) {
+ ret.addAll(ranges);
+ }
+ return Deferred.fromResult(ret);
+ });
};
final List<LocatedTablet> tablets = Lists.newArrayList();
@@ -2085,7 +2018,8 @@ public class AsyncKuduClient implements AutoCloseable {
endPartitionKey,
fetchBatchSize,
tablets,
- timeoutTracker).addCallbackDeferring(locateTabletCB);
+ timeoutTracker)
+ .addCallbackDeferring(locateTabletCB);
}
/**
@@ -2173,17 +2107,6 @@ public class AsyncKuduClient implements AutoCloseable {
* Deferred back to the user.
*/
private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
- // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
- // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
- // on hold but we won't be doing this for the moment. Regions in HBase can move a lot,
- // we're not expecting this in Kudu.
- final class RetryTimer implements TimerTask {
- @Override
- public void run(final Timeout timeout) {
- sendRpcToTablet(rpc);
- }
- }
-
assert (ex != null);
Status reasonForRetry = ex.getStatus();
rpc.addTrace(
@@ -2199,7 +2122,11 @@ public class AsyncKuduClient implements AutoCloseable {
// Don't let it retry.
return tooManyAttemptsOrTimeout(rpc, ex);
}
- newTimeout(timer, new RetryTimer(), sleepTime);
+ // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
+ // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
+ // on hold, but we won't be doing this for the moment. Regions in HBase can move a lot, but
+ // we're not expecting this in Kudu.
+ newTimeout(timer, _timeout -> sendRpcToTablet(rpc), sleepTime);
return rpc.getDeferred();
}
@@ -2442,13 +2369,13 @@ public class AsyncKuduClient implements AutoCloseable {
* @param table the table
* @param partitionKey the partition key of the tablet to look up in the table
* @param lookupType the type of lookup to use
- * @param deadline deadline in milliseconds for this lookup to finish
+ * @param timeoutMs timeout in milliseconds for this lookup to finish
* @return a deferred containing the located tablet
*/
Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
final byte[] partitionKey,
final LookupType lookupType,
- long timeout) {
+ long timeoutMs) {
// Locate the tablet at the partition key by locating tablets between
// the partition key (inclusive), and the incremented partition key (exclusive).
@@ -2457,54 +2384,52 @@ public class AsyncKuduClient implements AutoCloseable {
byte[] endPartitionKey;
if (partitionKey.length == 0) {
startPartitionKey = null;
- endPartitionKey = new byte[] { 0x00 };
+ endPartitionKey = new byte[]{0x00};
} else {
startPartitionKey = partitionKey;
endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1);
}
final TimeoutTracker timeoutTracker = new TimeoutTracker();
- timeoutTracker.setTimeout(timeout);
+ timeoutTracker.setTimeout(timeoutMs);
Deferred<List<LocatedTablet>> locatedTablets = locateTable(
- table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeout);
+ table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeoutMs);
// Then pick out the single tablet result from the list.
- return locatedTablets.addCallbackDeferring(
- new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>() {
- @Override
- public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
- Preconditions.checkArgument(tablets.size() <= 1,
- "found more than one tablet for a single partition key");
- if (tablets.isEmpty()) {
- // Most likely this indicates a non-covered range, but since this
- // could race with an alter table partitioning operation (which
- // clears the local table locations cache), we check again.
- TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(),
- partitionKey);
-
- if (entry == null) {
- // This should be extremely rare, but a potential source of tight loops.
- LOG.debug("Table location expired before it could be processed; retrying.");
- return Deferred.fromError(new RecoverableException(Status.NotFound(
- "Table location expired before it could be processed")));
- }
- if (entry.isNonCoveredRange()) {
- if (lookupType == LookupType.POINT
- || entry.getUpperBoundPartitionKey().length == 0) {
- return Deferred.fromError(
- new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
- entry.getUpperBoundPartitionKey()));
- }
- // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key
- // of the non-covered range to return the next valid tablet location.
- return getTabletLocation(table, entry.getUpperBoundPartitionKey(),
- LookupType.POINT, timeoutTracker.getMillisBeforeTimeout());
- }
- return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
- }
- return Deferred.fromResult(tablets.get(0));
+ return locatedTablets.addCallbackDeferring(tablets -> {
+ Preconditions.checkArgument(tablets.size() <= 1,
+ "found more than one tablet for a single partition key");
+ if (tablets.isEmpty()) {
+ // Most likely this indicates a non-covered range, but since this
+ // could race with an alter table partitioning operation (which
+ // clears the local table locations cache), we check again.
+ TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(),
+ partitionKey);
+
+ if (entry == null) {
+ // This should be extremely rare, but a potential source of tight loops.
+ LOG.debug("Table location expired before it could be processed; retrying.");
+ return Deferred.fromError(new RecoverableException(Status.NotFound(
+ "Table location expired before it could be processed")));
+ }
+ if (entry.isNonCoveredRange()) {
+ if (lookupType == LookupType.POINT
+ || entry.getUpperBoundPartitionKey().length == 0) {
+ return Deferred.fromError(
+ new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
+ entry.getUpperBoundPartitionKey()));
}
- });
+ // This is a LOWER_BOUND lookup: get the tablet location from the upper bound key
+ // of the non-covered range to return the next valid tablet location.
+ return getTabletLocation(table,
+ entry.getUpperBoundPartitionKey(),
+ LookupType.POINT,
+ timeoutTracker.getMillisBeforeTimeout());
+ }
+ return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
+ }
+ return Deferred.fromResult(tablets.get(0));
+ });
}
/**
@@ -2685,8 +2610,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @param masterAddresses comma-separated list of "host:port" pairs of the masters
*/
public AsyncKuduClientBuilder(String masterAddresses) {
- this.masterAddresses =
- NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT);
+ this.masterAddresses = NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT);
}
/**
@@ -2706,8 +2630,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @param masterAddresses list of master addresses
*/
public AsyncKuduClientBuilder(List<String> masterAddresses) {
- this.masterAddresses =
- Lists.newArrayListWithCapacity(masterAddresses.size());
+ this.masterAddresses = Lists.newArrayListWithCapacity(masterAddresses.size());
for (String address : masterAddresses) {
this.masterAddresses.add(
NetUtil.parseString(address, DEFAULT_MASTER_PORT));