You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/06/12 03:45:30 UTC

[kudu] 01/02: Modernize AsyncKuduClient

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7767abcbcb092b8449a02d985d25c0e3b12023fa
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Mon Jun 10 15:44:31 2019 -0700

    Modernize AsyncKuduClient
    
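    Replace anonymous Callback and TimerTask classes with lambdas, use the
    diamond operator, and tidy comments, parameter names, and formatting.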
    
    There are no functional changes in this patch.
    
    Change-Id: I046971bf84c48d380c950c988a64c2effcab6f5e
    Reviewed-on: http://gerrit.cloudera.org:8080/13589
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 509 +++++++++------------
 1 file changed, 216 insertions(+), 293 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index b8dac87..e00888f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -584,12 +584,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
     // Add a callback that converts the response into a KuduTable.
     Deferred<KuduTable> kuduTableD = createTableD.addCallbackDeferring(
-        new Callback<Deferred<KuduTable>, CreateTableResponse>() {
-          @Override
-          public Deferred<KuduTable> call(CreateTableResponse resp) throws Exception {
-            return getTableSchema(name, resp.getTableId(), create);
-          }
-        });
+        resp -> getTableSchema(name, resp.getTableId(), create));
 
     if (!builder.shouldWait()) {
       return kuduTableD;
@@ -597,13 +592,10 @@ public class AsyncKuduClient implements AutoCloseable {
 
     // If requested, add a callback that waits until all of the table's tablets
     // have been created.
-    return kuduTableD.addCallbackDeferring(new Callback<Deferred<KuduTable>, KuduTable>() {
-      @Override
-      public Deferred<KuduTable> call(KuduTable tableResp) throws Exception {
-        TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
-            .setTableId(ByteString.copyFromUtf8(tableResp.getTableId()));
-        return getDelayedIsCreateTableDoneDeferred(table, create, tableResp);
-      }
+    return kuduTableD.addCallbackDeferring(tableResp -> {
+      TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
+          .setTableId(ByteString.copyFromUtf8(tableResp.getTableId()));
+      return getDelayedIsCreateTableDoneDeferred(table, create, tableResp);
     });
   }
 
@@ -700,15 +692,11 @@ public class AsyncKuduClient implements AutoCloseable {
 
     // If requested, add a callback that waits until all of the table's tablets
     // have been altered.
-    return responseD.addCallbackDeferring(
-        new Callback<Deferred<AlterTableResponse>, AlterTableResponse>() {
-      @Override
-      public Deferred<AlterTableResponse> call(AlterTableResponse resp) throws Exception {
-        TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
-            .setTableId(ByteString.copyFromUtf8(resp.getTableId()));
-        return getDelayedIsAlterTableDoneDeferred(table, alter, resp);
-      }
-    });
+    return responseD.addCallbackDeferring(resp -> {
+          TableIdentifierPB.Builder table = TableIdentifierPB.newBuilder()
+              .setTableId(ByteString.copyFromUtf8(resp.getTableId()));
+          return getDelayedIsAlterTableDoneDeferred(table, alter, resp);
+        });
   }
 
   /**
@@ -773,32 +761,28 @@ public class AsyncKuduClient implements AutoCloseable {
                                                           timer,
                                                           defaultAdminOperationTimeoutMs,
                                                           /*requiresAuthzTokenSupport=*/false);
-
     rpc.setParentRpc(parent);
-    return sendRpcToTablet(rpc).addCallback(new Callback<KuduTable, GetTableSchemaResponse>() {
-      @Override
-      public KuduTable call(GetTableSchemaResponse resp) throws Exception {
-        // When opening a table, clear the existing cached non-covered range entries.
-        // This avoids surprises where a new table instance won't be able to see the
-        // current range partitions of a table for up to the ttl.
-        TableLocationsCache cache = tableLocations.get(resp.getTableId());
-        if (cache != null) {
-          cache.clearNonCoveredRangeEntries();
-        }
-        SignedTokenPB authzToken = resp.getAuthzToken();
-        if (authzToken != null) {
-          authzTokenCache.put(resp.getTableId(), authzToken);
-        }
-
-        LOG.debug("Opened table {}", resp.getTableId());
-        return new KuduTable(AsyncKuduClient.this,
-            resp.getTableName(),
-            resp.getTableId(),
-            resp.getSchema(),
-            resp.getPartitionSchema(),
-            resp.getNumReplicas(),
-            resp.getExtraConfig());
+    return sendRpcToTablet(rpc).addCallback(resp -> {
+      // When opening a table, clear the existing cached non-covered range entries.
+      // This avoids surprises where a new table instance won't be able to see the
+      // current range partitions of a table for up to the TTL.
+      TableLocationsCache cache = tableLocations.get(resp.getTableId());
+      if (cache != null) {
+        cache.clearNonCoveredRangeEntries();
       }
+      SignedTokenPB authzToken = resp.getAuthzToken();
+      if (authzToken != null) {
+        authzTokenCache.put(resp.getTableId(), authzToken);
+      }
+
+      LOG.debug("Opened table {}", resp.getTableId());
+      return new KuduTable(AsyncKuduClient.this,
+                           resp.getTableName(),
+                           resp.getTableId(),
+                           resp.getSchema(),
+                           resp.getPartitionSchema(),
+                           resp.getNumReplicas(),
+                           resp.getExtraConfig());
     });
   }
 
@@ -834,26 +818,18 @@ public class AsyncKuduClient implements AutoCloseable {
       throw new IllegalArgumentException("The table name cannot be null");
     }
 
-    Callback<Deferred<Boolean>, KuduTable> cb = new Callback<Deferred<Boolean>, KuduTable>() {
-      @Override
-      public Deferred<Boolean> call(KuduTable table) throws Exception {
-        return Deferred.fromResult(true);
-      }
-    };
-    Callback<Deferred<Boolean>, Exception> eb = new Callback<Deferred<Boolean>, Exception>() {
-      @Override
-      public Deferred<Boolean> call(Exception e) throws Exception {
-        if (e instanceof NonRecoverableException) {
-          Status status = ((NonRecoverableException) e).getStatus();
-          if (status.isNotFound()) {
-            return Deferred.fromResult(false);
+    return AsyncUtil.addCallbacksDeferring(
+        getTableSchema(name, null, null),
+        _table -> Deferred.fromResult(true),
+        (Callback<Deferred<Boolean>, Exception>) e -> {
+          if (e instanceof NonRecoverableException) {
+            Status status = ((NonRecoverableException) e).getStatus();
+            if (status.isNotFound()) {
+              return Deferred.fromResult(false);
+            }
           }
-        }
-        return Deferred.fromError(e);
-      }
-    };
-
-    return AsyncUtil.addCallbacksDeferring(getTableSchema(name, null, null), cb, eb);
+          return Deferred.fromError(e);
+        });
   }
 
   /**
@@ -904,8 +880,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // If we've already connected to the master, use the authentication
     // credentials that we received when we connected.
     if (hasConnectedToMaster) {
-      fakeRpc.callback(
-          securityContext.exportAuthenticationCredentials());
+      fakeRpc.callback(securityContext.exportAuthenticationCredentials());
       return;
     }
 
@@ -916,23 +891,15 @@ public class AsyncKuduClient implements AutoCloseable {
         .addCallback(new MasterLookupCB(masterTable,
                                         /* partitionKey */ null,
                                         /* requestedBatchSize */ 1))
-        .addCallback(new Callback<Void, Object>() {
-          @Override
-          public Void call(Object ignored) {
-            // Just call ourselves again; we're guaranteed to have the
-            // authentication credentials.
-            assert hasConnectedToMaster;
-            doExportAuthenticationCredentials(fakeRpc);
-            return null;
-          }
+        .addCallback((Callback<Void, Object>) ignored -> {
+          // Just call ourselves again; we're guaranteed to have the
+          // authentication credentials.
+          assert hasConnectedToMaster;
+          doExportAuthenticationCredentials(fakeRpc);
+          return null;
         })
-        .addErrback(new RetryTaskErrback<byte[]>(
-            fakeRpc, new TimerTask() {
-                @Override
-                public void run(final Timeout ignored) {
-                  doExportAuthenticationCredentials(fakeRpc);
-                }
-              }));
+        .addErrback(new RetryTaskErrback<>(
+            fakeRpc, _ignored -> doExportAuthenticationCredentials(fakeRpc)));
   }
 
   @InterfaceAudience.LimitedPrivate("Test")
@@ -977,22 +944,14 @@ public class AsyncKuduClient implements AutoCloseable {
         .addCallback(new MasterLookupCB(masterTable,
                                         /* partitionKey */ null,
                                         /* requestedBatchSize */ 1))
-        .addCallback(new Callback<Void, Object>() {
-          @Override
-          public Void call(Object ignored) {
-            // Just call ourselves again; we're guaranteed to have the HMS config.
-            assert hasConnectedToMaster;
-            doGetHiveMetastoreConfig(fakeRpc);
-            return null;
-          }
+        .addCallback((Callback<Void, Object>) ignored -> {
+          // Just call ourselves again; we're guaranteed to have the HMS config.
+          assert hasConnectedToMaster;
+          doGetHiveMetastoreConfig(fakeRpc);
+          return null;
         })
-        .addErrback(new RetryTaskErrback<HiveMetastoreConfig>(
-            fakeRpc, new TimerTask() {
-                @Override
-                public void run(final Timeout ignored) {
-                  doGetHiveMetastoreConfig(fakeRpc);
-                }
-              }));
+        .addErrback(new RetryTaskErrback<>(
+            fakeRpc, ignored -> doGetHiveMetastoreConfig(fakeRpc)));
   }
 
   /**
@@ -1021,7 +980,7 @@ public class AsyncKuduClient implements AutoCloseable {
       long sleepTime = getSleepTimeForRpcMillis(fakeRpc);
       if (cannotRetryRequest(fakeRpc) ||
           fakeRpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
-        tooManyAttemptsOrTimeout(fakeRpc, ex); // invokes fakeRpc.Deferred
+        tooManyAttemptsOrTimeout(fakeRpc, ex); // Invokes fakeRpc.Deferred.
         return null;
       }
       fakeRpc.addTrace(
@@ -1033,7 +992,7 @@ public class AsyncKuduClient implements AutoCloseable {
       newTimeout(timer, retryTask, sleepTime);
       return null;
 
-      // fakeRpc.Deferred was not invoked; the user continues to wait until
+      // fakeRpc.Deferred was not invoked: the user continues to wait until
       // retryTask succeeds or fails with a fatal error.
     }
 
@@ -1391,15 +1350,11 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param <R> RPC's return type
    * @return newly created errback
    */
-  private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(
-      final KuduRpc<R> rpc) {
-    return new Callback<Exception, Exception>() {
-      @Override
-      public Exception call(Exception e) throws Exception {
-        // TODO maybe we can retry it?
-        rpc.errback(e);
-        return e;
-      }
+  private <R> Callback<Exception, Exception> getDelayedIsTableDoneEB(final KuduRpc<R> rpc) {
+    return e -> {
+      // TODO maybe we can retry it?
+      rpc.errback(e);
+      return e;
     };
   }
 
@@ -1433,8 +1388,7 @@ public class AsyncKuduClient implements AutoCloseable {
       }
 
       @Override
-      Pair<R, Object> deserialize(
-          CallResponse callResponse, String tsUUID) throws KuduException {
+      Pair<R, Object> deserialize(CallResponse callResponse, String tsUUID) throws KuduException {
         return null;
       }
     };
@@ -1541,24 +1495,21 @@ public class AsyncKuduClient implements AutoCloseable {
       @Nonnull final KuduRpc<AlterTableResponse> rpc,
       @Nonnull final TableIdentifierPB.Builder table,
       @Nullable final AlterTableResponse alterResp) {
-    return new Callback<Deferred<AlterTableResponse>, IsAlterTableDoneResponse>() {
-      @Override
-      public Deferred<AlterTableResponse> call(IsAlterTableDoneResponse resp) throws Exception {
-        // Store the Deferred locally; callback() below will reset it and we'd
-        // return a different, non-triggered Deferred.
-        Deferred<AlterTableResponse> d = rpc.getDeferred();
-        if (resp.isDone()) {
-          rpc.callback(alterResp);
-        } else {
-          rpc.attempt++;
-          delayedIsAlterTableDone(
-              table,
-              rpc,
-              getDelayedIsAlterTableDoneCB(rpc, table, alterResp),
-              getDelayedIsTableDoneEB(rpc));
-        }
-        return d;
+    return resp -> {
+      // Store the Deferred locally; callback() below will reset it and we'd
+      // return a different, non-triggered Deferred.
+      Deferred<AlterTableResponse> d = rpc.getDeferred();
+      if (resp.isDone()) {
+        rpc.callback(alterResp);
+      } else {
+        rpc.attempt++;
+        delayedIsAlterTableDone(
+            table,
+            rpc,
+            getDelayedIsAlterTableDoneCB(rpc, table, alterResp),
+            getDelayedIsTableDoneEB(rpc));
       }
+      return d;
     };
   }
 
@@ -1577,24 +1528,21 @@ public class AsyncKuduClient implements AutoCloseable {
       final KuduRpc<KuduTable> rpc,
       final TableIdentifierPB.Builder table,
       final KuduTable tableResp) {
-    return new Callback<Deferred<KuduTable>, IsCreateTableDoneResponse>() {
-      @Override
-      public Deferred<KuduTable> call(IsCreateTableDoneResponse resp) throws Exception {
-        // Store the Deferred locally; callback() below will reset it and we'd
-        // return a different, non-triggered Deferred.
-        Deferred<KuduTable> d = rpc.getDeferred();
-        if (resp.isDone()) {
-          rpc.callback(tableResp);
-        } else {
-          rpc.attempt++;
-          delayedIsCreateTableDone(
-              table,
-              rpc,
-              getDelayedIsCreateTableDoneCB(rpc, table, tableResp),
-              getDelayedIsTableDoneEB(rpc));
-        }
-        return d;
+    return resp -> {
+      // Store the Deferred locally; callback() below will reset it and we'd
+      // return a different, non-triggered Deferred.
+      Deferred<KuduTable> d = rpc.getDeferred();
+      if (resp.isDone()) {
+        rpc.callback(tableResp);
+      } else {
+        rpc.attempt++;
+        delayedIsCreateTableDone(
+            table,
+            rpc,
+            getDelayedIsCreateTableDoneCB(rpc, table, tableResp),
+            getDelayedIsTableDoneEB(rpc));
       }
+      return d;
     };
   }
 
@@ -1779,7 +1727,7 @@ public class AsyncKuduClient implements AutoCloseable {
     }
     d.addCallback(new MasterLookupCB(table, partitionKey, fetchBatchSize));
     if (hasPermit) {
-      d.addBoth(new ReleaseMasterLookupPermit<Master.GetTableLocationsResponsePB>());
+      d.addBoth(new ReleaseMasterLookupPermit<>());
     }
     return d;
   }
@@ -1793,44 +1741,41 @@ public class AsyncKuduClient implements AutoCloseable {
     // TODO(todd): stop using this 'masterTable' hack.
     return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
         defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback(
-            new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
-              @Override
-              public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
-                if (resp.getConnectResponse().hasAuthnToken()) {
-                  // If the response has security info, adopt it.
-                  securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
-                }
-                List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
-                if (!caCerts.isEmpty()) {
-                  try {
-                    securityContext.trustCertificates(caCerts);
-                  } catch (CertificateException e) {
-                    LOG.warn("Ignoring invalid CA cert from leader {}: {}",
-                        resp.getLeaderHostAndPort(),
-                        e.getMessage());
-                  }
-                }
-
-                HiveMetastoreConfig hiveMetastoreConfig = null;
-                Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse();
-                if (respPb.hasHmsConfig()) {
-                  Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig();
-                  hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(),
-                                                                metastoreConf.getHmsSaslEnabled(),
-                                                                metastoreConf.getHmsUuid());
-                }
-                synchronized (AsyncKuduClient.this) {
-                  AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig;
-                  location = respPb.getClientLocation();
-                }
-
-                hasConnectedToMaster = true;
-
-                // Translate the located master into a TableLocations
-                // since the rest of our locations caching code expects this type.
-                return resp.getAsTableLocations();
-              }
-            });
+        resp -> {
+          if (resp.getConnectResponse().hasAuthnToken()) {
+            // If the response has security info, adopt it.
+            securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
+          }
+          List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
+          if (!caCerts.isEmpty()) {
+            try {
+              securityContext.trustCertificates(caCerts);
+            } catch (CertificateException e) {
+              LOG.warn("Ignoring invalid CA cert from leader {}: {}",
+                       resp.getLeaderHostAndPort(),
+                       e.getMessage());
+            }
+          }
+
+          HiveMetastoreConfig hiveMetastoreConfig = null;
+          Master.ConnectToMasterResponsePB respPb = resp.getConnectResponse();
+          if (respPb.hasHmsConfig()) {
+            Master.HiveMetastoreConfig metastoreConf = respPb.getHmsConfig();
+            hiveMetastoreConfig = new HiveMetastoreConfig(metastoreConf.getHmsUris(),
+                                                          metastoreConf.getHmsSaslEnabled(),
+                                                          metastoreConf.getHmsUuid());
+          }
+          synchronized (AsyncKuduClient.this) {
+            AsyncKuduClient.this.hiveMetastoreConfig = hiveMetastoreConfig;
+            location = respPb.getClientLocation();
+          }
+
+          hasConnectedToMaster = true;
+
+          // Translate the located master into a TableLocations
+          // since the rest of our locations caching code expects this type.
+          return resp.getAsTableLocations();
+        });
   }
 
   /**
@@ -2023,60 +1968,48 @@ public class AsyncKuduClient implements AutoCloseable {
     final TimeoutTracker timeoutTracker = new TimeoutTracker();
     timeoutTracker.setTimeout(deadline);
 
-    Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB =
-        new Callback<Deferred<List<KeyRange>>, List<LocatedTablet>>() {
-      @Override
-      public Deferred<List<KeyRange>> call(List<LocatedTablet> tablets) {
-        if (splitSizeBytes <= 0) {
-          final List<KeyRange> keyRanges = Lists.newArrayList();
-          for (LocatedTablet tablet : tablets) {
-            keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1));
-          }
-          return Deferred.fromResult(keyRanges);
-        } else {
-          List<Deferred<List<KeyRange>>> deferreds = new ArrayList<>();
-          for (LocatedTablet tablet : tablets) {
-            // Build a fake RPC to encapsulate and propagate the timeout.
-            // There's no actual "RPC" to send.
-            KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges",
-                                           null,
-                                           timeoutTracker.getMillisBeforeTimeout());
-            deferreds.add(getTabletKeyRanges(table,
-                                             startPrimaryKey,
-                                             endPrimaryKey,
-                                             tablet.getPartition().getPartitionKeyStart(),
-                                             splitSizeBytes,
-                                             fakeRpc).addCallbackDeferring(
-                new Callback<Deferred<List<KeyRange>>, SplitKeyRangeResponse>() {
-                  @Override
-                  public Deferred<List<KeyRange>> call(SplitKeyRangeResponse resp) {
-                    final List<KeyRange> ranges = Lists.newArrayList();
-                    for (Common.KeyRangePB pb : resp.getKeyRanges()) {
-                      KeyRange newRange = new KeyRange(tablet,
-                                                       pb.getStartPrimaryKey().toByteArray(),
-                                                       pb.getStopPrimaryKey().toByteArray(),
-                                                       pb.getSizeBytesEstimates());
-                      ranges.add(newRange);
-                      LOG.debug("Add key range {}", newRange);
-                    }
-                    return Deferred.fromResult(ranges);
-                  }
-                }));
-          }
-          // Must preserve the order.
-          return Deferred.groupInOrder(deferreds).addCallbackDeferring(
-              new Callback<Deferred<List<KeyRange>>, ArrayList<List<KeyRange>>>() {
-                @Override
-                public Deferred<List<KeyRange>> call(ArrayList<List<KeyRange>> rangeLists) {
-                  final List<KeyRange> ret = Lists.newArrayList();
-                  for (List<KeyRange> ranges : rangeLists) {
-                    ret.addAll(ranges);
-                  }
-                  return Deferred.fromResult(ret);
-                }
-              });
+    Callback<Deferred<List<KeyRange>>, List<LocatedTablet>> locateTabletCB = tablets -> {
+      if (splitSizeBytes <= 0) {
+        final List<KeyRange> keyRanges = Lists.newArrayList();
+        for (LocatedTablet tablet : tablets) {
+          keyRanges.add(new KeyRange(tablet, startPrimaryKey, endPrimaryKey, -1));
         }
+        return Deferred.fromResult(keyRanges);
       }
+      List<Deferred<List<KeyRange>>> deferreds = new java.util.ArrayList<>();
+      for (LocatedTablet tablet : tablets) {
+        // Build a fake RPC to encapsulate and propagate the timeout.
+        // There's no actual "RPC" to send.
+        KuduRpc fakeRpc = buildFakeRpc("getTableKeyRanges",
+                                       null,
+                                       timeoutTracker.getMillisBeforeTimeout());
+        deferreds.add(getTabletKeyRanges(table,
+                                         startPrimaryKey,
+                                         endPrimaryKey,
+                                         tablet.getPartition().getPartitionKeyStart(),
+                                         splitSizeBytes,
+                                         fakeRpc)
+                          .addCallbackDeferring(resp -> {
+                            final List<KeyRange> ranges = Lists.newArrayList();
+                            for (Common.KeyRangePB pb : resp.getKeyRanges()) {
+                              KeyRange newRange = new KeyRange(tablet,
+                                                               pb.getStartPrimaryKey().toByteArray(),
+                                                               pb.getStopPrimaryKey().toByteArray(),
+                                                               pb.getSizeBytesEstimates());
+                              ranges.add(newRange);
+                              LOG.debug("Add key range {}", newRange);
+                            }
+                            return Deferred.fromResult(ranges);
+                          }));
+      }
+      // Must preserve the order.
+      return Deferred.groupInOrder(deferreds).addCallbackDeferring(rangeLists -> {
+            final List<KeyRange> ret = Lists.newArrayList();
+            for (List<KeyRange> ranges : rangeLists) {
+              ret.addAll(ranges);
+            }
+            return Deferred.fromResult(ret);
+          });
     };
 
     final List<LocatedTablet> tablets = Lists.newArrayList();
@@ -2085,7 +2018,8 @@ public class AsyncKuduClient implements AutoCloseable {
                            endPartitionKey,
                            fetchBatchSize,
                            tablets,
-                           timeoutTracker).addCallbackDeferring(locateTabletCB);
+                           timeoutTracker)
+        .addCallbackDeferring(locateTabletCB);
   }
 
   /**
@@ -2173,17 +2107,6 @@ public class AsyncKuduClient implements AutoCloseable {
    * Deferred back to the user.
    */
   private <R> Deferred<R> delayedSendRpcToTablet(final KuduRpc<R> rpc, KuduException ex) {
-    // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
-    // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
-    // on hold but we won't be doing this for the moment. Regions in HBase can move a lot,
-    // we're not expecting this in Kudu.
-    final class RetryTimer implements TimerTask {
-      @Override
-      public void run(final Timeout timeout) {
-        sendRpcToTablet(rpc);
-      }
-    }
-
     assert (ex != null);
     Status reasonForRetry = ex.getStatus();
     rpc.addTrace(
@@ -2199,7 +2122,11 @@ public class AsyncKuduClient implements AutoCloseable {
       // Don't let it retry.
       return tooManyAttemptsOrTimeout(rpc, ex);
     }
-    newTimeout(timer, new RetryTimer(), sleepTime);
+    // Here we simply retry the RPC later. We might be doing this along with a lot of other RPCs
+    // in parallel. Asynchbase does some hacking with a "probe" RPC while putting the other ones
+    // on hold, but we won't be doing this for the moment. Regions in HBase can move a lot;
+    // we're not expecting this in Kudu.
+    newTimeout(timer, _timeout -> sendRpcToTablet(rpc), sleepTime);
     return rpc.getDeferred();
   }
 
@@ -2442,13 +2369,13 @@ public class AsyncKuduClient implements AutoCloseable {
    * @param table the table
    * @param partitionKey the partition key of the tablet to look up in the table
    * @param lookupType the type of lookup to use
-   * @param deadline deadline in milliseconds for this lookup to finish
+   * @param timeoutMs timeout in milliseconds for this lookup to finish
    * @return a deferred containing the located tablet
    */
   Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
                                             final byte[] partitionKey,
                                             final LookupType lookupType,
-                                            long timeout) {
+                                            long timeoutMs) {
 
     // Locate the tablet at the partition key by locating tablets between
     // the partition key (inclusive), and the incremented partition key (exclusive).
@@ -2457,54 +2384,52 @@ public class AsyncKuduClient implements AutoCloseable {
     byte[] endPartitionKey;
     if (partitionKey.length == 0) {
       startPartitionKey = null;
-      endPartitionKey = new byte[] { 0x00 };
+      endPartitionKey = new byte[]{0x00};
     } else {
       startPartitionKey = partitionKey;
       endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1);
     }
 
     final TimeoutTracker timeoutTracker = new TimeoutTracker();
-    timeoutTracker.setTimeout(timeout);
+    timeoutTracker.setTimeout(timeoutMs);
     Deferred<List<LocatedTablet>> locatedTablets = locateTable(
-        table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeout);
+        table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeoutMs);
 
     // Then pick out the single tablet result from the list.
-    return locatedTablets.addCallbackDeferring(
-        new Callback<Deferred<LocatedTablet>, List<LocatedTablet>>() {
-          @Override
-          public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
-            Preconditions.checkArgument(tablets.size() <= 1,
-                                        "found more than one tablet for a single partition key");
-            if (tablets.isEmpty()) {
-              // Most likely this indicates a non-covered range, but since this
-              // could race with an alter table partitioning operation (which
-              // clears the local table locations cache), we check again.
-              TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(),
-                                                                      partitionKey);
-
-              if (entry == null) {
-                // This should be extremely rare, but a potential source of tight loops.
-                LOG.debug("Table location expired before it could be processed; retrying.");
-                return Deferred.fromError(new RecoverableException(Status.NotFound(
-                    "Table location expired before it could be processed")));
-              }
-              if (entry.isNonCoveredRange()) {
-                if (lookupType == LookupType.POINT
-                    || entry.getUpperBoundPartitionKey().length == 0) {
-                  return Deferred.fromError(
-                      new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
-                          entry.getUpperBoundPartitionKey()));
-                }
-                // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key
-                // of the non-covered range to return the next valid tablet location.
-                return getTabletLocation(table, entry.getUpperBoundPartitionKey(),
-                    LookupType.POINT, timeoutTracker.getMillisBeforeTimeout());
-              }
-              return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
-            }
-            return Deferred.fromResult(tablets.get(0));
+    return locatedTablets.addCallbackDeferring(tablets -> {
+      Preconditions.checkArgument(tablets.size() <= 1,
+                                  "found more than one tablet for a single partition key");
+      if (tablets.isEmpty()) {
+        // Most likely this indicates a non-covered range, but since this
+        // could race with an alter table partitioning operation (which
+        // clears the local table locations cache), we check again.
+        TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(),
+                                                                partitionKey);
+
+        if (entry == null) {
+          // This should be extremely rare, but a potential source of tight loops.
+          LOG.debug("Table location expired before it could be processed; retrying.");
+          return Deferred.fromError(new RecoverableException(Status.NotFound(
+              "Table location expired before it could be processed")));
+        }
+        if (entry.isNonCoveredRange()) {
+          if (lookupType == LookupType.POINT
+              || entry.getUpperBoundPartitionKey().length == 0) {
+            return Deferred.fromError(
+                new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
+                                             entry.getUpperBoundPartitionKey()));
           }
-        });
+          // This is a LOWER_BOUND lookup, get the tablet location from the upper bound key
+          // of the non-covered range to return the next valid tablet location.
+          return getTabletLocation(table,
+                                   entry.getUpperBoundPartitionKey(),
+                                   LookupType.POINT,
+                                   timeoutTracker.getMillisBeforeTimeout());
+        }
+        return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
+      }
+      return Deferred.fromResult(tablets.get(0));
+    });
   }
 
   /**
@@ -2685,8 +2610,7 @@ public class AsyncKuduClient implements AutoCloseable {
      * @param masterAddresses comma-separated list of "host:port" pairs of the masters
      */
     public AsyncKuduClientBuilder(String masterAddresses) {
-      this.masterAddresses =
-          NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT);
+      this.masterAddresses = NetUtil.parseStrings(masterAddresses, DEFAULT_MASTER_PORT);
     }
 
     /**
@@ -2706,8 +2630,7 @@ public class AsyncKuduClient implements AutoCloseable {
      * @param masterAddresses list of master addresses
      */
     public AsyncKuduClientBuilder(List<String> masterAddresses) {
-      this.masterAddresses =
-          Lists.newArrayListWithCapacity(masterAddresses.size());
+      this.masterAddresses = Lists.newArrayListWithCapacity(masterAddresses.size());
       for (String address : masterAddresses) {
         this.masterAddresses.add(
             NetUtil.parseString(address, DEFAULT_MASTER_PORT));