You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:33 UTC

[07/15] hbase git commit: HBASE-17356 Add replica get support

http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 4f73909..869a630 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
@@ -491,23 +492,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
     CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
-    this.<List<TableSchema>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
-                controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
-                    c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
-                    .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-            return;
-          }
-          if (!tableSchemas.isEmpty()) {
-            future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
-          } else {
-            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
-          }
-        });
+    addListener(this.<List<TableSchema>> newMasterCaller()
+      .action((controller, stub) -> this
+        .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+          controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
+          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+          (resp) -> resp.getTableSchemaList()))
+      .call(), (tableSchemas, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+          return;
+        }
+        if (!tableSchemas.isEmpty()) {
+          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
+        } else {
+          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+        }
+      });
     return future;
   }
 
@@ -590,7 +591,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -607,7 +608,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+    addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -636,40 +637,37 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
       Optional<byte[][]> splitKeys) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
-    isTableEnabled(tableName).whenComplete(
-      (enabled, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        if (!enabled) {
-          future.complete(false);
-        } else {
-          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
-              .whenComplete(
-                (locations, error1) -> {
-                  if (error1 != null) {
-                    future.completeExceptionally(error1);
-                    return;
-                  }
-                  List<HRegionLocation> notDeployedRegions =
-                      locations.stream().filter(loc -> loc.getServerName() == null)
-                          .collect(Collectors.toList());
-                  if (notDeployedRegions.size() > 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " has " + notDeployedRegions.size()
-                          + " regions");
-                    }
-                    future.complete(false);
-                    return;
-                  }
+    addListener(isTableEnabled(tableName), (enabled, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (!enabled) {
+        future.complete(false);
+      } else {
+        addListener(
+          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
+          (locations, error1) -> {
+            if (error1 != null) {
+              future.completeExceptionally(error1);
+              return;
+            }
+            List<HRegionLocation> notDeployedRegions = locations.stream()
+              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
+            if (notDeployedRegions.size() > 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
+              }
+              future.complete(false);
+              return;
+            }
 
-                  Optional<Boolean> available =
-                      splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
-                  future.complete(available.orElse(true));
-                });
-        }
-      });
+            Optional<Boolean> available =
+              splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
+            future.complete(available.orElse(true));
+          });
+      }
+    });
     return future;
   }
 
@@ -791,20 +789,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
+    addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else if (!exists) {
         future.completeExceptionally(new TableNotFoundException(tableName));
       } else {
-        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
           } else if (!tableEnabled) {
             future.completeExceptionally(new TableNotEnabledException(tableName));
           } else {
-            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              new HashMap<>()).whenComplete((ret, err3) -> {
+            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+              new HashMap<>()), (ret, err3) -> {
                 if (err3 != null) {
                   future.completeExceptionally(err3);
                 } else {
@@ -821,27 +819,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
         }
-        flush(serverName, location.getRegion())
-          .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-          });
       });
+    });
     return future;
   }
 
@@ -859,7 +855,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flushRegionServer(ServerName sn) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegions(sn).whenComplete((hRegionInfos, err) -> {
+    addListener(getRegions(sn), (hRegionInfos, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -868,9 +864,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       if (hRegionInfos != null) {
         hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
       }
-      CompletableFuture
-        .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-        .whenComplete((ret, err2) -> {
+      addListener(CompletableFuture.allOf(
+        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
           } else {
@@ -943,7 +938,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegions(sn).whenComplete((hRegionInfos, err) -> {
+    addListener(getRegions(sn), (hRegionInfos, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -952,15 +947,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       if (hRegionInfos != null) {
         hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
       }
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
+      addListener(CompletableFuture.allOf(
+        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
     });
     return future;
   }
@@ -968,28 +962,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
       boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        compact(location.getServerName(), location.getRegion(), major, columnFamily)
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
@@ -1001,19 +993,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
       // For meta table, we use zk to fetch all locations.
       AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
-      registry.getMetaRegionLocation().whenComplete(
-        (metaRegions, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-          } else if (metaRegions == null || metaRegions.isEmpty()
-              || metaRegions.getDefaultRegionLocation() == null) {
-            future.completeExceptionally(new IOException("meta region does not found"));
-          } else {
-            future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-          }
-          // close the registry.
-          IOUtils.closeQuietly(registry);
-        });
+      addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else if (metaRegions == null || metaRegions.isEmpty() ||
+          metaRegions.getDefaultRegionLocation() == null) {
+          future.completeExceptionally(new IOException("meta region does not found"));
+        } else {
+          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+        }
+        // close the registry.
+        IOUtils.closeQuietly(registry);
+      });
       return future;
     } else {
       // For non-meta table, we fetch all locations by scanning hbase:meta table
@@ -1024,40 +1015,40 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   /**
    * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
    */
-  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
-      boolean major, CompactType compactType) {
+  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
+      CompactType compactType) {
     CompletableFuture<Void> future = new CompletableFuture<>();
 
     switch (compactType) {
       case MOB:
-        connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
           RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
-          compact(serverName, regionInfo, major, columnFamily)
-              .whenComplete((ret, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else {
-                  future.complete(ret);
-                }
-              });
+          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
         });
         break;
       case NORMAL:
-        getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
-          CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
+          CompletableFuture<?>[] compactFutures =
+            locations.stream().filter(l -> l.getRegion() != null)
               .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
               .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
               .toArray(CompletableFuture<?>[]::new);
           // future complete unless all of the compact futures are completed.
-          CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
+          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
             if (err2 != null) {
               future.completeExceptionally(err2);
             } else {
@@ -1098,29 +1089,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
       CompletableFuture<TableName> result) {
-    getRegionLocation(encodeRegionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          result.completeExceptionally(err);
-          return;
-        }
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          result.completeExceptionally(new IllegalArgumentException(
-              "Can't invoke merge on non-default regions directly"));
-          return;
-        }
-        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
-          if (!tableName.get().equals(regionInfo.getTable())) {
-            // tables of this two region should be same.
-            result.completeExceptionally(new IllegalArgumentException(
-                "Cannot merge regions from two different tables " + tableName.get() + " and "
-                    + regionInfo.getTable()));
-          } else {
-            result.complete(tableName.get());
-          }
+    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
+      if (err != null) {
+        result.completeExceptionally(err);
+        return;
+      }
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        result.completeExceptionally(
+          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
+        return;
+      }
+      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+        if (!tableName.get().equals(regionInfo.getTable())) {
+          // tables of this two region should be same.
+          result.completeExceptionally(
+            new IllegalArgumentException("Cannot merge regions from two different tables " +
+              tableName.get() + " and " + regionInfo.getTable()));
+        } else {
+          result.complete(tableName.get());
         }
-      });
+      }
+    });
   }
 
   private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
@@ -1185,41 +1175,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
     final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
 
-    checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
-        .whenComplete((tableName, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
+    addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
+      (tableName, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
 
-          MergeTableRegionsRequest request = null;
-          try {
-            request = RequestConverter.buildMergeTableRegionsRequest(
-              new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
-              ng.newNonce());
-          } catch (DeserializationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
+        MergeTableRegionsRequest request = null;
+        try {
+          request = RequestConverter.buildMergeTableRegionsRequest(
+            new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+            ng.newNonce());
+        } catch (DeserializationException e) {
+          future.completeExceptionally(e);
+          return;
+        }
 
+        addListener(
           this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
             (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
-            new MergeTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-
-        });
+            new MergeTableRegionProcedureBiConsumer(tableName)),
+          (ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+      });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exist, error) -> {
+    addListener(tableExists(tableName), (exist, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -1228,45 +1219,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.completeExceptionally(new TableNotFoundException(tableName));
         return;
       }
-      metaTable
+      addListener(
+        metaTable
           .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
-              .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
-              .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
-          .whenComplete((results, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (results != null && !results.isEmpty()) {
-              List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
-              for (Result r : results) {
-                if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) continue;
-                RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
-                if (rl != null) {
-                  for (HRegionLocation h : rl.getRegionLocations()) {
-                    if (h != null && h.getServerName() != null) {
-                      RegionInfo hri = h.getRegion();
-                      if (hri == null || hri.isSplitParent()
-                          || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID)
-                        continue;
-                      splitFutures.add(split(hri, null));
+            .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+            .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
+        (results, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+            return;
+          }
+          if (results != null && !results.isEmpty()) {
+            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+            for (Result r : results) {
+              if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
+                continue;
+              }
+              RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+              if (rl != null) {
+                for (HRegionLocation h : rl.getRegionLocations()) {
+                  if (h != null && h.getServerName() != null) {
+                    RegionInfo hri = h.getRegion();
+                    if (hri == null || hri.isSplitParent() ||
+                      hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+                      continue;
                     }
+                    splitFutures.add(split(hri, null));
                   }
                 }
               }
-              CompletableFuture
-                  .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
-                  .whenComplete((ret, exception) -> {
-                    if (exception != null) {
-                      future.completeExceptionally(exception);
-                      return;
-                    }
-                    future.complete(ret);
-                  });
-            } else {
-              future.complete(null);
             }
-          });
+            addListener(
+              CompletableFuture
+                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
+              (ret, exception) -> {
+                if (exception != null) {
+                  future.completeExceptionally(exception);
+                  return;
+                }
+                future.complete(ret);
+              });
+          } else {
+            future.complete(null);
+          }
+        });
     });
     return future;
   }
@@ -1277,54 +1273,57 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     if (splitPoint == null) {
       return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
     }
-    connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
-        .whenComplete((loc, err) -> {
-          if (err != null) {
-            result.completeExceptionally(err);
-          } else if (loc == null || loc.getRegion() == null) {
-            result.completeExceptionally(new IllegalArgumentException(
-                "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
-          } else {
-            splitRegion(loc.getRegion().getRegionName(), splitPoint)
-                .whenComplete((ret, err2) -> {
-                  if (err2 != null) {
-                    result.completeExceptionally(err2);
-                  } else {
-                    result.complete(ret);
-                  }
+    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
+      (loc, err) -> {
+        if (err != null) {
+          result.completeExceptionally(err);
+        } else if (loc == null || loc.getRegion() == null) {
+          result.completeExceptionally(new IllegalArgumentException(
+            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+        } else {
+          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
+            if (err2 != null) {
+              result.completeExceptionally(err2);
+            } else {
+              result.complete(ret);
+            }
 
-                });
-          }
-        });
+          });
+        }
+      });
     return result;
   }
 
   @Override
   public CompletableFuture<Void> splitRegion(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      // On failure the location is null, so fail the future instead of dereferencing it.
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        future
+          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+            "Replicas are auto-split when their primary is split."));
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(split(regionInfo, null), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
         }
-        split(regionInfo, null).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
       });
+    });
     return future;
   }
 
@@ -1333,35 +1327,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     Preconditions.checkNotNull(splitPoint,
       "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        RegionInfo regionInfo = location.getRegion();
-        if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        if (regionInfo.getStartKey() != null
-            && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "should not give a splitkey which equals to startkey!"));
-          return;
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      RegionInfo regionInfo = location.getRegion();
+      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+        future
+          .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+            "Replicas are auto-split when their primary is split."));
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      if (regionInfo.getStartKey() != null &&
+        Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
+        future.completeExceptionally(
+          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
+        return;
+      }
+      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
         }
-        split(regionInfo, splitPoint).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
       });
+    });
     return future;
   }
 
@@ -1370,121 +1363,119 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     TableName tableName = hri.getTable();
     SplitTableRegionRequest request = null;
     try {
-      request = RequestConverter
-          .buildSplitTableRegionRequest(hri, splitPoint,
-              ng.getNonceGroup(), ng.newNonce());
+      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
+        ng.newNonce());
     } catch (DeserializationException e) {
       future.completeExceptionally(e);
       return future;
     }
 
-    this.<SplitTableRegionRequest, SplitTableRegionResponse>procedureCall(request,
-        (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
-        new SplitTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
-      if (err2 != null) {
-        future.completeExceptionally(err2);
-      } else {
-        future.complete(ret);
-      }
-    });
+    addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
+      (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+      new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
+        }
+      });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> assign(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(this.<Void> newMasterCaller()
+        .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+          controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
+          (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
+        .call(), (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this
-                  .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
-                    RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
-                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          .action(((controller, stub) -> this
+            .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
+          .call(),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> offline(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          .action(((controller, stub) -> this
+            .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
+              RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
+              (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
+          .call(),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> move(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      addListener(
         moveRegion(
-          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null))
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
+          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+    });
     return future;
   }
 
@@ -1493,20 +1484,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     Preconditions.checkNotNull(destServerName,
       "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete((regionInfo, err) -> {
+    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
       }
-      moveRegion(
-        RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName))
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
+      addListener(moveRegion(RequestConverter
+        .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+        (ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
     });
     return future;
   }
@@ -1636,11 +1627,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
       if (!completeExceptionally(future, error)) {
         ReplicationPeerConfig newPeerConfig =
-            ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
-        updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
+          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
           if (!completeExceptionally(future, error)) {
             future.complete(result);
           }
@@ -1658,24 +1649,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete(
-      (peerConfig, error) -> {
-        if (!completeExceptionally(future, error)) {
-          ReplicationPeerConfig newPeerConfig = null;
-          try {
-            newPeerConfig = ReplicationPeerConfigUtil
-                .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
-          } catch (ReplicationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
-          updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
-            if (!completeExceptionally(future, error)) {
-              future.complete(result);
-            }
-          });
+    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        ReplicationPeerConfig newPeerConfig = null;
+        try {
+          newPeerConfig = ReplicationPeerConfigUtil
+            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+        } catch (ReplicationException e) {
+          future.completeExceptionally(e);
+          return;
         }
-      });
+        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
+          if (!completeExceptionally(future, error)) {
+            future.complete(result);
+          }
+        });
+      }
+    });
     return future;
   }
 
@@ -1710,31 +1700,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
     CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
-    listTableDescriptors().whenComplete(
-      (tables, error) -> {
-        if (!completeExceptionally(future, error)) {
-          List<TableCFs> replicatedTableCFs = new ArrayList<>();
-          tables.forEach(table -> {
-            Map<String, Integer> cfs = new HashMap<>();
-            Stream.of(table.getColumnFamilies())
-                .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
-                .forEach(column -> {
-                  cfs.put(column.getNameAsString(), column.getScope());
-                });
-            if (!cfs.isEmpty()) {
-              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
-            }
-          });
-          future.complete(replicatedTableCFs);
-        }
-      });
+    addListener(listTableDescriptors(), (tables, error) -> {
+      if (!completeExceptionally(future, error)) {
+        List<TableCFs> replicatedTableCFs = new ArrayList<>();
+        tables.forEach(table -> {
+          Map<String, Integer> cfs = new HashMap<>();
+          Stream.of(table.getColumnFamilies())
+            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+            .forEach(column -> {
+              cfs.put(column.getNameAsString(), column.getScope());
+            });
+          if (!cfs.isEmpty()) {
+            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+          }
+        });
+        future.complete(replicatedTableCFs);
+      }
+    });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
-    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
-        .createHBaseProtosSnapshotDesc(snapshotDesc);
+    SnapshotProtos.SnapshotDescription snapshot =
+      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
     try {
       ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
     } catch (IllegalArgumentException e) {
@@ -1742,47 +1731,47 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
     final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    this.<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-            resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                  } else if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                  long pauseTime = ConnectionUtils.getPauseTime(
-                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+    addListener(this.<Long> newMasterCaller()
+      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+        resp -> resp.getExpectedTimeout()))
+      .call(), (expectedTimeout, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TimerTask pollingTask = new TimerTask() {
+          int tries = 0;
+          long startTime = EnvironmentEdgeManager.currentTime();
+          long endTime = startTime + expectedTimeout;
+          long maxPauseTime = expectedTimeout / maxAttempts;
+
+          @Override
+          public void run(Timeout timeout) throws Exception {
+            if (EnvironmentEdgeManager.currentTime() < endTime) {
+              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else if (done) {
+                  future.complete(null);
+                } else {
+                  // retry again after pauseTime.
+                  long pauseTime =
+                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                   pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER
-                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+                    TimeUnit.MILLISECONDS);
                 }
-              } );
-              } else {
-                future.completeExceptionally(new SnapshotCreationException("Snapshot '"
-                    + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
-                    + " ms", snapshotDesc));
-              }
+              });
+            } else {
+              future.completeExceptionally(
+                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
+                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
             }
-          };
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
+          }
+        };
+        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+      });
     return future;
   }
 
@@ -1808,52 +1797,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+  public CompletableFuture<Void> restoreSnapshot(String snapshotName,
+      boolean takeFailSafeSnapshot) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshots(Pattern.compile(snapshotName)).whenComplete(
-      (snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        TableName tableName = null;
-        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
-          for (SnapshotDescription snap : snapshotDescriptions) {
-            if (snap.getName().equals(snapshotName)) {
-              tableName = snap.getTableName();
-              break;
-            }
+    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      TableName tableName = null;
+      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+        for (SnapshotDescription snap : snapshotDescriptions) {
+          if (snap.getName().equals(snapshotName)) {
+            tableName = snap.getTableName();
+            break;
           }
         }
-        if (tableName == null) {
-          future.completeExceptionally(new RestoreSnapshotException(
-              "Unable to find the table name for snapshot=" + snapshotName));
-          return;
-        }
-        final TableName finalTableName = tableName;
-        tableExists(finalTableName)
-            .whenComplete((exists, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else if (!exists) {
-                // if table does not exist, then just clone snapshot into new table.
-              completeConditionalOnFuture(future,
-                internalRestoreSnapshot(snapshotName, finalTableName));
+      }
+      if (tableName == null) {
+        future.completeExceptionally(new RestoreSnapshotException(
+          "Unable to find the table name for snapshot=" + snapshotName));
+        return;
+      }
+      final TableName finalTableName = tableName;
+      addListener(tableExists(finalTableName), (exists, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else if (!exists) {
+          // if table does not exist, then just clone snapshot into new table.
+          completeConditionalOnFuture(future,
+            internalRestoreSnapshot(snapshotName, finalTableName));
+        } else {
+          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
+            if (err4 != null) {
+              future.completeExceptionally(err4);
+            } else if (!disabled) {
+              future.completeExceptionally(new TableNotDisabledException(finalTableName));
             } else {
-              isTableDisabled(finalTableName).whenComplete(
-                (disabled, err4) -> {
-                  if (err4 != null) {
-                    future.completeExceptionally(err4);
-                  } else if (!disabled) {
-                    future.completeExceptionally(new TableNotDisabledException(finalTableName));
-                  } else {
-                    completeConditionalOnFuture(future,
-                      restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
-                  }
-                });
+              completeConditionalOnFuture(future,
+                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
             }
-          } );
+          });
+        }
       });
+    });
     return future;
   }
 
@@ -1862,49 +1849,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     if (takeFailSafeSnapshot) {
       CompletableFuture<Void> future = new CompletableFuture<>();
       // Step.1 Take a snapshot of the current state
-      String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
-        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
-        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
-      final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
-          .replace("{snapshot.name}", snapshotName)
+      String failSafeSnapshotSnapshotNameFormat =
+        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+      final String failSafeSnapshotSnapshotName =
+        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
           .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
           .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
       LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-      snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
         if (err != null) {
           future.completeExceptionally(err);
         } else {
           // Step.2 Restore snapshot
-        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
-          if (err2 != null) {
-            // Step.3.a Something went wrong during the restore and try to rollback.
-          internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
-            (void3, err3) -> {
-              if (err3 != null) {
-                future.completeExceptionally(err3);
-              } else {
-                String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
-                    + failSafeSnapshotSnapshotName + " succeeded.";
-                future.completeExceptionally(new RestoreSnapshotException(msg));
-              }
-            });
-        } else {
-          // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
-          LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
-            (ret3, err3) -> {
-              if (err3 != null) {
-                LOG.error(
-                  "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
-                future.completeExceptionally(err3);
-              } else {
-                future.complete(ret3);
-              }
-            });
+          addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
+            if (err2 != null) {
+              // Step.3.a Something went wrong during the restore and try to rollback.
+              addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
+                (void3, err3) -> {
+                  if (err3 != null) {
+                    future.completeExceptionally(err3);
+                  } else {
+                    String msg =
+                      "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
+                        failSafeSnapshotSnapshotName + " succeeded.";
+                    future.completeExceptionally(new RestoreSnapshotException(msg));
+                  }
+                });
+            } else {
+              // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+              LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+              addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
+                if (err3 != null) {
+                  LOG.error(
+                    "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
+                    err3);
+                  future.completeExceptionally(err3);
+                } else {
+                  future.complete(ret3);
+                }
+              });
+            }
+          });
         }
-      } );
-      }
-    } );
+      });
       return future;
     } else {
       return internalRestoreSnapshot(snapshotName, tableName);
@@ -1913,7 +1901,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
       CompletableFuture<T> parentFuture) {
-    parentFuture.whenComplete((res, err) -> {
+    addListener(parentFuture, (res, err) -> {
       if (err != null) {
         dependentFuture.completeExceptionally(err);
       } else {
@@ -1925,7 +1913,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
+    addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else if (exists) {
@@ -1995,31 +1983,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
       Pattern tableNamePattern, Pattern snapshotNamePattern) {
     CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listTableNames(tableNamePattern, false).whenComplete(
-      (tableNames, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
+    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      if (tableNames == null || tableNames.size() <= 0) {
+        future.complete(Collections.emptyList());
+        return;
+      }
+      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
           return;
         }
-        if (tableNames == null || tableNames.size() <= 0) {
+        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
           future.complete(Collections.emptyList());
           return;
         }
-        getCompletedSnapshots(snapshotNamePattern).whenComplete(
-          (snapshotDescList, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
-                .collect(Collectors.toList()));
-          });
+        future.complete(snapshotDescList.stream()
+          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+          .collect(Collectors.toList()));
       });
+    });
     return future;
   }
 
@@ -2066,7 +2052,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshotsFuture.whenComplete(((snapshotDescriptions, err) -> {
+    addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
@@ -2075,12 +2061,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         future.complete(null);
         return;
       }
-      List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
-      snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
-          .add(internalDeleteSnapshot(snapDesc)));
-      CompletableFuture.allOf(
-        deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
-          .thenAccept(v -> future.complete(v));
+      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
+        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
+          if (e != null) {
+            future.completeExceptionally(e);
+          } else {
+            future.complete(v);
+          }
+        });
     }));
     return future;
   }
@@ -2102,50 +2090,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       Map<String, String> props) {
     CompletableFuture<Void> future = new CompletableFuture<>();
     ProcedureDescription procDesc =
-        ProtobufUtil.buildProcedureDescription(signature, instance, props);
-    this.<Long> newMasterCaller()
-        .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
-          controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
-          (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
-        .call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                    return;
-                  }
-                  if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                    long pauseTime = ConnectionUtils
-                        .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                    pauseTime = Math.min(pauseTime, maxPauseTime);
-                    AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
-                      TimeUnit.MICROSECONDS);
-                  }
-                });
-              } else {
-                future.completeExceptionally(new IOException("Procedure '" + signature + " : "
-                    + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
-              }
+      ProtobufUtil.buildProcedureDescription(signature, instance, props);
+    addListener(this.<Long> newMasterCaller()
+      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+      .call(), (expectedTimeout, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TimerTask pollingTask = new TimerTask() {
+          int tries = 0;
+          long startTime = EnvironmentEdgeManager.currentTime();
+          long endTime = startTime + expectedTimeout;
+          long maxPauseTime = expectedTimeout / maxAttempts;
+
+          @Override
+          public void run(Timeout timeout) throws Exception {
+            if (EnvironmentEdgeManager.currentTime() < endTime) {
+              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                  return;
+                }
+                if (done) {
+                  future.complete(null);
+                } else {
+                  // retry again after pauseTime.
+                  long pauseTime =
+                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                  pauseTime = Math.min(pauseTime, maxPauseTime);
+                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+                    TimeUnit.MICROSECONDS);
+                }
+              });
+            } else {
+              future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
+                instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
             }
-          };
-          // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
+          }
+        };
+        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+      });
     return future;
   }
 
@@ -2264,15 +2252,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
       }
 
       CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
-      future.whenComplete((location, err) -> {
+      addListener(future, (location, err) -> {
         if (err != null) {
           returnedFuture.completeExceptionally(err);
           return;
         }
         if (!location.isPresent() || location.get().getRegion() == null) {
-          returnedFuture.completeExceptionally(new UnknownRegionException(
-              "Invalid region name or encoded region name: "
-                  + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+          returnedFuture.completeExceptionally(
+            new UnknownRegionException("Invalid region name or encoded region name: " +
+              Bytes.toStringBinary(regionNameOrEncodedRegionName)));
         } else {
           returnedFuture.complete(location.get());
         }
@@ -2296,14 +2284,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
 
     if (Bytes.equals(regionNameOrEncodedRegionName,
-      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionNameOrEncodedRegionName,
+      RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
+      Bytes.equals(regionNameOrEncodedRegionName,
         RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
       return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
     }
 
     CompletableFuture<RegionInfo> future = new CompletableFuture<>();
-    getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> {
+    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
       } else {
@@ -2345,7 +2333,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
+  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
 
     abstract void onFinished();
 
@@ -2361,7 +2349,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
+  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
     protected final TableName tableName;
 
     TableProcedureBiConsumer(TableName tableName) {
@@ -2386,7 +2374,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
+  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
     protected final String namespaceName;
 
     NamespaceProcedureBiConsumer(String namespaceName) {
@@ -2410,7 +2398,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     CreateTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2422,7 +2410,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
       super(tableName);
@@ -2452,7 +2440,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     TruncateTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2464,7 +2452,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     EnableTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2476,7 +2464,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
 
     DisableTableProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2488,7 +2476,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     AddColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2500,7 +2488,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2512,7 +2500,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
 
     ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2524,7 +2512,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     CreateNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2536,7 +2524,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     DeleteNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2548,7 +2536,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     ModifyNamespaceProcedureBiConsumer(String namespaceName) {
       super(namespaceName);
@@ -2560,7 +2548,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
 
     MergeTableRegionProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2572,7 +2560,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class SplitTableRegionProcedureBiConsumer extends  TableProcedureBiConsumer {
+  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
 
     SplitTableRegionProcedureBiConsumer(TableName tableName) {
       super(tableName);
@@ -2584,7 +2572,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
-  private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
+  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
     private final String peerId;
     private final Supplier<String> getOperation;
 
@@ -2610,7 +2598,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    procFuture.whenComplete((procId, error) -> {
+    addListener(procFuture, (procId, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
         return;
@@ -2621,30 +2609,33 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   }
 
   private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
-    this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
-        .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
-          controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
-          (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
-        .call().whenComplete((response, error) -> {
-          if (error != null) {
-            LOG.warn("failed to get the procedure result procId={}", procId,
-              ConnectionUtils.translateException(error));
-            retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
-              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
-            return;
-          }
-          if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
-            retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
-              ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
-            return;
-          }
-          if (response.hasException()) {
-            IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
-            future.completeExceptionally(ioe);
-          } else {
-            future.complete(null);
-          }
-        });
+    addListener(
+      this.<GetProcedureResultResponse> newMasterCaller()
+        .action((controller, stub) -> this
+          .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
+            controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
+            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
+        .call(),
+      (response, error) -> {
+        if (error != null) {
+          LOG.warn("failed to get the procedure result procId={}", procId,
+            ConnectionUtils.translateException(error));
+          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+          return;
+        }
+        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
+          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+          return;
+        }
+        if (response.hasException()) {
+          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
+          future.completeExceptionally(ioe);
+        } else {
+          future.complete(null);
+        }
+      });
   }
 
   private <T> CompletableFuture<T> failedFuture(Throwable error) {
@@ -2726,24 +2717,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> updateConfiguration() {
     CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS))
-      .whenComplete((status, err) -> {
+    addListener(
+      getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
+      (status, err) -> {
         if (err != null) {
           future.completeExceptionally(err);
         } else {
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           status.getLiveServerMetrics().keySet()
-              .forEach(server -> futures.add(updateConfiguration(server)));
+            .forEach(server -> futures.add(updateConfiguration(server)));
           futures.add(updateConfiguration(status.getMasterName()));
           status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
-          CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
-              .whenComplete((result, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else {
-                  future.complete(result);
-                }
-              });
+          addListener(
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+            (result, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(result);
+              }
+            });
         }
       });
     return future;
@@ -2826,88 +2819,87 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
     switch (compactType) {
       case MOB:
-        connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+        addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
           }
           RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
 
-          this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
-            (controller, stub) -> this
-            .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
+          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
+            .action((controller, stub) -> this
+              .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
                 controller, stub,
                 RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
-                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
-          ).call().whenComplete((resp2, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              if (resp2.hasCompactionState()) {
-                future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
+            .call(), (resp2, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
               } else {
-                future.complete(CompactionState.NONE);
+                if (resp2.hasCompactionState()) {
+                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+                } else {
+                  future.complete(CompactionState.NONE);
+                }
               }
-            }
-          });
+            });
         });
         break;
       case NORMAL:
-        getTableHRegionLocations(tableName).whenComplete(
-          (locations, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-              return;
-            }
-            List<CompactionState> regionStates = new ArrayList<>();
-            List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
-            locations.stream().filter(loc -> loc.getServerName() != null)
-                .filter(loc -> loc.getRegion() != null)
-                .filter(loc -> !loc.getRegion().isOffline())
-                .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
-                  futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
-                    // If any region compaction state is MAJOR_AND_MINOR
-                    // the table compaction state is MAJOR_AND_MINOR, too.
-                    if (err2 != null) {
-                      future.completeExceptionally(err2);
-                    } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
-                      future.complete(regionState);
-                    } else {
-                      regionStates.add(regionState);
-                    }
-                  }));
-                });
-            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
-                .whenComplete((ret, err3) -> {
-                  // If future not completed, check all regions's compaction state
-                  if (!future.isCompletedExceptionally() && !future.isDone()) {
-                    CompactionState state = CompactionState.NONE;
-                    for (CompactionState regionState : regionStates) {
-                      switch (regionState) {
-                        case MAJOR:
-                          if (state == CompactionState.MINOR) {
-                            future.complete(CompactionState.MAJOR_AND_MINOR);
-                          } else {
-                            state = CompactionState.MAJOR;
-                          }
-                          break;
-                        case MINOR:
-                          if (state == CompactionState.MAJOR) {
-                            future.complete(CompactionState.MAJOR_AND_MINOR);
-                          } else {
-                            state = CompactionState.MINOR;
-                          }
-                          break;
-                        case NONE:
-                        default:
+        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          List<CompactionState> regionStates = new ArrayList<>();
+          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+          locations.stream().filter(loc -> loc.getServerName() != null)
+            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
+            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
+              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
+                // If any region compaction state is MAJOR_AND_MINOR
+                // the table compaction state is MAJOR_AND_MINOR, too.
+                if (err2 != null) {
+                  future.completeExceptionally(err2);
+                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+                  future.complete(regionState);
+                } else {
+                  regionStates.add(regionState);
+                }
+              }));
+            });
+          addListener(
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+            (ret, err3) -> {
+              // If future not completed, check all regions's compaction state
+              if (!future.isCompletedExceptionally() && !future.isDone()) {
+                CompactionState state = CompactionState.NONE;
+                for (CompactionState regionState : regionStates) {
+                  switch (regionState) {
+                    case MAJOR:
+                      if (state == CompactionState.MINOR) {
+                        future.complete(CompactionState.MAJOR_AND_MINOR);
+                      } else {
+                        state = CompactionState.MAJOR;
                       }
-                      if (!future.isDone()) {
-                        future.complete(state);
+                      break;
+                    case MINOR:
+                      if (state == CompactionState.MAJOR) {
+                        future.complete(CompactionState.MAJOR_AND_MINOR);
+                      } else {
+                        state = CompactionState.MINOR;
                       }
-                    }
+                      break;
+                    case NONE:
+                    default:
                   }
-                });
-          });
+                  if (!future.isDone()) {
+                    future.complete(state);
+                  }
+                }
+              }
+            });
+        });
         break;
       default:
         throw new IllegalArgumentException("Unknown compactType: " + compactType);
@@ -2919,37 +2911,38 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
     CompletableFuture<CompactionState> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
+    addListener(getRegionLocation(regionName), (location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      addListener(
         this.<GetRegionInfoResponse> newAdminCaller()
-            .action(
-              (controller, stub) -> this
-                  .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
-                    controller, stub, RequestConverter.buildGetRegionInfoRequest(location
-                        .getRegion().getRegionName(), true), (s, c, req, done) -> s
-                 

<TRUNCATED>