You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/07/04 02:14:04 UTC

[1/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

Repository: hbase
Updated Branches:
  refs/heads/master 8318a092a -> 14f0423b5


http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 6586a03..f75c346 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -32,11 +33,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder.ModifyableTableDescriptor;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -51,23 +53,20 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous table admin operations.
  */
+@RunWith(Parameterized.class)
 @Category({LargeTests.class, ClientTests.class})
 public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
-  @Rule
-  public TestName name = new TestName();
-
   @Test
   public void testTableExist() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
     boolean exist;
     exist = admin.tableExists(tableName).get();
     assertEquals(false, exist);
@@ -81,12 +80,12 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
   @Test
   public void testListTables() throws Exception {
     int numTables = admin.listTables().get().size();
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
-    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
+    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "1");
+    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
+    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "3");
     TableName[] tables = new TableName[] { tableName1, tableName2, tableName3 };
     for (int i = 0; i < tables.length; i++) {
-      TEST_UTIL.createTable(tables[i], FAMILY);
+      createTableWithDefaultConf(tables[i]);
     }
 
     List<TableDescriptor> tableDescs = admin.listTables().get();
@@ -118,7 +117,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     }
 
     for (int i = 0; i < tables.length; i++) {
-      TEST_UTIL.deleteTable(tables[i]);
+      admin.disableTable(tables[i]).join();
+      admin.deleteTable(tables[i]).join();
     }
 
     tableDescs = admin.listTables(Optional.empty(), true).get();
@@ -127,27 +127,25 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     assertTrue("Not found system tables", tableNames.size() > 0);
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testGetTableDescriptor() throws Exception {
-    HColumnDescriptor fam1 = new HColumnDescriptor("fam1");
-    HColumnDescriptor fam2 = new HColumnDescriptor("fam2");
-    HColumnDescriptor fam3 = new HColumnDescriptor("fam3");
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
-    htd.addFamily(fam1);
-    htd.addFamily(fam2);
-    htd.addFamily(fam3);
-    admin.createTable(htd).join();
-    TableDescriptor confirmedHtd = admin.getTableDescriptor(htd.getTableName()).get();
-    assertEquals(htd.compareTo(new HTableDescriptor(confirmedHtd)), 0);
+    byte[][] families = { FAMILY, FAMILY_0, FAMILY_1 };
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    for (byte[] family : families) {
+      builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+    }
+    TableDescriptor desc = builder.build();
+    admin.createTable(desc).join();
+    ModifyableTableDescriptor modifyableDesc = ((ModifyableTableDescriptor) desc);
+    TableDescriptor confirmedHtd = admin.getTableDescriptor(tableName).get();
+    assertEquals(modifyableDesc.compareTo((ModifyableTableDescriptor) confirmedHtd), 0);
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testCreateTable() throws Exception {
     List<TableDescriptor> tables = admin.listTables().get();
     int numTables = tables.size();
-    final  TableName tableName = TableName.valueOf(name.getMethodName());
-    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY)))
-        .join();
+    createTableWithDefaultConf(tableName);
     tables = admin.listTables().get();
     assertEquals(numTables + 1, tables.size());
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
@@ -162,118 +160,102 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     return state.get().getState();
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testCreateTableNumberOfRegions() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc).join();
-    List<HRegionLocation> regions;
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
-      regions = l.getAllRegionLocations();
-      assertEquals("Table should have only 1 region", 1, regions.size());
-    }
+    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+
+    createTableWithDefaultConf(tableName);
+    List<HRegionLocation> regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    assertEquals("Table should have only 1 region", 1, regionLocations.size());
 
     final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
-    desc = new HTableDescriptor(tableName2);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, new byte[][] { new byte[] { 42 } }).join();
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName2)) {
-      regions = l.getAllRegionLocations();
-      assertEquals("Table should have only 2 region", 2, regions.size());
-    }
+    createTableWithDefaultConf(tableName2, Optional.of(new byte[][] { new byte[] { 42 } }));
+    regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get();
+    assertEquals("Table should have only 2 region", 2, regionLocations.size());
 
     final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
-    desc = new HTableDescriptor(tableName3);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, "a".getBytes(), "z".getBytes(), 3).join();
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName3)) {
-      regions = l.getAllRegionLocations();
-      assertEquals("Table should have only 3 region", 3, regions.size());
-    }
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+    admin.createTable(builder.build(), "a".getBytes(), "z".getBytes(), 3).join();
+    regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get();
+    assertEquals("Table should have only 3 region", 3, regionLocations.size());
 
     final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
-    desc = new HTableDescriptor(tableName4);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    builder = TableDescriptorBuilder.newBuilder(tableName4);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
     try {
-      admin.createTable(desc, "a".getBytes(), "z".getBytes(), 2).join();
+      admin.createTable(builder.build(), "a".getBytes(), "z".getBytes(), 2).join();
       fail("Should not be able to create a table with only 2 regions using this API.");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof IllegalArgumentException);
     }
 
     final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "_5");
-    desc = new HTableDescriptor(tableName5);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, new byte[] { 1 }, new byte[] { 127 }, 16).join();
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName5)) {
-      regions = l.getAllRegionLocations();
-      assertEquals("Table should have 16 region", 16, regions.size());
-    }
+    builder = TableDescriptorBuilder.newBuilder(tableName5);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+    admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
+    regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName5)).get();
+    assertEquals("Table should have 16 region", 16, regionLocations.size());
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testCreateTableWithRegions() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-
     byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
         new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
         new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, };
     int expectedRegions = splitKeys.length + 1;
-
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, splitKeys).join();
+    createTableWithDefaultConf(tableName, Optional.of(splitKeys));
 
     boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
     assertTrue("Table should be created with splitKeys + 1 rows in META", tableAvailable);
 
-    List<HRegionLocation> regions;
-    Iterator<HRegionLocation> hris;
+    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+    List<HRegionLocation> regions =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    Iterator<HRegionLocation> hris = regions.iterator();
+
+    assertEquals(
+      "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+      expectedRegions, regions.size());
+    System.err.println("Found " + regions.size() + " regions");
+
     HRegionInfo hri;
-    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
-      regions = l.getAllRegionLocations();
-
-      assertEquals(
-        "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
-        expectedRegions, regions.size());
-      System.err.println("Found " + regions.size() + " regions");
-
-      hris = regions.iterator();
-      hri = hris.next().getRegionInfo();
-      assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7]));
-      assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8]));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
-      assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
-
-      verifyRoundRobinDistribution(conn, l, expectedRegions);
-    }
+    hris = regions.iterator();
+    hri = hris.next().getRegionInfo();
+    assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7]));
+    assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8]));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
+    assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
+    verifyRoundRobinDistribution(regions, expectedRegions);
 
     // Now test using start/end with a number of regions
 
@@ -283,99 +265,86 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
     // Splitting into 10 regions, we expect (null,1) ... (9, null)
     // with (1,2) (2,3) (3,4) (4,5) (5,6) (6,7) (7,8) (8,9) in the middle
-
     expectedRegions = 10;
-
     final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
-
-    desc = new HTableDescriptor(tableName2);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, startKey, endKey, expectedRegions).join();
-
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName2)) {
-      regions = l.getAllRegionLocations();
-      assertEquals(
-        "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
-        expectedRegions, regions.size());
-      System.err.println("Found " + regions.size() + " regions");
-
-      hris = regions.iterator();
-      hri = hris.next().getRegionInfo();
-      assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
-      assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
-      hri = hris.next().getRegionInfo();
-      assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
-      assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
-
-      verifyRoundRobinDistribution(conn, l, expectedRegions);
-    }
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName2);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+    admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
+
+    regions =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get();
+    assertEquals(
+      "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+      expectedRegions, regions.size());
+    System.err.println("Found " + regions.size() + " regions");
+
+    hris = regions.iterator();
+    hri = hris.next().getRegionInfo();
+    assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
+    assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
+    hri = hris.next().getRegionInfo();
+    assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
+    assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
+    verifyRoundRobinDistribution(regions, expectedRegions);
 
     // Try once more with something that divides into something infinite
-
     startKey = new byte[] { 0, 0, 0, 0, 0, 0 };
     endKey = new byte[] { 1, 0, 0, 0, 0, 0 };
 
     expectedRegions = 5;
-
     final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
-
-    desc = new HTableDescriptor(tableName3);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, startKey, endKey, expectedRegions).join();
-
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName3)) {
-      regions = l.getAllRegionLocations();
-      assertEquals(
-        "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
-        expectedRegions, regions.size());
-      System.err.println("Found " + regions.size() + " regions");
-
-      verifyRoundRobinDistribution(conn, l, expectedRegions);
-    }
+    builder = TableDescriptorBuilder.newBuilder(tableName3);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+    admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
+
+    regions =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get();
+    assertEquals(
+      "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+      expectedRegions, regions.size());
+    System.err.println("Found " + regions.size() + " regions");
+    verifyRoundRobinDistribution(regions, expectedRegions);
 
     // Try an invalid case where there are duplicate split keys
     splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 },
         new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } };
-
-    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
-    desc = new HTableDescriptor(tableName4);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
     try {
-      admin.createTable(desc, splitKeys).join();
+      createTableWithDefaultConf(tableName4, Optional.of(splitKeys));
       fail("Should not be able to create this table because of " + "duplicate split keys");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof IllegalArgumentException);
     }
   }
 
-  private void verifyRoundRobinDistribution(ClusterConnection c, RegionLocator regionLocator,
-      int expectedRegions) throws IOException {
-    int numRS = c.getCurrentNrHRS();
-    List<HRegionLocation> regions = regionLocator.getAllRegionLocations();
+  private void verifyRoundRobinDistribution(List<HRegionLocation> regions, int expectedRegions)
+      throws IOException {
+    int numRS = ((ClusterConnection) TEST_UTIL.getConnection()).getCurrentNrHRS();
+
     Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<>();
     regions.stream().forEach((loc) -> {
       ServerName server = loc.getServerName();
@@ -394,103 +363,93 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     });
   }
 
-  @Test(timeout = 300000)
-  public void testCreateTableWithOnlyEmptyStartRow() throws IOException {
-    byte[] tableName = Bytes.toBytes(name.getMethodName());
+  @Test
+  public void testCreateTableWithOnlyEmptyStartRow() throws Exception {
     byte[][] splitKeys = new byte[1][];
     splitKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    desc.addFamily(new HColumnDescriptor("col"));
     try {
-      admin.createTable(desc, splitKeys).join();
+      createTableWithDefaultConf(tableName, Optional.of(splitKeys));
       fail("Test case should fail as empty split key is passed.");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof IllegalArgumentException);
     }
   }
 
-  @Test(timeout = 300000)
-  public void testCreateTableWithEmptyRowInTheSplitKeys() throws IOException {
-    byte[] tableName = Bytes.toBytes(name.getMethodName());
+  @Test
+  public void testCreateTableWithEmptyRowInTheSplitKeys() throws Exception {
     byte[][] splitKeys = new byte[3][];
     splitKeys[0] = "region1".getBytes();
     splitKeys[1] = HConstants.EMPTY_BYTE_ARRAY;
     splitKeys[2] = "region2".getBytes();
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    desc.addFamily(new HColumnDescriptor("col"));
     try {
-      admin.createTable(desc, splitKeys).join();
+      createTableWithDefaultConf(tableName, Optional.of(splitKeys));
       fail("Test case should fail as empty split key is passed.");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof IllegalArgumentException);
     }
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testDeleteTable() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY))).join();
+    createTableWithDefaultConf(tableName);
     assertTrue(admin.tableExists(tableName).get());
     TEST_UTIL.getAdmin().disableTable(tableName);
     admin.deleteTable(tableName).join();
     assertFalse(admin.tableExists(tableName).get());
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testDeleteTables() throws Exception {
-    TableName[] tables = { TableName.valueOf(name.getMethodName() + "1"),
-        TableName.valueOf(name.getMethodName() + "2"), TableName.valueOf(name.getMethodName() + "3") };
-    Arrays.stream(tables).map(HTableDescriptor::new)
-        .map((table) -> table.addFamily(new HColumnDescriptor(FAMILY))).forEach((table) -> {
-          admin.createTable(table).join();
-          admin.tableExists(table.getTableName()).thenAccept((exist) -> assertTrue(exist)).join();
-          try {
-            TEST_UTIL.getAdmin().disableTable(table.getTableName());
-          } catch (Exception e) {
-          }
-        });
-    List<TableDescriptor> failed = admin.deleteTables(Pattern.compile("testDeleteTables.*")).get();
+    TableName[] tables =
+        { TableName.valueOf(tableName.getNameAsString() + "1"),
+            TableName.valueOf(tableName.getNameAsString() + "2"),
+            TableName.valueOf(tableName.getNameAsString() + "3") };
+    Arrays.stream(tables).forEach((table) -> {
+      createTableWithDefaultConf(table);
+      admin.tableExists(table).thenAccept((exist) -> assertTrue(exist)).join();
+      admin.disableTable(table).join();
+    });
+    List<TableDescriptor> failed =
+        admin.deleteTables(Pattern.compile(tableName.getNameAsString() + ".*")).get();
     assertEquals(0, failed.size());
     Arrays.stream(tables).forEach((table) -> {
       admin.tableExists(table).thenAccept((exist) -> assertFalse(exist)).join();
     });
   }
 
-  @Test(timeout = 300000)
-  public void testTruncateTable() throws IOException {
-    testTruncateTable(TableName.valueOf(name.getMethodName()), false);
+  @Test
+  public void testTruncateTable() throws Exception {
+    testTruncateTable(tableName, false);
   }
 
-  @Test(timeout = 300000)
-  public void testTruncateTablePreservingSplits() throws IOException {
-    testTruncateTable(TableName.valueOf(name.getMethodName()), true);
+  @Test
+  public void testTruncateTablePreservingSplits() throws Exception {
+    testTruncateTable(tableName, true);
   }
 
   private void testTruncateTable(final TableName tableName, boolean preserveSplits)
-      throws IOException {
+      throws Exception {
     byte[][] splitKeys = new byte[2][];
     splitKeys[0] = Bytes.toBytes(4);
     splitKeys[1] = Bytes.toBytes(8);
 
     // Create & Fill the table
-    Table table = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY, splitKeys);
-    try {
-      TEST_UTIL.loadNumericRows(table, HConstants.CATALOG_FAMILY, 0, 10);
-      assertEquals(10, TEST_UTIL.countRows(table));
-    } finally {
-      table.close();
-    }
+    createTableWithDefaultConf(tableName, Optional.of(splitKeys));
+    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+    int expectedRows = 10;
+    for (int i = 0; i < expectedRows; i++) {
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Put put = new Put(data);
+      put.addColumn(FAMILY, null, data);
+      table.put(put).join();
+    }
+    assertEquals(10, table.scanAll(new Scan()).get().size());
     assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
 
     // Truncate & Verify
-    TEST_UTIL.getAdmin().disableTable(tableName);
+    admin.disableTable(tableName).join();
     admin.truncateTable(tableName, preserveSplits).join();
-    table = TEST_UTIL.getConnection().getTable(tableName);
-    try {
-      assertEquals(0, TEST_UTIL.countRows(table));
-    } finally {
-      table.close();
-    }
+    assertEquals(0, table.scanAll(new Scan()).get().size());
     if (preserveSplits) {
       assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
     } else {
@@ -498,149 +457,147 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     }
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testDisableAndEnableTable() throws Exception {
+    createTableWithDefaultConf(tableName);
+    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
     final byte[] row = Bytes.toBytes("row");
     final byte[] qualifier = Bytes.toBytes("qualifier");
     final byte[] value = Bytes.toBytes("value");
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    Table ht = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
     Put put = new Put(row);
-    put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
-    ht.put(put);
+    put.addColumn(FAMILY, qualifier, value);
+    table.put(put).join();
     Get get = new Get(row);
-    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
-    ht.get(get);
+    get.addColumn(FAMILY, qualifier);
+    table.get(get).get();
 
-    this.admin.disableTable(ht.getName()).join();
+    this.admin.disableTable(tableName).join();
     assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(ht.getName(), TableState.State.DISABLED));
+        .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
     assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName));
 
     // Test that table is disabled
     get = new Get(row);
-    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+    get.addColumn(FAMILY, qualifier);
     boolean ok = false;
     try {
-      ht.get(get);
-    } catch (TableNotEnabledException e) {
+      table.get(get).get();
+    } catch (ExecutionException e) {
       ok = true;
     }
     ok = false;
     // verify that scan encounters correct exception
-    Scan scan = new Scan();
     try {
-      ResultScanner scanner = ht.getScanner(scan);
-      Result res = null;
-      do {
-        res = scanner.next();
-      } while (res != null);
-    } catch (TableNotEnabledException e) {
+      table.scanAll(new Scan()).get();
+    } catch (ExecutionException e) {
       ok = true;
     }
     assertTrue(ok);
     this.admin.enableTable(tableName).join();
     assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
-        .getTableStateManager().isTableState(ht.getName(), TableState.State.ENABLED));
+        .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
     assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
 
     // Test that table is enabled
     try {
-      ht.get(get);
-    } catch (RetriesExhaustedException e) {
+      table.get(get).get();
+    } catch (Exception e) {
       ok = false;
     }
     assertTrue(ok);
-    ht.close();
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testDisableAndEnableTables() throws Exception {
+    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "1");
+    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
+    createTableWithDefaultConf(tableName1);
+    createTableWithDefaultConf(tableName2);
+    RawAsyncTable table1 = ASYNC_CONN.getRawTable(tableName1);
+    RawAsyncTable table2 = ASYNC_CONN.getRawTable(tableName2);
+
     final byte[] row = Bytes.toBytes("row");
     final byte[] qualifier = Bytes.toBytes("qualifier");
     final byte[] value = Bytes.toBytes("value");
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName());
-    Table ht1 = TEST_UTIL.createTable(tableName1, HConstants.CATALOG_FAMILY);
-    Table ht2 = TEST_UTIL.createTable(tableName2, HConstants.CATALOG_FAMILY);
     Put put = new Put(row);
-    put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
-    ht1.put(put);
-    ht2.put(put);
+    put.addColumn(FAMILY, qualifier, value);
+    table1.put(put).join();
+    table2.put(put).join();
     Get get = new Get(row);
-    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
-    ht1.get(get);
-    ht2.get(get);
+    get.addColumn(FAMILY, qualifier);
+    table1.get(get).get();
+    table2.get(get).get();
 
-    this.admin.disableTables(Pattern.compile("testDisableAndEnableTable.*")).join();
+    this.admin.disableTables(Pattern.compile(tableName.getNameAsString() + ".*")).join();
 
     // Test that tables are disabled
     get = new Get(row);
-    get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+    get.addColumn(FAMILY, qualifier);
     boolean ok = false;
     try {
-      ht1.get(get);
-      ht2.get(get);
-    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
+      table1.get(get).get();
+    } catch (ExecutionException e) {
       ok = true;
     }
+    assertTrue(ok);
 
+    ok = false;
+    try {
+      table2.get(get).get();
+    } catch (ExecutionException e) {
+      ok = true;
+    }
+    assertTrue(ok);
     assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName1));
     assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName2));
 
-    assertTrue(ok);
-    this.admin.enableTables(Pattern.compile("testDisableAndEnableTable.*")).join();
+    this.admin.enableTables(Pattern.compile(tableName.getNameAsString() + ".*")).join();
 
     // Test that tables are enabled
     try {
-      ht1.get(get);
-    } catch (IOException e) {
+      table1.get(get).get();
+    } catch (Exception e) {
       ok = false;
     }
     try {
-      ht2.get(get);
-    } catch (IOException e) {
+      table2.get(get).get();
+    } catch (Exception e) {
       ok = false;
     }
     assertTrue(ok);
-
-    ht1.close();
-    ht2.close();
-
     assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName1));
     assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName2));
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testEnableTableRetainAssignment() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
-        new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
-        new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } };
+    byte[][] splitKeys =
+        { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
+            new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
+            new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } };
     int expectedRegions = splitKeys.length + 1;
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    admin.createTable(desc, splitKeys).join();
-
-    try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
-      List<HRegionLocation> regions = l.getAllRegionLocations();
-
-      assertEquals(
-        "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
-        expectedRegions, regions.size());
-      // Disable table.
-      admin.disableTable(tableName).join();
-      // Enable table, use retain assignment to assign regions.
-      admin.enableTable(tableName).join();
-      List<HRegionLocation> regions2 = l.getAllRegionLocations();
-
-      // Check the assignment.
-      assertEquals(regions.size(), regions2.size());
-      assertTrue(regions2.containsAll(regions));
-    }
+    createTableWithDefaultConf(tableName, Optional.of(splitKeys));
+
+    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+    List<HRegionLocation> regions =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    assertEquals(
+      "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+      expectedRegions, regions.size());
+
+    // Disable table.
+    admin.disableTable(tableName).join();
+    // Enable table, use retain assignment to assign regions.
+    admin.enableTable(tableName).join();
+
+    List<HRegionLocation> regions2 =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    // Check the assignment.
+    assertEquals(regions.size(), regions2.size());
+    assertTrue(regions2.containsAll(regions));
   }
 
-  @Test(timeout = 300000)
+  @Test
   public void testDisableCatalogTable() throws Exception {
     try {
       this.admin.disableTable(TableName.META_TABLE_NAME).join();
@@ -649,188 +606,149 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
     }
     // Before the fix for HBASE-6146, the below table creation was failing as the hbase:meta table
     // actually getting disabled by the disableTable() call.
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName().getBytes()));
-    HColumnDescriptor hcd = new HColumnDescriptor("cf1".getBytes());
-    htd.addFamily(hcd);
-    admin.createTable(htd).join();
+    createTableWithDefaultConf(tableName);
   }
 
   @Test
-  public void testAddColumnFamily() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+  public void testAddColumnFamily() throws Exception {
     // Create a table with two families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
-    admin.createTable(baseHtd).join();
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build());
+    admin.createTable(builder.build()).join();
     admin.disableTable(tableName).join();
-    try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0);
-
-      // Modify the table removing one family and verify the descriptor
-      admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
-      verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
-    } finally {
-      admin.deleteTable(tableName);
-    }
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0);
+
+    // Modify the table removing one family and verify the descriptor
+    admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build())
+        .join();
+    verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
   }
 
   @Test
   public void testAddSameColumnFamilyTwice() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
     // Create a table with one families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
-    admin.createTable(baseHtd).join();
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build());
+    admin.createTable(builder.build()).join();
     admin.disableTable(tableName).join();
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0);
+
+    // Modify the table removing one family and verify the descriptor
+    admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build())
+        .join();
+    verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+
     try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0);
-
-      // Modify the table removing one family and verify the descriptor
-      this.admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
-      verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
-
-      try {
-        // Add same column family again - expect failure
-        this.admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
-        Assert.fail("Delete a non-exist column family should fail");
-      } catch (Exception e) {
-        // Expected.
-      }
-    } finally {
-      admin.deleteTable(tableName).join();
+      // Add same column family again - expect failure
+      this.admin.addColumnFamily(tableName,
+        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build()).join();
+      Assert.fail("Adding the same column family twice should fail");
+    } catch (Exception e) {
+      // Expected.
     }
   }
 
   @Test
   public void testModifyColumnFamily() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-
-    HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_0);
-    int blockSize = cfDescriptor.getBlocksize();
-    // Create a table with one families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(cfDescriptor);
-    admin.createTable(baseHtd).join();
+    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build();
+    int blockSize = cfd.getBlocksize();
+    admin.createTable(tdBuilder.addColumnFamily(cfd).build()).join();
     admin.disableTable(tableName).join();
-    try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0);
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0);
 
-      int newBlockSize = 2 * blockSize;
-      cfDescriptor.setBlocksize(newBlockSize);
+    int newBlockSize = 2 * blockSize;
+    cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).setBlocksize(newBlockSize).build();
+    // Modify column family
+    admin.modifyColumnFamily(tableName, cfd).join();
 
-      // Modify colymn family
-      admin.modifyColumnFamily(tableName, cfDescriptor).join();
-
-      TableDescriptor htd = admin.getTableDescriptor(tableName).get();
-      ColumnFamilyDescriptor hcfd = htd.getColumnFamily(FAMILY_0);
-      assertTrue(hcfd.getBlocksize() == newBlockSize);
-    } finally {
-      admin.deleteTable(tableName).join();
-    }
+    TableDescriptor htd = admin.getTableDescriptor(tableName).get();
+    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(FAMILY_0);
+    assertTrue(hcfd.getBlocksize() == newBlockSize);
   }
 
   @Test
-  public void testModifyNonExistingColumnFamily() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-
-    HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_1);
-    int blockSize = cfDescriptor.getBlocksize();
-    // Create a table with one families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
-    admin.createTable(baseHtd).join();
+  public void testModifyNonExistingColumnFamily() throws Exception {
+    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build();
+    int blockSize = cfd.getBlocksize();
+    admin.createTable(tdBuilder.addColumnFamily(cfd).build()).join();
     admin.disableTable(tableName).join();
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0);
+
+    int newBlockSize = 2 * blockSize;
+    cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(newBlockSize).build();
+
+    // Modify a column family that is not in the table.
     try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0);
-
-      int newBlockSize = 2 * blockSize;
-      cfDescriptor.setBlocksize(newBlockSize);
-
-      // Modify a column family that is not in the table.
-      try {
-        admin.modifyColumnFamily(tableName, cfDescriptor).join();
-        Assert.fail("Modify a non-exist column family should fail");
-      } catch (Exception e) {
-        // Expected.
-      }
-    } finally {
-      admin.deleteTable(tableName).join();
+      admin.modifyColumnFamily(tableName, cfd).join();
+      Assert.fail("Modify a non-exist column family should fail");
+    } catch (Exception e) {
+      // Expected.
     }
   }
 
   @Test
-  public void testDeleteColumnFamily() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+  public void testDeleteColumnFamily() throws Exception {
     // Create a table with two families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
-    admin.createTable(baseHtd).join();
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build())
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build());
+    admin.createTable(builder.build()).join();
     admin.disableTable(tableName).join();
-    try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
 
-      // Modify the table removing one family and verify the descriptor
-      admin.deleteColumnFamily(tableName, FAMILY_1).join();
-      verifyTableDescriptor(tableName, FAMILY_0);
-    } finally {
-      admin.deleteTable(tableName).join();
-    }
+    // Modify the table removing one family and verify the descriptor
+    admin.deleteColumnFamily(tableName, FAMILY_1).join();
+    verifyTableDescriptor(tableName, FAMILY_0);
   }
 
   @Test
-  public void testDeleteSameColumnFamilyTwice() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+  public void testDeleteSameColumnFamilyTwice() throws Exception {
     // Create a table with two families
-    HTableDescriptor baseHtd = new HTableDescriptor(tableName);
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
-    baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
-    admin.createTable(baseHtd).join();
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build())
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build());
+    admin.createTable(builder.build()).join();
     admin.disableTable(tableName).join();
-    try {
-      // Verify the table descriptor
-      verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+    // Verify the table descriptor
+    verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+
+    // Modify the table removing one family and verify the descriptor
+    admin.deleteColumnFamily(tableName, FAMILY_1).join();
+    verifyTableDescriptor(tableName, FAMILY_0);
 
-      // Modify the table removing one family and verify the descriptor
+    try {
+      // Delete again - expect failure
       admin.deleteColumnFamily(tableName, FAMILY_1).join();
-      verifyTableDescriptor(tableName, FAMILY_0);
-
-      try {
-        // Delete again - expect failure
-        admin.deleteColumnFamily(tableName, FAMILY_1).join();
-        Assert.fail("Delete a non-exist column family should fail");
-      } catch (Exception e) {
-        // Expected.
-      }
-    } finally {
-      admin.deleteTable(tableName).join();
+      Assert.fail("Delete a non-exist column family should fail");
+    } catch (Exception e) {
+      // Expected.
     }
   }
 
   private void verifyTableDescriptor(final TableName tableName, final byte[]... families)
-      throws IOException {
-    Admin admin = TEST_UTIL.getAdmin();
-
+      throws Exception {
     // Verify descriptor from master
-    HTableDescriptor htd = admin.getTableDescriptor(tableName);
+    TableDescriptor htd = admin.getTableDescriptor(tableName).get();
     verifyTableDescriptor(htd, tableName, families);
 
     // Verify descriptor from HDFS
     MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
     Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
-    HTableDescriptor td = FSTableDescriptors
-        .getTableDescriptorFromFs(mfs.getFileSystem(), tableDir);
+    HTableDescriptor td =
+        FSTableDescriptors.getTableDescriptorFromFs(mfs.getFileSystem(), tableDir);
     verifyTableDescriptor(td, tableName, families);
   }
 
-  private void verifyTableDescriptor(final HTableDescriptor htd, final TableName tableName,
+  private void verifyTableDescriptor(final TableDescriptor htd, final TableName tableName,
       final byte[]... families) {
-    Set<byte[]> htdFamilies = htd.getFamiliesKeys();
+    Set<byte[]> htdFamilies = htd.getColumnFamilyNames();
     assertEquals(tableName, htd.getTableName());
     assertEquals(families.length, htdFamilies.size());
     for (byte[] familyName : families) {
@@ -840,28 +758,20 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
 
   @Test
   public void testIsTableEnabledAndDisabled() throws Exception {
-    final TableName table = TableName.valueOf("testIsTableEnabledAndDisabled");
-    HTableDescriptor desc = new HTableDescriptor(table);
-    desc.addFamily(new HColumnDescriptor(FAMILY));
-    admin.createTable(desc).join();
-    assertTrue(admin.isTableEnabled(table).get());
-    assertFalse(admin.isTableDisabled(table).get());
-    admin.disableTable(table).join();
-    assertFalse(admin.isTableEnabled(table).get());
-    assertTrue(admin.isTableDisabled(table).get());
-    admin.deleteTable(table).join();
+    createTableWithDefaultConf(tableName);
+    assertTrue(admin.isTableEnabled(tableName).get());
+    assertFalse(admin.isTableDisabled(tableName).get());
+    admin.disableTable(tableName).join();
+    assertFalse(admin.isTableEnabled(tableName).get());
+    assertTrue(admin.isTableDisabled(tableName).get());
   }
 
   @Test
   public void testTableAvailableWithRandomSplitKeys() throws Exception {
-    TableName tableName = TableName.valueOf("testTableAvailableWithRandomSplitKeys");
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor("col"));
+    createTableWithDefaultConf(tableName);
     byte[][] splitKeys = new byte[1][];
     splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 } };
-    admin.createTable(desc).join();
     boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
     assertFalse("Table should be created with 1 row in META", tableAvailable);
   }
-
 }


[4/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 1da660c..36fd60d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -17,174 +17,27 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import java.util.concurrent.ExecutorService;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 
-import java.util.stream.Stream;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.quotas.QuotaFilter;
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
-import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -192,2110 +45,404 @@ import org.apache.hadoop.hbase.util.Pair;
  */
 @InterfaceAudience.Private
 public class AsyncHBaseAdmin implements AsyncAdmin {
-  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
 
   private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
 
-  private final AsyncConnectionImpl connection;
-
-  private final RawAsyncTable metaTable;
-
-  private final long rpcTimeoutNs;
-
-  private final long operationTimeoutNs;
-
-  private final long pauseNs;
-
-  private final int maxAttempts;
-
-  private final int startLogErrorsCnt;
+  private final RawAsyncHBaseAdmin rawAdmin;
 
-  private final NonceGenerator ng;
-
-  AsyncHBaseAdmin(AsyncConnectionImpl connection) {
-    this.connection = connection;
-    this.metaTable = connection.getRawTable(META_TABLE_NAME);
-    this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
-    this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
-    this.pauseNs = connection.connConf.getPauseNs();
-    this.maxAttempts = connection.connConf.getMaxRetries();
-    this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
-    this.ng = connection.getNonceGenerator();
-  }
-
-  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
-    return this.connection.callerFactory.<T> masterRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
-  }
-
-  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
-    return this.connection.callerFactory.<T> adminRequest()
-        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-        .startLogErrorsCnt(startLogErrorsCnt);
-  }
-
-  @FunctionalInterface
-  private interface MasterRpcCall<RESP, REQ> {
-    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
-        RpcCallback<RESP> done);
-  }
-
-  @FunctionalInterface
-  private interface AdminRpcCall<RESP, REQ> {
-    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
-        RpcCallback<RESP> done);
-  }
+  private final ExecutorService pool;
 
-  @FunctionalInterface
-  private interface Converter<D, S> {
-    D convert(S src) throws IOException;
+  AsyncHBaseAdmin(RawAsyncHBaseAdmin rawAdmin, ExecutorService pool) {
+    this.rawAdmin = rawAdmin;
+    this.pool = pool;
   }
 
-  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
-      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
-      Converter<RESP, PRESP> respConverter) {
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
-      @Override
-      public void run(PRESP resp) {
-        if (controller.failed()) {
-          future.completeExceptionally(controller.getFailed());
-        } else {
-          try {
-            future.complete(respConverter.convert(resp));
-          } catch (IOException e) {
-            future.completeExceptionally(e);
-          }
-        }
-      }
-    });
-    return future;
-  }
-
-  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
-      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
-      Converter<RESP, PRESP> respConverter) {
-
-    CompletableFuture<RESP> future = new CompletableFuture<>();
-    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
-      @Override
-      public void run(PRESP resp) {
-        if (controller.failed()) {
-          future.completeExceptionally(new IOException(controller.errorText()));
-        } else {
-          try {
-            future.complete(respConverter.convert(resp));
-          } catch (IOException e) {
-            future.completeExceptionally(e);
-          }
-        }
+  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+    CompletableFuture<T> asyncFuture = new CompletableFuture<>();
+    future.whenCompleteAsync((r, e) -> {
+      if (e != null) {
+        asyncFuture.completeExceptionally(e);
+      } else {
+        asyncFuture.complete(r);
       }
-    });
-    return future;
-  }
-
-  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
-      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
-      ProcedureBiConsumer consumer) {
-    CompletableFuture<Long> procFuture = this
-        .<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
-            respConverter)).call();
-    return waitProcedureResult(procFuture).whenComplete(consumer);
-  }
-
-  @FunctionalInterface
-  private interface TableOperator {
-    CompletableFuture<Void> operate(TableName table);
-  }
-
-  private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
-      TableOperator operator, String operationType) {
-    CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
-    List<TableDescriptor> failed = new LinkedList<>();
-    listTables(Optional.ofNullable(pattern), false).whenComplete(
-      (tables, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        CompletableFuture[] futures =
-            tables.stream()
-                .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
-                  if (ex != null) {
-                    LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
-                    failed.add(table);
-                  }
-                })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
-        CompletableFuture.allOf(futures).thenAccept((v) -> {
-          future.complete(failed);
-        });
-      });
-    return future;
-  }
-
-  @Override
-  public AsyncConnectionImpl getConnection() {
-    return this.connection;
+    }, pool);
+    return asyncFuture;
   }
 
   @Override
   public CompletableFuture<Boolean> tableExists(TableName tableName) {
-    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+    return wrap(rawAdmin.tableExists(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
       boolean includeSysTables) {
-    return this.<List<TableDescriptor>> newMasterCaller()
-        .action((controller, stub) -> this
-            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
-              controller, stub,
-              RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
-              (s, c, req, done) -> s.getTableDescriptors(c, req, done),
-              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
-        .call();
+    return wrap(rawAdmin.listTables(pattern, includeSysTables));
   }
 
   @Override
   public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
       boolean includeSysTables) {
-    return this.<List<TableName>> newMasterCaller()
-        .action((controller, stub) -> this
-            .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
-              RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
-              (s, c, req, done) -> s.getTableNames(c, req, done),
-              (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
-        .call();
+    return wrap(rawAdmin.listTableNames(pattern, includeSysTables));
   }
 
   @Override
   public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
-    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
-    this.<List<TableSchema>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
-                controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
-                    c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
-                    .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
-          if (error != null) {
-            future.completeExceptionally(error);
-            return;
-          }
-          if (!tableSchemas.isEmpty()) {
-            future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
-          } else {
-            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
-          }
-        });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Void> createTable(TableDescriptor desc) {
-    return createTable(desc, null);
+    return wrap(rawAdmin.getTableDescriptor(tableName));
   }
 
   @Override
   public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
       int numRegions) {
-    try {
-      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
-    if (desc.getTableName() == null) {
-      return failedFuture(new IllegalArgumentException("TableName cannot be null"));
-    }
-    if (splitKeys != null && splitKeys.length > 0) {
-      Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
-      // Verify there are no duplicate split keys
-      byte[] lastKey = null;
-      for (byte[] splitKey : splitKeys) {
-        if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
-          return failedFuture(new IllegalArgumentException(
-              "Empty split key must not be passed in the split keys."));
-        }
-        if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
-          return failedFuture(new IllegalArgumentException("All split keys must be unique, "
-              + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", "
-              + Bytes.toStringBinary(lastKey)));
-        }
-        lastKey = splitKey;
-      }
-    }
+    return wrap(rawAdmin.createTable(desc, startKey, endKey, numRegions));
+  }
 
-    return this.<CreateTableRequest, CreateTableResponse> procedureCall(
-      RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
-      new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+  @Override
+  public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+    return wrap(rawAdmin.createTable(desc, splitKeys));
   }
 
   @Override
   public CompletableFuture<Void> deleteTable(TableName tableName) {
-    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
-        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
-      new DeleteTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.deleteTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+    return wrap(rawAdmin.deleteTables(pattern));
   }
 
   @Override
   public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
-    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
-      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
-      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.truncateTable(tableName, preserveSplits));
   }
 
   @Override
   public CompletableFuture<Void> enableTable(TableName tableName) {
-    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
-        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
-      new EnableTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.enableTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+    return wrap(rawAdmin.enableTables(pattern));
   }
 
   @Override
   public CompletableFuture<Void> disableTable(TableName tableName) {
-    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
-        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
-      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
-      new DisableTableProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.disableTable(tableName));
   }
 
   @Override
   public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
-    return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+    return wrap(rawAdmin.disableTables(pattern));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (state.isPresent()) {
-        future.complete(state.get().inStates(TableState.State.ENABLED));
-      } else {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      }
-    });
-    return future;
+    return wrap(rawAdmin.isTableEnabled(tableName));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (state.isPresent()) {
-        future.complete(state.get().inStates(TableState.State.DISABLED));
-      } else {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      }
-    });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
-    return isTableAvailable(tableName, null);
+    return wrap(rawAdmin.isTableDisabled(tableName));
   }
 
   @Override
   public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    isTableEnabled(tableName).whenComplete(
-      (enabled, error) -> {
-        if (error != null) {
-          future.completeExceptionally(error);
-          return;
-        }
-        if (!enabled) {
-          future.complete(false);
-        } else {
-          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
-              .whenComplete(
-                (locations, error1) -> {
-                  if (error1 != null) {
-                    future.completeExceptionally(error1);
-                    return;
-                  }
-                  int notDeployed = 0;
-                  int regionCount = 0;
-                  for (HRegionLocation location : locations) {
-                    HRegionInfo info = location.getRegionInfo();
-                    if (location.getServerName() == null) {
-                      if (LOG.isDebugEnabled()) {
-                        LOG.debug("Table " + tableName + " has not deployed region "
-                            + info.getEncodedName());
-                      }
-                      notDeployed++;
-                    } else if (splitKeys != null
-                        && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
-                      for (byte[] splitKey : splitKeys) {
-                        // Just check if the splitkey is available
-                        if (Bytes.equals(info.getStartKey(), splitKey)) {
-                          regionCount++;
-                          break;
-                        }
-                      }
-                    } else {
-                      // Always empty start row should be counted
-                      regionCount++;
-                    }
-                  }
-                  if (notDeployed > 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
-                    }
-                    future.complete(false);
-                  } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " expected to have "
-                          + (splitKeys.length + 1) + " regions, but only " + regionCount
-                          + " available");
-                    }
-                    future.complete(false);
-                  } else {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("Table " + tableName + " should be available");
-                    }
-                    future.complete(true);
-                  }
-                });
-        }
-      });
-    return future;
+    return wrap(rawAdmin.isTableAvailable(tableName, splitKeys));
   }
 
   @Override
   public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
-    return this
-        .<Pair<Integer, Integer>>newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
-                controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
-                    c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
-                    resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+    return wrap(rawAdmin.getAlterStatus(tableName));
   }
 
   @Override
-  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
-    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
-      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
-      new AddColumnFamilyProcedureBiConsumer(this, tableName));
+  public CompletableFuture<Void> addColumnFamily(TableName tableName,
+      ColumnFamilyDescriptor columnFamily) {
+    return wrap(rawAdmin.addColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
-    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
-      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
-      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.deleteColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
       ColumnFamilyDescriptor columnFamily) {
-    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
-      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
-        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
-      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+    return wrap(rawAdmin.modifyColumnFamily(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
-    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
-      RequestConverter.buildCreateNamespaceRequest(descriptor),
-      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+    return wrap(rawAdmin.createNamespace(descriptor));
   }
 
   @Override
   public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
-    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
-      RequestConverter.buildModifyNamespaceRequest(descriptor),
-      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+    return wrap(rawAdmin.modifyNamespace(descriptor));
   }
 
   @Override
   public CompletableFuture<Void> deleteNamespace(String name) {
-    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
-      RequestConverter.buildDeleteNamespaceRequest(name),
-      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
-      new DeleteNamespaceProcedureBiConsumer(this, name));
+    return wrap(rawAdmin.deleteNamespace(name));
   }
 
   @Override
   public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
-    return this
-        .<NamespaceDescriptor> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
-                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
-                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
-                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+    return wrap(rawAdmin.getNamespaceDescriptor(name));
   }
 
   @Override
   public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
-    return this
-        .<List<NamespaceDescriptor>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
-                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
-                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
-                    .toNamespaceDescriptorList(resp))).call();
+    return wrap(rawAdmin.listNamespaceDescriptors());
   }
 
   @Override
-  public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
-                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
-                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
-                (resp) -> resp.getPrevBalanceValue())).call();
+  public CompletableFuture<Boolean> setBalancerOn(boolean on) {
+    return wrap(rawAdmin.setBalancerOn(on));
   }
 
   @Override
   public CompletableFuture<Boolean> balance(boolean forcible) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
-            stub, RequestConverter.buildBalanceRequest(forcible),
-            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+    return wrap(rawAdmin.balance(forcible));
   }
 
   @Override
   public CompletableFuture<Boolean> isBalancerOn() {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
-            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
-            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
-        .call();
+    return wrap(rawAdmin.isBalancerOn());
   }
 
   @Override
   public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
-    CompletableFuture<Boolean> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete((location, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
-      if (server == null) {
-        future.completeExceptionally(new NotServingRegionException(regionName));
-      } else {
-        closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
-  }
-
-  private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
-    return this
-        .<Boolean> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
-            controller, stub,
-            ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
-            (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
-        .serverName(serverName).call();
+    return wrap(rawAdmin.closeRegion(regionName, serverName));
   }
 
   @Override
-  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
-    return this.<List<HRegionInfo>> newAdminCaller()
-        .action((controller, stub) -> this
-            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
-              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
-              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
-              resp -> ProtobufUtil.getRegionInfos(resp)))
-        .serverName(sn).call();
+  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
+    return wrap(rawAdmin.getOnlineRegions(serverName));
   }
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (!exists) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      } else {
-        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else if (!tableEnabled) {
-            future.completeExceptionally(new TableNotEnabledException(tableName));
-          } else {
-            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
-              new HashMap<>()).whenComplete((ret, err3) -> {
-                if (err3 != null) {
-                  future.completeExceptionally(err3);
-                } else {
-                  future.complete(ret);
-                }
-              });
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.flush(tableName));
   }
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-
-        HRegionInfo regionInfo = location.getRegionInfo();
-        this.<Void> newAdminCaller()
-            .serverName(serverName)
-            .action(
-              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-                resp -> null)).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.flushRegion(regionName));
   }
 
   @Override
   public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
-    return compact(tableName, columnFamily, false, CompactType.NORMAL);
+    return wrap(rawAdmin.compact(tableName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
-    return compactRegion(regionName, columnFamily, false);
+    return wrap(rawAdmin.compactRegion(regionName, columnFamily));
   }
 
   @Override
   public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
-    return compact(tableName, columnFamily, true, CompactType.NORMAL);
+    return wrap(rawAdmin.majorCompact(tableName, columnFamily));
   }
 
   @Override
-  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
-    return compactRegion(regionName, columnFamily, true);
+  public CompletableFuture<Void>
+      majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+    return wrap(rawAdmin.majorCompactRegion(regionName, columnFamily));
   }
 
   @Override
-  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
-    return compactRegionServer(sn, false);
+  public CompletableFuture<Void> compactRegionServer(ServerName serverName) {
+    return wrap(rawAdmin.compactRegionServer(serverName));
   }
 
   @Override
-  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
-    return compactRegionServer(sn, true);
-  }
-
-  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
-      if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
-      }
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
-    });
-    return future;
-  }
-
-  private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
-      boolean major) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
-  }
-
-  /**
-   * List all region locations for the specific table.
-   */
-  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
-    if (TableName.META_TABLE_NAME.equals(tableName)) {
-      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
-      // For meta table, we use zk to fetch all locations.
-      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
-      registry.getMetaRegionLocation().whenComplete(
-        (metaRegions, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-          } else if (metaRegions == null || metaRegions.isEmpty()
-              || metaRegions.getDefaultRegionLocation() == null) {
-            future.completeExceptionally(new IOException("meta region does not found"));
-          } else {
-            future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-          }
-          // close the registry.
-          IOUtils.closeQuietly(registry);
-        });
-      return future;
-    } else {
-      // For non-meta table, we fetch all locations by scanning hbase:meta table
-      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
-    }
-  }
-
-  /**
-   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
-   */
-  private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
-      final boolean major, CompactType compactType) {
-    if (CompactType.MOB.equals(compactType)) {
-      // TODO support MOB compact.
-      return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
-    }
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
-      for (HRegionLocation location : locations) {
-        if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
-        if (location.getServerName() == null) continue;
-        compactFutures
-            .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
-      }
-      // future complete unless all of the compact futures are completed.
-      CompletableFuture
-          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
-    });
-    return future;
-  }
-
-  /**
-   * Compact the region at specific region server.
-   */
-  private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
-      final boolean major, Optional<byte[]> columnFamily) {
-    return this
-        .<Void> newAdminCaller()
-        .serverName(sn)
-        .action(
-          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
-            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
-              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
-            resp -> null)).call();
-  }
-
-  private byte[] toEncodeRegionName(byte[] regionName) {
-    try {
-      return HRegionInfo.isEncodedRegionName(regionName) ? regionName
-          : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
-    } catch (IOException e) {
-      return regionName;
-    }
-  }
-
-  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
-      CompletableFuture<TableName> result) {
-    getRegionLocation(encodeRegionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          result.completeExceptionally(err);
-          return;
-        }
-        HRegionInfo regionInfo = location.getRegionInfo();
-        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-          result.completeExceptionally(new IllegalArgumentException(
-              "Can't invoke merge on non-default regions directly"));
-          return;
-        }
-        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
-          if (!tableName.get().equals(regionInfo.getTable())) {
-            // tables of this two region should be same.
-            result.completeExceptionally(new IllegalArgumentException(
-                "Cannot merge regions from two different tables " + tableName.get() + " and "
-                    + regionInfo.getTable()));
-          } else {
-            result.complete(tableName.get());
-          }
-        }
-      });
-  }
-
-  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
-      byte[] encodeRegionNameB) {
-    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
-    CompletableFuture<TableName> future = new CompletableFuture<>();
-
-    checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
-    checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
-    return future;
+  public CompletableFuture<Void> majorCompactRegionServer(ServerName serverName) {
+    return wrap(rawAdmin.majorCompactRegionServer(serverName));
   }
 
   @Override
   public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
       boolean forcible) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
-    final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
-
-    checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
-        .whenComplete((tableName, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-
-          MergeTableRegionsRequest request = null;
-          try {
-            request = RequestConverter.buildMergeTableRegionsRequest(
-              new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
-              ng.newNonce());
-          } catch (DeserializationException e) {
-            future.completeExceptionally(e);
-            return;
-          }
-
-          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
-            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
-            new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-
-        });
-    return future;
+    return wrap(rawAdmin.mergeRegions(nameOfRegionA, nameOfRegionB, forcible));
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exist, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-        return;
-      }
-      if (!exist) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-        return;
-      }
-      metaTable
-          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
-              .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
-              .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
-          .whenComplete((results, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (results != null && !results.isEmpty()) {
-              List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
-              for (Result r : results) {
-                if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
-                RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
-                if (rl != null) {
-                  for (HRegionLocation h : rl.getRegionLocations()) {
-                    if (h != null && h.getServerName() != null) {
-                      HRegionInfo hri = h.getRegionInfo();
-                      if (hri == null || hri.isSplitParent()
-                          || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
-                        continue;
-                      splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
-                    }
-                  }
-                }
-              }
-              CompletableFuture
-                  .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
-                  .whenComplete((ret, exception) -> {
-                    if (exception != null) {
-                      future.completeExceptionally(exception);
-                      return;
-                    }
-                    future.complete(ret);
-                  });
-            } else {
-              future.complete(null);
-            }
-          });
-    });
-    return future;
+    return wrap(rawAdmin.split(tableName));
   }
 
   @Override
   public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
-    CompletableFuture<Void> result = new CompletableFuture<>();
-    if (splitPoint == null) {
-      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
-    }
-    connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
-        .whenComplete((loc, err) -> {
-          if (err != null) {
-            result.completeExceptionally(err);
-          } else if (loc == null || loc.getRegionInfo() == null) {
-            result.completeExceptionally(new IllegalArgumentException(
-                "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
-          } else {
-            splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
-                .whenComplete((ret, err2) -> {
-                  if (err2 != null) {
-                    result.completeExceptionally(err2);
-                  } else {
-                    result.complete(ret);
-                  }
-
-                });
-          }
-        });
-    return result;
+    return wrap(rawAdmin.split(tableName, splitPoint));
   }
 
   @Override
   public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        HRegionInfo regionInfo = location.getRegionInfo();
-        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-          future.completeExceptionally(new IllegalArgumentException(
-              "Can't split replicas directly. "
-                  + "Replicas are auto-split when their primary is split."));
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
-        }
-        split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else {
-            future.complete(ret);
-          }
-        });
-      });
-    return future;
-  }
-
-  private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
-      Optional<byte[]> splitPoint) {
-    if (hri.getStartKey() != null && splitPoint.isPresent()
-        && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
-      return failedFuture(new IllegalArgumentException(
-          "should not give a splitkey which equals to startkey!"));
-    }
-    return this
-        .<Void> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
-            controller, stub,
-            ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
-            (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
-        .serverName(sn).call();
+    return wrap(rawAdmin.splitRegion(regionName, splitPoint));
   }
 
   @Override
   public CompletableFuture<Void> assign(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.assign(regionName));
   }
 
   @Override
   public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this
-                  .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
-                    RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
-                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
-            .whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.unassign(regionName, forcible));
   }
 
   @Override
   public CompletableFuture<Void> offline(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
-                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
-                resp -> null))).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.offline(regionName));
   }
 
   @Override
   public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionInfo(regionName).whenComplete(
-      (regionInfo, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        this.<Void> newMasterCaller()
-            .action(
-              (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
-                controller, stub, RequestConverter.buildMoveRegionRequest(
-                  regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
-                    .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else {
-                future.complete(ret);
-              }
-            });
-      });
-    return future;
+    return wrap(rawAdmin.move(regionName, destServerName));
   }
 
   @Override
   public CompletableFuture<Void> setQuota(QuotaSettings quota) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
-            stub, QuotaSettings.buildSetQuotaRequestProto(quota),
-            (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.setQuota(quota));
   }
 
   @Override
   public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
-    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
-    Scan scan = QuotaTableUtil.makeScan(filter);
-    this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
-        .scan(scan, new RawScanResultConsumer() {
-          List<QuotaSettings> settings = new ArrayList<>();
-
-          @Override
-          public void onNext(Result[] results, ScanController controller) {
-            for (Result result : results) {
-              try {
-                QuotaTableUtil.parseResultToCollection(result, settings);
-              } catch (IOException e) {
-                controller.terminate();
-                future.completeExceptionally(e);
-              }
-            }
-          }
-
-          @Override
-          public void onError(Throwable error) {
-            future.completeExceptionally(error);
-          }
-
-          @Override
-          public void onComplete() {
-            future.complete(settings);
-          }
-        });
-    return future;
-  }
-
-  public CompletableFuture<Void> addReplicationPeer(String peerId,
-      ReplicationPeerConfig peerConfig) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
-                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
-                    done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.getQuota(filter));
+  }
+
+  @Override
+  public CompletableFuture<Void>
+      addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
+    return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
   }
 
   @Override
   public CompletableFuture<Void> removeReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
-                stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
-                (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.removeReplicationPeer(peerId));
   }
 
   @Override
   public CompletableFuture<Void> enableReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
-                stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
-                (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+    return wrap(rawAdmin.enableReplicationPeer(peerId));
   }
 
   @Override
   public CompletableFuture<Void> disableReplicationPeer(String peerId) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
-                controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
-                    c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
-        .call();
+    return wrap(rawAdmin.disableReplicationPeer(peerId));
   }
 
+  @Override
   public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
-    return this
-        .<ReplicationPeerConfig> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
-                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
-                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
-                (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+    return wrap(rawAdmin.getReplicationPeerConfig(peerId));
   }
 
   @Override
   public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
       ReplicationPeerConfig peerConfig) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
-                controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
-                  peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
-                    resp) -> null)).call();
+    return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig));
   }
 
   @Override
-  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+  public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return failedFuture(new ReplicationException("tableCfs is null"));
-    }
-
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
-      if (!completeExceptionally(future, error)) {
-        ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
-        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
-          if (!completeExceptionally(future, error)) {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
   }
 
   @Override
-  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+  public CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (tableCfs == null) {
-      return failedFuture(new ReplicationException("tableCfs is null"));
-    }
-
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
-      if (!completeExceptionally(future, error)) {
-        try {
-          ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
-        } catch (ReplicationException e) {
-          future.completeExceptionally(e);
-          return;
-        }
-        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
-          if (!completeExceptionally(future, error)) {
-            future.complete(result);
-          }
-        });
-      }
-    });
-    return future;
+    return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs));
   }
 
   @Override
-  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
-    return this
-        .<List<ReplicationPeerDescription>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
-                controller,
-                stub,
-                RequestConverter.buildListReplicationPeersRequest(pattern),
-                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
-                (resp) -> resp.getPeerDescList().stream()
-                    .map(ReplicationSerDeHelper::toReplicationPeerDescription)
-                    .collect(Collectors.toList()))).call();
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
+      Optional<Pattern> pattern) {
+    return wrap(rawAdmin.listReplicationPeers(pattern));
   }
 
   @Override
   public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
-    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
-    listTables().whenComplete(
-      (tables, error) -> {
-        if (!completeExceptionally(future, error)) {
-          List<TableCFs> replicatedTableCFs = new ArrayList<>();
-          tables.forEach(table -> {
-            Map<String, Integer> cfs = new HashMap<>();
-            Stream.of(table.getColumnFamilies())
-                .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
-                .forEach(column -> {
-                  cfs.put(column.getNameAsString(), column.getScope());
-                });
-            if (!cfs.isEmpty()) {
-              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
-            }
-          });
-          future.complete(replicatedTableCFs);
-        }
-      });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
-    return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
-      SnapshotType type) {
-    return snapshot(new SnapshotDescription(snapshotName, tableName, type));
-  }
-
-  @Override
-  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
-    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
-        .createHBaseProtosSnapshotDesc(snapshotDesc);
-    try {
-      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    this.<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-            resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            long startTime = EnvironmentEdgeManager.currentTime();
-            long endTime = startTime + expectedTimeout;
-            long maxPauseTime = expectedTimeout / maxAttempts;
-
-            @Override
-            public void run(Timeout timeout) throws Exception {
-              if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
-                  if (err2 != null) {
-                    future.completeExceptionally(err2);
-                  } else if (done) {
-                    future.complete(null);
-                  } else {
-                    // retry again after pauseTime.
-                  long pauseTime = ConnectionUtils.getPauseTime(
-                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                  pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER
-                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
-                }
-              } );
-              } else {
-                future.completeExceptionally(new SnapshotCreationException("Snapshot '"
-                    + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
-                    + " ms", snapshotDesc));
-              }
-            }
-          };
-          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
-        });
-    return future;
+    return wrap(rawAdmin.listReplicatedTableCFs());
+  }
+
+  @Override
+  public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
+    return wrap(rawAdmin.snapshot(snapshot));
   }
 
   @Override
   public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
-    return this
-        .<Boolean> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
-            controller,
-            stub,
-            IsSnapshotDoneRequest.newBuilder()
-                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
-                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+    return wrap(rawAdmin.isSnapshotFinished(snapshot));
   }
 
   @Override
   public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
-    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
-      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
-      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
-    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+    return wrap(rawAdmin.restoreSnapshot(snapshotName));
   }
 
   @Override
   public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshots(Pattern.compile(snapshotName)).whenComplete(
-      (snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        TableName tableName = null;
-        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
-          for (SnapshotDescription snap : snapshotDescriptions) {
-            if (snap.getName().equals(snapshotName)) {
-              tableName = snap.getTableName();
-              break;
-            }
-          }
-        }
-        if (tableName == null) {
-          future.completeExceptionally(new RestoreSnapshotException(
-              "Unable to find the table name for snapshot=" + snapshotName));
-          return;
-        }
-        final TableName finalTableName = tableName;
-        tableExists(finalTableName)
-            .whenComplete((exists, err2) -> {
-              if (err2 != null) {
-                future.completeExceptionally(err2);
-              } else if (!exists) {
-                // if table does not exist, then just clone snapshot into new table.
-              completeConditionalOnFuture(future,
-                internalRestoreSnapshot(snapshotName, finalTableName));
-            } else {
-              isTableDisabled(finalTableName).whenComplete(
-                (disabled, err4) -> {
-                  if (err4 != null) {
-                    future.completeExceptionally(err4);
-                  } else if (!disabled) {
-                    future.completeExceptionally(new TableNotDisabledException(finalTableName));
-                  } else {
-                    completeConditionalOnFuture(future,
-                      restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
-                  }
-                });
-            }
-          } );
-      });
-    return future;
-  }
-
-  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
-      boolean takeFailSafeSnapshot) {
-    if (takeFailSafeSnapshot) {
-      CompletableFuture<Void> future = new CompletableFuture<>();
-      // Step.1 Take a snapshot of the current state
-      String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
-        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
-        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
-      final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
-          .replace("{snapshot.name}", snapshotName)
-          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
-          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
-      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-      snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else {
-          // Step.2 Restore snapshot
-        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
-          if (err2 != null) {
-            // Step.3.a Something went wrong during the restore and try to rollback.
-          internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
-            (void3, err3) -> {
-              if (err3 != null) {
-                future.completeExceptionally(err3);
-              } else {
-                String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
-                    + failSafeSnapshotSnapshotName + " succeeded.";
-                future.completeExceptionally(new RestoreSnapshotException(msg));
-              }
-            });
-        } else {
-          // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
-          LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
-          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
-            (ret3, err3) -> {
-              if (err3 != null) {
-                LOG.error(
-                  "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
-                future.completeExceptionally(err3);
-              } else {
-                future.complete(ret3);
-              }
-            });
-        }
-      } );
-      }
-    } );
-      return future;
-    } else {
-      return internalRestoreSnapshot(snapshotName, tableName);
-    }
-  }
-
-  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
-      CompletableFuture<T> parentFuture) {
-    parentFuture.whenComplete((res, err) -> {
-      if (err != null) {
-        dependentFuture.completeExceptionally(err);
-      } else {
-        dependentFuture.complete(res);
-      }
-    });
+    return wrap(rawAdmin.restoreSnapshot(snapshotName, takeFailSafeSnapshot));
   }
 
   @Override
   public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    tableExists(tableName).whenComplete((exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (exists) {
-        future.completeExceptionally(new TableExistsException(tableName));
-      } else {
-        completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
-      }
-    });
-    return future;
-  }
-
-  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
-    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
-        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
-    try {
-      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
-    } catch (IllegalArgumentException e) {
-      return failedFuture(e);
-    }
-    return waitProcedureResult(this
-        .<Long> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
-            controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
-                .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
-                done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
-  }
-
-  @Override
-  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
-    return this
-        .<List<SnapshotDescription>> newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(
-                controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
-                    done) -> s.getCompletedSnapshots(c, req, done), resp -> resp.getSnapshotsList()
-                    .stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList())))
-        .call();
-  }
-
-  @Override
-  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
-    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listSnapshots()
-        .whenComplete(
-          (snapshotDescList, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> pattern.matcher(snap.getName()).matches())
-                .collect(Collectors.toList()));
-          });
-    return future;
+    return wrap(rawAdmin.cloneSnapshot(snapshotName, tableName));
   }
 
   @Override
-  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
-      Pattern snapshotNamePattern) {
-    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
-    listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
-      (tableNames, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        if (tableNames == null || tableNames.size() <= 0) {
-          future.complete(Collections.emptyList());
-          return;
-        }
-        listSnapshots(snapshotNamePattern).whenComplete(
-          (snapshotDescList, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-              return;
-            }
-            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-              future.complete(Collections.emptyList());
-              return;
-            }
-            future.complete(snapshotDescList.stream()
-                .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
-                .collect(Collectors.toList()));
-          });
-      });
-    return future;
+  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+    return wrap(rawAdmin.listSnapshots(pattern));
   }
 
   @Override
-  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
-    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+      Pattern snapshotNamePattern) {
+    return wrap(rawAdmin.listTableSnapshots(tableNamePattern, snapshotNamePattern));
   }
 
   @Override
-  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
-    return deleteTableSnapshots(null, snapshotNamePattern);
+  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+    return wrap(rawAdmin.deleteSnapshot(snapshotName));
   }
 
   @Override
   public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
       Pattern snapshotNamePattern) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
-      ((snapshotDescriptions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
-          future.complete(null);
-          return;
-        }
-        List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
-        snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
-            .add(internalDeleteSnapshot(snapDesc)));
-        CompletableFuture.allOf(
-          deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
-            .thenAccept(v -> future.complete(v));
-      }));
-    return future;
-  }
-
-  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
-    return this
-        .<Void> newMasterCaller()
-        .action(
-          (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
-            controller,
-            stub,
-            DeleteSnapshotRequest.newBuilder()
-                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
-                req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+    return wrap(rawAdmin.deleteTableSnapshots(tableNamePattern, snapshotNamePattern));
   }
 
   @Override
   public CompletableFuture<Void> execProcedure(String signature, String instance,
       Map<String, String> props) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    ProcedureDescription procDesc =
-        ProtobufUtil.buildProcedureDescription(signature, instance, props);
-    this.<Long> newMasterCaller()
-        .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
-          controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
-          (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
-        .call().whenComplete((expectedTimeout, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          TimerTask pollingTask = new TimerTask() {
-            int tries = 0;
-            lon

<TRUNCATED>

[3/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
new file mode 100644
index 0000000..fcfdf93
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -0,0 +1,2278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.quotas.QuotaFilter;
+import org.apache.hadoop.hbase.quotas.QuotaSettings;
+import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The implementation of AsyncAdmin.
+ */
+@InterfaceAudience.Private
+public class RawAsyncHBaseAdmin implements AsyncAdmin {
+  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
+
+  private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
+
+  private final AsyncConnectionImpl connection;
+
+  private final RawAsyncTable metaTable;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  private final long pauseNs;
+
+  private final int maxAttempts;
+
+  private final int startLogErrorsCnt;
+
+  private final NonceGenerator ng;
+
+  RawAsyncHBaseAdmin(AsyncConnectionImpl connection) {
+    this.connection = connection;
+    this.metaTable = connection.getRawTable(META_TABLE_NAME);
+    this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
+    this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
+    this.pauseNs = connection.connConf.getPauseNs();
+    this.maxAttempts = connection.connConf.getMaxRetries();
+    this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+    this.ng = connection.getNonceGenerator();
+  }
+
+  /** Returns a caller builder for master RPCs, pre-configured with this admin's timeouts/retries. */
+  private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
+    return this.connection.callerFactory.<T> masterRequest()
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
+  }
+
+  /** Returns a caller builder for region-server admin RPCs, with the same timeout/retry setup. */
+  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
+    return this.connection.callerFactory.<T> adminRequest()
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt);
+  }
+
+  /** One async RPC against the master service stub. */
+  @FunctionalInterface
+  private interface MasterRpcCall<RESP, REQ> {
+    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
+        RpcCallback<RESP> done);
+  }
+
+  /** One async RPC against a region server's admin service stub. */
+  @FunctionalInterface
+  private interface AdminRpcCall<RESP, REQ> {
+    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
+        RpcCallback<RESP> done);
+  }
+
+  /** Converts a protobuf response into the value exposed to the caller. */
+  @FunctionalInterface
+  private interface Converter<D, S> {
+    D convert(S src) throws IOException;
+  }
+
+  /**
+   * Issues one master RPC and adapts its callback into a future: on controller failure the
+   * future fails with the controller's exception, otherwise the protobuf response is converted
+   * via {@code respConverter} (conversion errors also fail the future).
+   */
+  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
+      MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
+      Converter<RESP, PRESP> respConverter) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    rpcCall.call(stub, controller, preq, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+        return;
+      }
+      try {
+        future.complete(respConverter.convert(resp));
+      } catch (IOException e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Issues one region-server admin RPC and adapts its callback into a future, converting the
+   * protobuf response via {@code respConverter}.
+   * NOTE(review): on controller failure this wraps {@code errorText()} in a new IOException,
+   * whereas the master-side {@code call} propagates {@code getFailed()} — confirm whether the
+   * asymmetry is intentional.
+   */
+  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
+      AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
+      Converter<RESP, PRESP> respConverter) {
+
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    rpcCall.call(stub, controller, preq, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(new IOException(controller.errorText()));
+        return;
+      }
+      try {
+        future.complete(respConverter.convert(resp));
+      } catch (IOException e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Submits a procedure-backed request to the master and returns a future that completes when
+   * the procedure itself finishes. {@code respConverter} extracts the procedure id from the
+   * submit response; {@code consumer} observes the final outcome (used for logging).
+   */
+  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
+      MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+      ProcedureBiConsumer consumer) {
+    CompletableFuture<Long> procFuture = this
+        .<Long> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
+            respConverter)).call();
+    return waitProcedureResult(procFuture).whenComplete(consumer);
+  }
+
+  /** Operation applied to each matched table by {@link #batchTableOperations}. */
+  @FunctionalInterface
+  private interface TableOperator {
+    CompletableFuture<Void> operate(TableName table);
+  }
+
+  /**
+   * Applies {@code operator} to every user table matching {@code pattern} and completes the
+   * returned future with the descriptors of the tables for which the operation failed (an empty
+   * list means everything succeeded). Individual failures are logged, not propagated.
+   */
+  private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
+      TableOperator operator, String operationType) {
+    CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
+    // Completion callbacks may run on different threads, so guard the shared list.
+    List<TableDescriptor> failed = Collections.synchronizedList(new LinkedList<>());
+    listTables(Optional.ofNullable(pattern), false).whenComplete(
+      (tables, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+          return;
+        }
+        // Use handle() so each per-table future completes normally even on failure; with
+        // whenComplete() the failure propagated into allOf(), whose thenAccept() then never
+        // ran, leaving the returned future forever incomplete whenever one operation failed.
+        CompletableFuture<?>[] futures =
+            tables.stream()
+                .map((table) -> operator.operate(table.getTableName()).handle((v, ex) -> {
+                  if (ex != null) {
+                    LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
+                    failed.add(table);
+                  }
+                  return null;
+                })).toArray(CompletableFuture<?>[]::new);
+        CompletableFuture.allOf(futures).thenAccept((v) -> {
+          future.complete(failed);
+        });
+      });
+    return future;
+  }
+
+  /** Checks table existence by probing hbase:meta; does not contact the master. */
+  @Override
+  public CompletableFuture<Boolean> tableExists(TableName tableName) {
+    return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+  }
+
+  /**
+   * Lists table descriptors from the master, optionally filtered by {@code pattern} and
+   * optionally including system tables.
+   */
+  @Override
+  public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
+      boolean includeSysTables) {
+    return this.<List<TableDescriptor>> newMasterCaller()
+        .action((controller, stub) -> this
+            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
+              controller, stub,
+              RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
+              (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
+        .call();
+  }
+
+  /** Same as {@link #listTables} but fetches only the table names, which is cheaper. */
+  @Override
+  public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
+      boolean includeSysTables) {
+    return this.<List<TableName>> newMasterCaller()
+        .action((controller, stub) -> this
+            .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
+              RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
+              (s, c, req, done) -> s.getTableNames(c, req, done),
+              (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
+        .call();
+  }
+
+  /**
+   * Fetches the descriptor of a single table from the master. Fails the returned future with
+   * {@link TableNotFoundException} when the master returns no schema for the name.
+   */
+  @Override
+  public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
+    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
+    this.<List<TableSchema>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+                controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
+                    c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
+                    .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+            return;
+          }
+          // The request names exactly one table, so at most one schema comes back.
+          if (!tableSchemas.isEmpty()) {
+            future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
+          } else {
+            future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+          }
+        });
+    return future;
+  }
+
+  /**
+   * Creates a table pre-split into {@code numRegions} regions between {@code startKey} and
+   * {@code endKey}. Invalid arguments (as rejected by {@code getSplitKeys}) fail the returned
+   * future rather than throwing.
+   */
+  @Override
+  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
+      int numRegions) {
+    byte[][] splits;
+    try {
+      splits = getSplitKeys(startKey, endKey, numRegions);
+    } catch (IllegalArgumentException e) {
+      return failedFuture(e);
+    }
+    return createTable(desc, Optional.of(splits));
+  }
+
+  /**
+   * Creates the table described by {@code desc}, optionally pre-split at {@code splitKeys}.
+   * A missing table name or split keys rejected by {@code verifySplitKeys} fail the returned
+   * future with IllegalArgumentException instead of throwing synchronously.
+   */
+  @Override
+  public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+    if (desc.getTableName() == null) {
+      return failedFuture(new IllegalArgumentException("TableName cannot be null"));
+    }
+    try {
+      splitKeys.ifPresent(keys -> verifySplitKeys(keys));
+      return this.<CreateTableRequest, CreateTableResponse> procedureCall(RequestConverter
+          .buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()), (s, c, req,
+          done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
+        new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+    } catch (IllegalArgumentException e) {
+      return failedFuture(e);
+    }
+  }
+
+  /** Deletes the table via a master procedure; completes when the procedure finishes. */
+  @Override
+  public CompletableFuture<Void> deleteTable(TableName tableName) {
+    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
+        .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
+      new DeleteTableProcedureBiConsumer(this, tableName));
+  }
+
+  /** Deletes all tables matching {@code pattern}; the result lists tables that failed to delete. */
+  @Override
+  public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
+    return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+  }
+
+  /** Truncates the table, optionally keeping its current region split points. */
+  @Override
+  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
+    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
+      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
+        ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
+      (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+  }
+
+  /** Enables the table via a master procedure. */
+  @Override
+  public CompletableFuture<Void> enableTable(TableName tableName) {
+    return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
+        .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
+      new EnableTableProcedureBiConsumer(this, tableName));
+  }
+
+  /** Enables all tables matching {@code pattern}; the result lists tables that failed to enable. */
+  @Override
+  public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
+    return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+  }
+
+  /** Disables the table via a master procedure. */
+  @Override
+  public CompletableFuture<Void> disableTable(TableName tableName) {
+    return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
+        .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
+      new DisableTableProcedureBiConsumer(this, tableName));
+  }
+
+  /** Disables all tables matching {@code pattern}; the result lists tables that failed to disable. */
+  @Override
+  public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
+    return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
+    CompletableFuture<Boolean> future = new CompletableFuture<>();
+    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (state.isPresent()) {
+        future.complete(state.get().inStates(TableState.State.ENABLED));
+      } else {
+        future.completeExceptionally(new TableNotFoundException(tableName));
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
+    CompletableFuture<Boolean> future = new CompletableFuture<>();
+    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (state.isPresent()) {
+        future.complete(state.get().inStates(TableState.State.DISABLED));
+      } else {
+        future.completeExceptionally(new TableNotFoundException(tableName));
+      }
+    });
+    return future;
+  }
+
+  /** Availability check without split-key verification; see the two-argument overload. */
+  @Override
+  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
+    return isTableAvailable(tableName, null);
+  }
+
+  /**
+   * Checks whether the table is fully available: it must be enabled, every region must be
+   * deployed on a server, and — when {@code splitKeys} is non-null — the deployed regions'
+   * start keys must match exactly the expected split points (splitKeys.length + 1 regions).
+   * Completes with false (not an error) when any of these conditions fails.
+   */
+  @Override
+  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
+    CompletableFuture<Boolean> future = new CompletableFuture<>();
+    isTableEnabled(tableName).whenComplete(
+      (enabled, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+          return;
+        }
+        if (!enabled) {
+          future.complete(false);
+        } else {
+          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+              .whenComplete(
+                (locations, error1) -> {
+                  if (error1 != null) {
+                    future.completeExceptionally(error1);
+                    return;
+                  }
+                  int notDeployed = 0;
+                  int regionCount = 0;
+                  for (HRegionLocation location : locations) {
+                    HRegionInfo info = location.getRegionInfo();
+                    if (location.getServerName() == null) {
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug("Table " + tableName + " has not deployed region "
+                            + info.getEncodedName());
+                      }
+                      notDeployed++;
+                    } else if (splitKeys != null
+                        && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+                      for (byte[] splitKey : splitKeys) {
+                        // Just check if the splitkey is available
+                        if (Bytes.equals(info.getStartKey(), splitKey)) {
+                          regionCount++;
+                          break;
+                        }
+                      }
+                    } else {
+                      // Always empty start row should be counted
+                      regionCount++;
+                    }
+                  }
+                  if (notDeployed > 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
+                    }
+                    future.complete(false);
+                  } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("Table " + tableName + " expected to have "
+                          + (splitKeys.length + 1) + " regions, but only " + regionCount
+                          + " available");
+                    }
+                    future.complete(false);
+                  } else {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("Table " + tableName + " should be available");
+                    }
+                    future.complete(true);
+                  }
+                });
+        }
+      });
+    return future;
+  }
+
+  /** Returns (yet-to-update regions, total regions) for an in-flight schema alteration. */
+  @Override
+  public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
+    return this
+        .<Pair<Integer, Integer>>newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
+                controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
+                    c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
+                    resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+  }
+
+  /** Adds a column family to the table via a master procedure. */
+  @Override
+  public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
+    return this.<AddColumnRequest, AddColumnResponse> procedureCall(
+      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+        ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
+      new AddColumnFamilyProcedureBiConsumer(this, tableName));
+  }
+
+  /** Deletes the named column family from the table via a master procedure. */
+  @Override
+  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
+    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
+      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+        ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
+      (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+  }
+
+  /** Replaces an existing column family's settings via a master procedure. */
+  @Override
+  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
+      ColumnFamilyDescriptor columnFamily) {
+    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
+      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+        ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
+      (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+  }
+
+  /** Creates a namespace via a master procedure. */
+  @Override
+  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
+    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
+      RequestConverter.buildCreateNamespaceRequest(descriptor),
+      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
+      new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+  }
+
+  /** Modifies an existing namespace via a master procedure. */
+  @Override
+  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
+    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
+      RequestConverter.buildModifyNamespaceRequest(descriptor),
+      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
+      new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+  }
+
+  /** Deletes the named namespace via a master procedure. */
+  @Override
+  public CompletableFuture<Void> deleteNamespace(String name) {
+    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
+      RequestConverter.buildDeleteNamespaceRequest(name),
+      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
+      new DeleteNamespaceProcedureBiConsumer(this, name));
+  }
+
+  /** Fetches a single namespace descriptor from the master. */
+  @Override
+  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
+    return this
+        .<NamespaceDescriptor> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
+                controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
+                    req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
+                    .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+  }
+
+  /** Lists all namespace descriptors known to the master. */
+  @Override
+  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
+    return this
+        .<List<NamespaceDescriptor>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
+                controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
+                    done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
+                    .toNamespaceDescriptorList(resp))).call();
+  }
+
+  /** Turns the balancer on/off; completes with the balancer's previous enabled state. */
+  @Override
+  public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
+                stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
+                (s, c, req, done) -> s.setBalancerRunning(c, req, done),
+                (resp) -> resp.getPrevBalanceValue())).call();
+  }
+
+  /** Triggers one balancer run; completes with whether the balancer actually ran. */
+  @Override
+  public CompletableFuture<Boolean> balance(boolean forcible) {
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
+            stub, RequestConverter.buildBalanceRequest(forcible),
+            (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+  }
+
+  /** Queries whether the balancer is currently enabled. */
+  @Override
+  public CompletableFuture<Boolean> isBalancerOn() {
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
+            controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
+            (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
+        .call();
+  }
+
+  /**
+   * Closes the region, targeting the supplied server if present, otherwise the server currently
+   * hosting the region per hbase:meta. Fails with NotServingRegionException when no server can
+   * be determined.
+   */
+  @Override
+  public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
+    CompletableFuture<Boolean> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete((location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName target = serverName.orElseGet(() -> location.getServerName());
+      if (target == null) {
+        future.completeExceptionally(new NotServingRegionException(regionName));
+        return;
+      }
+      closeRegion(location.getRegionInfo(), target).whenComplete((closed, cause) -> {
+        if (cause != null) {
+          future.completeExceptionally(cause);
+        } else {
+          future.complete(closed);
+        }
+      });
+    });
+    return future;
+  }
+
+  /**
+   * Closes the region on the given region server via its admin service; completes with the
+   * server's "closed" flag from the response.
+   */
+  private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
+    return this
+        .<Boolean> newAdminCaller()
+        .action(
+          (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
+            controller, stub,
+            ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
+            // Use the lambda's own controller parameter (c) like every other action in this
+            // class, instead of capturing the enclosing 'controller'.
+            (s, c, req, done) -> s.closeRegion(c, req, done), resp -> resp.getClosed()))
+        .serverName(serverName).call();
+  }
+
+  /** Lists the regions currently online on the given region server (direct admin RPC). */
+  @Override
+  public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
+    return this.<List<HRegionInfo>> newAdminCaller()
+        .action((controller, stub) -> this
+            .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
+              controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
+              (s, c, req, done) -> s.getOnlineRegion(c, req, done),
+              resp -> ProtobufUtil.getRegionInfos(resp)))
+        .serverName(sn).call();
+  }
+
+  /**
+   * Flushes all regions of the table by running the master's flush-table procedure. Fails with
+   * TableNotFoundException / TableNotEnabledException when the table is missing or disabled.
+   */
+  @Override
+  public CompletableFuture<Void> flush(TableName tableName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete((exists, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else if (!exists) {
+        future.completeExceptionally(new TableNotFoundException(tableName));
+      } else {
+        isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else if (!tableEnabled) {
+            future.completeExceptionally(new TableNotEnabledException(tableName));
+          } else {
+            // Delegate the actual flush to the generic procedure mechanism.
+            execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+              new HashMap<>()).whenComplete((ret, err3) -> {
+                if (err3 != null) {
+                  future.completeExceptionally(err3);
+                } else {
+                  future.complete(ret);
+                }
+              });
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Flushes a single region by asking its hosting region server directly. Fails with
+   * NoServerForRegionException when hbase:meta has no server for the region.
+   */
+  @Override
+  public CompletableFuture<Void> flushRegion(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
+
+        HRegionInfo regionInfo = location.getRegionInfo();
+        this.<Void> newAdminCaller()
+            .serverName(serverName)
+            .action(
+              (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
+                controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
+                resp -> null)).call().whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /** Minor-compacts the table (optionally one column family); NORMAL compaction type. */
+  @Override
+  public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
+    return compact(tableName, columnFamily, false, CompactType.NORMAL);
+  }
+
+  /** Minor-compacts a single region (optionally one column family). */
+  @Override
+  public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+    return compactRegion(regionName, columnFamily, false);
+  }
+
+  /** Major-compacts the table (optionally one column family); NORMAL compaction type. */
+  @Override
+  public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
+    return compact(tableName, columnFamily, true, CompactType.NORMAL);
+  }
+
+  /** Major-compacts a single region (optionally one column family). */
+  @Override
+  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+    return compactRegion(regionName, columnFamily, true);
+  }
+
+  /** Minor-compacts every region currently hosted by the given server. */
+  @Override
+  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
+    return compactRegionServer(sn, false);
+  }
+
+  /** Major-compacts every region currently hosted by the given server. */
+  @Override
+  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
+    return compactRegionServer(sn, true);
+  }
+
+  /**
+   * Compacts every online region of the server; the returned future completes only after all
+   * per-region compaction requests have completed (and fails if any of them failed).
+   */
+  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+      if (hRegionInfos != null) {
+        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
+      }
+      CompletableFuture
+          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+          .whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+    });
+    return future;
+  }
+
+  /**
+   * Compacts a single region identified by name: resolves its location from hbase:meta and
+   * sends the compaction request to the hosting server. Fails with NoServerForRegionException
+   * when the region has no assigned server.
+   */
+  private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
+      boolean major) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
+        compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
+            .whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /**
+   * Lists all region locations for the specific table. For hbase:meta itself the location comes
+   * from ZooKeeper (via a short-lived registry, closed after use); for every other table the
+   * locations are obtained by scanning hbase:meta.
+   */
+  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
+      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+      // For meta table, we use zk to fetch all locations.
+      AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
+      registry.getMetaRegionLocation().whenComplete(
+        (metaRegions, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+          } else if (metaRegions == null || metaRegions.isEmpty()
+              || metaRegions.getDefaultRegionLocation() == null) {
+            // Fixed grammar of the error message ("does not found" -> "is not found").
+            future.completeExceptionally(new IOException("meta region is not found"));
+          } else {
+            future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+          }
+          // close the registry.
+          IOUtils.closeQuietly(registry);
+        });
+      return future;
+    } else {
+      // For non-meta table, we fetch all locations by scanning hbase:meta table
+      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
+    }
+  }
+
+  /**
+   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get().
+   * Sends one compaction request per deployed, online region; the returned future completes
+   * when all requests have completed. MOB compaction is not yet supported and fails the future.
+   */
+  private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
+      final boolean major, CompactType compactType) {
+    if (CompactType.MOB.equals(compactType)) {
+      // TODO support MOB compact.
+      return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+      for (HRegionLocation location : locations) {
+        // Skip offline regions and regions with no hosting server.
+        if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
+        if (location.getServerName() == null) continue;
+        compactFutures
+            .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
+      }
+      // future complete unless all of the compact futures are completed.
+      CompletableFuture
+          .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+          .whenComplete((ret, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+            } else {
+              future.complete(ret);
+            }
+          });
+    });
+    return future;
+  }
+
+  /**
+   * Compact the region at specific region server.
+   */
+  private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
+      final boolean major, Optional<byte[]> columnFamily) {
+    return this
+        .<Void> newAdminCaller()
+        .serverName(sn)
+        .action(
+          (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
+            controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
+              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
+            resp -> null)).call();
+  }
+
+  private byte[] toEncodeRegionName(byte[] regionName) {
+    try {
+      return HRegionInfo.isEncodedRegionName(regionName) ? regionName
+          : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
+    } catch (IOException e) {
+      return regionName;
+    }
+  }
+
+  /**
+   * Look up the region for {@code encodeRegionName} and record its table in {@code tableName}.
+   * <p>
+   * Invoked once per region of a merge request, sharing the same {@code tableName} reference
+   * and {@code result} future: the first lookup to finish only records the table via CAS; the
+   * second completes {@code result} if the tables match, or fails it otherwise. Any lookup
+   * error or non-default replica also fails {@code result}.
+   */
+  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
+      CompletableFuture<TableName> result) {
+    getRegionLocation(encodeRegionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          result.completeExceptionally(err);
+          return;
+        }
+        HRegionInfo regionInfo = location.getRegionInfo();
+        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+          result.completeExceptionally(new IllegalArgumentException(
+              "Can't invoke merge on non-default regions directly"));
+          return;
+        }
+        // First arrival records the table name only; the second arrival completes (or
+        // fails) the shared result future.
+        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+          if (!tableName.get().equals(regionInfo.getTable())) {
+            // tables of this two region should be same.
+            result.completeExceptionally(new IllegalArgumentException(
+                "Cannot merge regions from two different tables " + tableName.get() + " and "
+                    + regionInfo.getTable()));
+          } else {
+            result.complete(tableName.get());
+          }
+        }
+      });
+  }
+
+  /**
+   * Resolve the common table of two regions; fails when they belong to different tables.
+   */
+  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
+      byte[] encodeRegionNameB) {
+    CompletableFuture<TableName> result = new CompletableFuture<>();
+    AtomicReference<TableName> ref = new AtomicReference<>();
+    // Both lookups share one reference and one future; the future completes once both
+    // lookups have reported and agreed on the table.
+    checkAndGetTableName(encodeRegionNameA, ref, result);
+    checkAndGetTableName(encodeRegionNameB, ref, result);
+    return result;
+  }
+
+  /**
+   * Merge two regions.
+   * <p>
+   * Region names are normalized to their encoded form; both regions must belong to the same
+   * table and be default replicas. The merge is submitted as a master procedure.
+   */
+  @Override
+  public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
+      boolean forcible) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
+    final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
+
+    checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
+        .whenComplete((tableName, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+
+          MergeTableRegionsRequest request = null;
+          try {
+            request = RequestConverter.buildMergeTableRegionsRequest(
+              new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+              ng.newNonce());
+          } catch (DeserializationException e) {
+            future.completeExceptionally(e);
+            return;
+          }
+
+          this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
+            (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
+            new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+
+        });
+    return future;
+  }
+
+  /**
+   * Split all splittable regions of a table.
+   * <p>
+   * Scans hbase:meta for the table's regions and issues one split request per online,
+   * non-split-parent, default-replica region; the returned future completes when every
+   * per-region request has finished.
+   */
+  @Override
+  public CompletableFuture<Void> split(TableName tableName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete((exist, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (!exist) {
+        future.completeExceptionally(new TableNotFoundException(tableName));
+        return;
+      }
+      metaTable
+          .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
+              .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+              .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
+          .whenComplete((results, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+              return;
+            }
+            if (results != null && !results.isEmpty()) {
+              List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+              for (Result r : results) {
+                if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
+                RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+                if (rl != null) {
+                  for (HRegionLocation h : rl.getRegionLocations()) {
+                    if (h != null && h.getServerName() != null) {
+                      HRegionInfo hri = h.getRegionInfo();
+                      // Skip parents of an in-progress split and non-default replicas.
+                      if (hri == null || hri.isSplitParent()
+                          || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
+                        continue;
+                      splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
+                    }
+                  }
+                }
+              }
+              CompletableFuture
+                  .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
+                  .whenComplete((ret, exception) -> {
+                    if (exception != null) {
+                      future.completeExceptionally(exception);
+                      return;
+                    }
+                    future.complete(ret);
+                  });
+            } else {
+              future.complete(null);
+            }
+          });
+    });
+    return future;
+  }
+
+  /**
+   * Split the region containing {@code splitPoint} of the given table at that point.
+   * @param tableName table whose region should be split
+   * @param splitPoint row key determining both the target region and the split key;
+   *          must not be null
+   */
+  @Override
+  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
+    // Validate first: the original allocated the result future before this check and
+    // leaked it (unused) on the null-argument path.
+    if (splitPoint == null) {
+      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
+    }
+    CompletableFuture<Void> result = new CompletableFuture<>();
+    connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
+        .whenComplete((loc, err) -> {
+          if (err != null) {
+            result.completeExceptionally(err);
+          } else if (loc == null || loc.getRegionInfo() == null) {
+            result.completeExceptionally(new IllegalArgumentException(
+                "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+          } else {
+            splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
+                .whenComplete((ret, err2) -> {
+                  if (err2 != null) {
+                    result.completeExceptionally(err2);
+                  } else {
+                    result.complete(ret);
+                  }
+                });
+          }
+        });
+    return result;
+  }
+
+  /**
+   * Split the region identified by {@code regionName}, optionally at {@code splitPoint}.
+   * <p>
+   * Fails for replica regions (replicas are split automatically with their primary) and for
+   * regions with no hosting server.
+   */
+  @Override
+  public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        // Propagate lookup failures. Without this check a failed lookup left 'location'
+        // null, NPE'd inside the callback, and the returned future never completed.
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        HRegionInfo regionInfo = location.getRegionInfo();
+        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+          future.completeExceptionally(new IllegalArgumentException(
+              "Can't split replicas directly. "
+                  + "Replicas are auto-split when their primary is split."));
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
+        split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
+      });
+    return future;
+  }
+
+  /**
+   * Ask region server {@code sn} to split region {@code hri}, optionally at
+   * {@code splitPoint}. Rejects a split point equal to the region's start key.
+   */
+  private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
+      Optional<byte[]> splitPoint) {
+    if (hri.getStartKey() != null && splitPoint.isPresent()
+        && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
+      return failedFuture(new IllegalArgumentException(
+          "should not give a splitkey which equals to startkey!"));
+    }
+    return this
+        .<Void> newAdminCaller()
+        .action(
+          (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
+            controller, stub,
+            ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
+            // NOTE(review): passes the outer 'controller' rather than the callback's 'c';
+            // every other adminCall usage passes 'c' — confirm these are the same instance.
+            (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
+        .serverName(sn).call();
+  }
+
+  /**
+   * Ask the master to assign the region identified by {@code regionName} (full or encoded).
+   */
+  @Override
+  public CompletableFuture<Void> assign(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        this.<Void> newMasterCaller()
+            .action(
+              ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
+                resp -> null))).call().whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /**
+   * Ask the master to unassign the region identified by {@code regionName}; the
+   * {@code forcible} flag is forwarded unchanged in the request.
+   */
+  @Override
+  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        this.<Void> newMasterCaller()
+            .action(
+              ((controller, stub) -> this
+                  .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+                    RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
+            .whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /**
+   * Mark a region offline via the master's offlineRegion RPC.
+   */
+  @Override
+  public CompletableFuture<Void> offline(byte[] regionName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        this.<Void> newMasterCaller()
+            .action(
+              ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
+                resp -> null))).call().whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /**
+   * Move the region identified by {@code regionName}; the optional destination server is
+   * forwarded in the request (built from the region's encoded name).
+   */
+  @Override
+  public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        this.<Void> newMasterCaller()
+            .action(
+              (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildMoveRegionRequest(
+                  regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
+                    .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
+    return future;
+  }
+
+  /**
+   * Apply the given quota settings via the master's setQuota RPC.
+   */
+  @Override
+  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
+    return this.<Void> newMasterCaller()
+        .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(
+          controller, stub, QuotaSettings.buildSetQuotaRequestProto(quota),
+          (s, c, req, done) -> s.setQuota(c, req, done), resp -> null))
+        .call();
+  }
+
+  /**
+   * Fetch the quota settings matching {@code filter} by scanning the quota table.
+   */
+  @Override
+  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
+    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
+    Scan scan = QuotaTableUtil.makeScan(filter);
+    this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
+        .scan(scan, new RawScanResultConsumer() {
+          List<QuotaSettings> settings = new ArrayList<>();
+
+          @Override
+          public void onNext(Result[] results, ScanController controller) {
+            for (Result result : results) {
+              try {
+                QuotaTableUtil.parseResultToCollection(result, settings);
+              } catch (IOException e) {
+                controller.terminate();
+                future.completeExceptionally(e);
+                // Stop here: the scan is terminated and the future already failed;
+                // the original kept looping and could complete exceptionally repeatedly.
+                return;
+              }
+            }
+          }
+
+          @Override
+          public void onError(Throwable error) {
+            future.completeExceptionally(error);
+          }
+
+          @Override
+          public void onComplete() {
+            future.complete(settings);
+          }
+        });
+    return future;
+  }
+
+  /**
+   * Add a replication peer with the given id and configuration.
+   */
+  @Override
+  public CompletableFuture<Void> addReplicationPeer(String peerId,
+      ReplicationPeerConfig peerConfig) {
+    // @Override added: this implements the async-admin interface like its siblings
+    // (removeReplicationPeer etc.), which all carry the annotation.
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
+                RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
+                    done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  /**
+   * Remove the replication peer with the given id.
+   */
+  @Override
+  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
+                stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
+                (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  /**
+   * Enable the replication peer with the given id.
+   */
+  @Override
+  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
+                stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
+                (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+  }
+
+  /**
+   * Disable the replication peer with the given id.
+   */
+  @Override
+  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
+                controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
+                    c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
+        .call();
+  }
+
+  /**
+   * Fetch the stored configuration of the replication peer with the given id.
+   */
+  @Override
+  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
+    // @Override added: this implements the async-admin interface like its siblings,
+    // which all carry the annotation.
+    return this
+        .<ReplicationPeerConfig> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
+                controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
+                    s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
+                (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+  }
+
+  /**
+   * Update the configuration of the replication peer with the given id.
+   */
+  @Override
+  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
+      ReplicationPeerConfig peerConfig) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
+                controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
+                  peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
+                    resp) -> null)).call();
+  }
+
+  /**
+   * Append the given table / column-family entries to the peer's replication config,
+   * then push the updated config back to the master.
+   */
+  @Override
+  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return failedFuture(new ReplicationException("tableCfs is null"));
+    }
+
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+          // Must check the inner 'err' (the update's outcome); the original re-checked
+          // the outer 'error' (known null here), so a failed update hung the future.
+          if (!completeExceptionally(future, err)) {
+            future.complete(result);
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Remove the given table / column-family entries from the peer's replication config,
+   * then push the updated config back to the master.
+   */
+  @Override
+  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return failedFuture(new ReplicationException("tableCfs is null"));
+    }
+
+    CompletableFuture<Void> future = new CompletableFuture<Void>();
+    getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+      if (!completeExceptionally(future, error)) {
+        try {
+          ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+        } catch (ReplicationException e) {
+          future.completeExceptionally(e);
+          return;
+        }
+        updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+          // Must check the inner 'err' (the update's outcome); the original re-checked
+          // the outer 'error' (known null here), so a failed update hung the future.
+          if (!completeExceptionally(future, err)) {
+            future.complete(result);
+          }
+        });
+      }
+    });
+    return future;
+  }
+
+  /**
+   * List replication peer descriptions; the optional pattern is forwarded in the request.
+   */
+  @Override
+  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
+    return this
+        .<List<ReplicationPeerDescription>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
+                controller,
+                stub,
+                RequestConverter.buildListReplicationPeersRequest(pattern),
+                (s, c, req, done) -> s.listReplicationPeers(c, req, done),
+                (resp) -> resp.getPeerDescList().stream()
+                    .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+                    .collect(Collectors.toList()))).call();
+  }
+
+  /**
+   * From the table descriptors, collect per table the column families whose replication
+   * scope differs from {@code HConstants.REPLICATION_SCOPE_LOCAL}; tables with no such
+   * family are omitted.
+   */
+  @Override
+  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
+    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
+    listTables().whenComplete(
+      (tables, error) -> {
+        if (!completeExceptionally(future, error)) {
+          List<TableCFs> replicatedTableCFs = new ArrayList<>();
+          tables.forEach(table -> {
+            Map<String, Integer> cfs = new HashMap<>();
+            Stream.of(table.getColumnFamilies())
+                .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+                .forEach(column -> {
+                  cfs.put(column.getNameAsString(), column.getScope());
+                });
+            if (!cfs.isEmpty()) {
+              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+            }
+          });
+          future.complete(replicatedTableCFs);
+        }
+      });
+    return future;
+  }
+
+  /**
+   * Take the snapshot described by {@code snapshotDesc}.
+   * <p>
+   * Issues the snapshot RPC, then polls {@link #isSnapshotFinished} on the shared retry
+   * timer with capped exponential back-off until the snapshot completes or the
+   * server-provided expected timeout elapses, in which case the future fails with
+   * {@code SnapshotCreationException}.
+   */
+  @Override
+  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
+    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
+        .createHBaseProtosSnapshotDesc(snapshotDesc);
+    try {
+      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+    } catch (IllegalArgumentException e) {
+      return failedFuture(e);
+    }
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
+    this.<Long> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+            resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          // Poll for completion on the shared retry timer until endTime is reached.
+          TimerTask pollingTask = new TimerTask() {
+            int tries = 0;
+            long startTime = EnvironmentEdgeManager.currentTime();
+            long endTime = startTime + expectedTimeout;
+            long maxPauseTime = expectedTimeout / maxAttempts;
+
+            @Override
+            public void run(Timeout timeout) throws Exception {
+              if (EnvironmentEdgeManager.currentTime() < endTime) {
+                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
+                  if (err2 != null) {
+                    future.completeExceptionally(err2);
+                  } else if (done) {
+                    future.complete(null);
+                  } else {
+                    // retry again after pauseTime.
+                  long pauseTime = ConnectionUtils.getPauseTime(
+                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                  pauseTime = Math.min(pauseTime, maxPauseTime);
+                  AsyncConnectionImpl.RETRY_TIMER
+                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+                }
+              } );
+              } else {
+                future.completeExceptionally(new SnapshotCreationException("Snapshot '"
+                    + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
+                    + " ms", snapshotDesc));
+              }
+            }
+          };
+          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+        });
+    return future;
+  }
+
+  /**
+   * Ask the master whether the given snapshot has completed.
+   */
+  @Override
+  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
+            controller,
+            stub,
+            IsSnapshotDoneRequest.newBuilder()
+                .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
+                req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+  }
+
+  /**
+   * Restore the named snapshot; whether a fail-safe snapshot is taken first is driven by
+   * configuration.
+   */
+  @Override
+  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
+    final boolean failSafe =
+        this.connection.getConfiguration().getBoolean(
+          HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
+          HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
+    return restoreSnapshot(snapshotName, failSafe);
+  }
+
+  /**
+   * Restore the named snapshot.
+   * <p>
+   * Resolves the snapshot's table from the snapshot list. If the table does not exist, the
+   * snapshot is simply cloned into a new table; otherwise the table must be disabled before
+   * the restore proceeds (optionally protected by a fail-safe snapshot).
+   */
+  @Override
+  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    listSnapshots(Optional.of(Pattern.compile(snapshotName))).whenComplete(
+      (snapshotDescriptions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TableName tableName = null;
+        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+          for (SnapshotDescription snap : snapshotDescriptions) {
+            if (snap.getName().equals(snapshotName)) {
+              tableName = snap.getTableName();
+              break;
+            }
+          }
+        }
+        if (tableName == null) {
+          future.completeExceptionally(new RestoreSnapshotException(
+              "Unable to find the table name for snapshot=" + snapshotName));
+          return;
+        }
+        final TableName finalTableName = tableName;
+        tableExists(finalTableName)
+            .whenComplete((exists, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else if (!exists) {
+                // if table does not exist, then just clone snapshot into new table.
+              completeConditionalOnFuture(future,
+                internalRestoreSnapshot(snapshotName, finalTableName));
+            } else {
+              isTableDisabled(finalTableName).whenComplete(
+                (disabled, err4) -> {
+                  if (err4 != null) {
+                    future.completeExceptionally(err4);
+                  } else if (!disabled) {
+                    future.completeExceptionally(new TableNotDisabledException(finalTableName));
+                  } else {
+                    completeConditionalOnFuture(future,
+                      restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
+                  }
+                });
+            }
+          } );
+      });
+    return future;
+  }
+
+  /**
+   * Restore {@code snapshotName} onto the existing {@code tableName}.
+   * <p>
+   * When {@code takeFailSafeSnapshot} is set: (1) snapshot the current table state,
+   * (2) restore; (3a) on failure roll back to the fail-safe snapshot, (3b) on success
+   * delete the fail-safe snapshot.
+   */
+  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
+      boolean takeFailSafeSnapshot) {
+    if (takeFailSafeSnapshot) {
+      CompletableFuture<Void> future = new CompletableFuture<>();
+      // Step.1 Take a snapshot of the current state
+      String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
+        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+      final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
+          .replace("{snapshot.name}", snapshotName)
+          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
+          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
+      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+      snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+        } else {
+          // Step.2 Restore snapshot
+        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
+          if (err2 != null) {
+            // Step.3.a Something went wrong during the restore and try to rollback.
+          internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
+            (void3, err3) -> {
+              if (err3 != null) {
+                future.completeExceptionally(err3);
+              } else {
+                String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
+                    + failSafeSnapshotSnapshotName + " succeeded.";
+                future.completeExceptionally(new RestoreSnapshotException(msg));
+              }
+            });
+        } else {
+          // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+          LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
+            (ret3, err3) -> {
+              if (err3 != null) {
+                LOG.error(
+                  "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
+                future.completeExceptionally(err3);
+              } else {
+                future.complete(ret3);
+              }
+            });
+        }
+      } );
+      }
+    } );
+      return future;
+    } else {
+      return internalRestoreSnapshot(snapshotName, tableName);
+    }
+  }
+
+  /**
+   * Forward the outcome of {@code parentFuture} — value or failure — to
+   * {@code dependentFuture}.
+   */
+  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
+      CompletableFuture<T> parentFuture) {
+    parentFuture.whenComplete((value, cause) -> {
+      if (cause == null) {
+        dependentFuture.complete(value);
+      } else {
+        dependentFuture.completeExceptionally(cause);
+      }
+    });
+  }
+
+  /**
+   * Create a new table from the given snapshot; fails with {@code TableExistsException}
+   * when the target table already exists.
+   */
+  @Override
+  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    tableExists(tableName).whenComplete((exists, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else if (exists) {
+        future.completeExceptionally(new TableExistsException(tableName));
+      } else {
+        completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Submit the restore-snapshot procedure to the master and wait on its result.
+   */
+  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
+    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
+        .setName(snapshotName).setTable(tableName.getNameAsString()).build();
+    try {
+      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+    } catch (IllegalArgumentException e) {
+      return failedFuture(e);
+    }
+    return waitProcedureResult(this
+        .<Long> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
+            controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
+                .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
+                done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
+  }
+
+  /**
+   * List completed snapshots, filtered client-side by the optional name pattern.
+   */
+  @Override
+  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+    this.<GetCompletedSnapshotsResponse> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, GetCompletedSnapshotsResponse> call(
+                controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
+                    done) -> s.getCompletedSnapshots(c, req, done), resp -> resp))
+        .call()
+        .whenComplete(
+          (resp, err) -> {
+            if (err != null) {
+              future.completeExceptionally(err);
+              return;
+            }
+            // An absent pattern matches everything.
+            future.complete(resp
+                .getSnapshotsList()
+                .stream()
+                .map(ProtobufUtil::createSnapshotDesc)
+                .filter(
+                  snap -> pattern.isPresent() ? pattern.get().matcher(snap.getName()).matches()
+                      : true).collect(Collectors.toList()));
+          });
+    return future;
+  }
+
+  /**
+   * List snapshots whose table matches {@code tableNamePattern} and whose name matches
+   * {@code snapshotNamePattern}; either pattern may be null to skip that filter.
+   */
+  @Override
+  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+      Pattern snapshotNamePattern) {
+    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+    listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
+      (tableNames, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (tableNames == null || tableNames.size() <= 0) {
+          future.complete(Collections.emptyList());
+          return;
+        }
+        listSnapshots(Optional.ofNullable(snapshotNamePattern)).whenComplete(
+          (snapshotDescList, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+              return;
+            }
+            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+              future.complete(Collections.emptyList());
+              return;
+            }
+            future.complete(snapshotDescList.stream()
+                .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+                .collect(Collectors.toList()));
+          });
+      });
+    return future;
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
+    // A null table-name pattern matches all tables, so this deletes every snapshot
+    // whose name matches snapshotNamePattern regardless of which table it belongs to.
+    return deleteTableSnapshots(null, snapshotNamePattern);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
+      Pattern snapshotNamePattern) {
+    // List the snapshots matching both patterns, then delete them all asynchronously.
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
+      ((snapshotDescriptions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
+          future.complete(null);
+          return;
+        }
+        List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
+        snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
+            .add(internalDeleteSnapshot(snapDesc)));
+        // Use whenComplete rather than thenAccept so a failed delete is propagated to
+        // the caller; thenAccept silently dropped the exception and left the returned
+        // future uncompleted forever.
+        CompletableFuture.allOf(
+          deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
+            .whenComplete((v, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(v);
+              }
+            });
+      }));
+    return future;
+  }
+
+  /**
+   * Send a DeleteSnapshot request to the master for the given snapshot.
+   */
+  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
+    DeleteSnapshotRequest request = DeleteSnapshotRequest.newBuilder()
+        .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build();
+    return this.<Void> newMasterCaller()
+        .action((controller, stub) -> this
+            .<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(controller, stub, request,
+              (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
+        .call();
+  }
+
+  /**
+   * Execute a distributed procedure on the master, then poll its completion state until it
+   * finishes or the master-provided expected timeout (milliseconds) elapses.
+   */
+  @Override
+  public CompletableFuture<Void> execProcedure(String signature, String instance,
+      Map<String, String> props) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    ProcedureDescription procDesc =
+        ProtobufUtil.buildProcedureDescription(signature, instance, props);
+    this.<Long> newMasterCaller()
+        .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+          controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+          (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+        .call().whenComplete((expectedTimeout, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+            return;
+          }
+          TimerTask pollingTask = new TimerTask() {
+            int tries = 0;
+            long startTime = EnvironmentEdgeManager.currentTime();
+            long endTime = startTime + expectedTimeout;
+            long maxPauseTime = expectedTimeout / maxAttempts;
+
+            @Override
+            public void run(Timeout timeout) throws Exception {
+              if (EnvironmentEdgeManager.currentTime() < endTime) {
+                isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
+                  if (err2 != null) {
+                    future.completeExceptionally(err2);
+                    return;
+                  }
+                  if (done) {
+                    future.complete(null);
+                  } else {
+                    // retry again after pauseTime.
+                    long pauseTime = ConnectionUtils
+                        .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                    pauseTime = Math.min(pauseTime, maxPauseTime);
+                    // pauseTime is in milliseconds, so schedule with MILLISECONDS; the
+                    // previous MICROSECONDS unit made every retry delay 1000x too short.
+                    AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+                      TimeUnit.MILLISECONDS);
+                  }
+                });
+              } else {
+                future.completeExceptionally(new IOException("Procedure '" + signature + " : "
+                    + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
+              }
+            }
+          };
+          // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+          AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+        });
+    return future;
+  }
+
+  /**
+   * Execute a distributed procedure on the master and return its result payload, or null when
+   * the response carries no return data.
+   */
+  @Override
+  public CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance,
+      Map<String, String> props) {
+    ProcedureDescription proDesc =
+        ProtobufUtil.buildProcedureDescription(signature, instance, props);
+    ExecProcedureRequest request =
+        ExecProcedureRequest.newBuilder().setProcedure(proDesc).build();
+    return this.<byte[]> newMasterCaller()
+        .action((controller, stub) -> this
+            .<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(controller, stub, request,
+              (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
+              resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
+        .call();
+  }
+
+  /**
+   * Check whether a procedure started via execProcedure has completed.
+   */
+  @Override
+  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
+      Map<String, String> props) {
+    ProcedureDescription proDesc =
+        ProtobufUtil.buildProcedureDescription(signature, instance, props);
+    IsProcedureDoneRequest request =
+        IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build();
+    return this.<Boolean> newMasterCaller()
+        .action((controller, stub) -> this
+            .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
+              request, (s, c, req, done) -> s.isProcedureDone(c, req, done),
+              resp -> resp.getDone()))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
+    return this.<Boolean> newMasterCaller().action(
+      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
+        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
+        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
+        .call();
+  }
+
+  /**
+   * List the procedures currently known to the master.
+   */
+  @Override
+  public CompletableFuture<List<ProcedureInfo>> listProcedures() {
+    return this.<List<ProcedureInfo>> newMasterCaller()
+        .action((controller, stub) -> this
+            .<ListProceduresRequest, ListProceduresResponse, List<ProcedureInfo>> call(controller,
+              stub, ListProceduresRequest.newBuilder().build(),
+              (s, c, req, done) -> s.listProcedures(c, req, done),
+              resp -> resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo)
+                  .collect(Collectors.toList())))
+        .call();
+  }
+
+  /**
+   * Get the region location for the passed region name. The region name may be a full region name
+   * or encoded region name. If the region is not found, it'll throw an
+   * UnknownRegionException wrapped by a {@link CompletableFuture}
+   * @param regionNameOrEncodedRegionName region name or encoded region name
+   * @return region location, wrapped by a {@link CompletableFuture}
+   */
+  @VisibleForTesting
+  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
+    if (regionNameOrEncodedRegionName == null) {
+      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+    }
+    try {
+      CompletableFuture<Optional<HRegionLocation>> future;
+      if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
+        future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
+          regionNameOrEncodedRegionName);
+      } else {
+        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
+      }
+
+      CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
+      future.whenComplete((location, err) -> {
+        if (err != null) {
+          returnedFuture.completeExceptionally(err);
+          return;
+        }
+        // Removed the leftover info-level debug logging ("location is ...") that fired on
+        // every lookup and added noise to the logs.
+        if (!location.isPresent() || location.get().getRegionInfo() == null) {
+          returnedFuture.completeExceptionally(new UnknownRegionException(
+              "Invalid region name or encoded region name: "
+                  + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+        } else {
+          returnedFuture.complete(location.get());
+        }
+      });
+      return returnedFuture;
+    } catch (IOException e) {
+      return failedFuture(e);
+    }
+  }
+
+  /**
+   * Get the region info for the passed region name. The region name may be a full region name or
+   * encoded region name. If the region is not found, it'll throw an UnknownRegionException
+   * wrapped by a {@link CompletableFuture}
+   * @param regionNameOrEncodedRegionName region name or encoded region name
+   * @return region info, wrapped by a {@link CompletableFuture}
+   */
+  private CompletableFuture<HRegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
+    if (regionNameOrEncodedRegionName == null) {
+      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+    }
+
+    if (Bytes.equals(regionNameOrEncodedRegionName,
+      HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionNameOrEncodedRegionName,
+          HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
+      return

<TRUNCATED>

[5/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

Posted by zg...@apache.org.
HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14f0423b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14f0423b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14f0423b

Branch: refs/heads/master
Commit: 14f0423b58256d872a1ddde1712a1fc60b4c5671
Parents: 8318a09
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Jul 4 09:51:41 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Jul 4 09:51:41 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/AsyncAdmin.java  |   67 +-
 .../hadoop/hbase/client/AsyncConnection.java    |   14 +-
 .../hbase/client/AsyncConnectionImpl.java       |    7 +-
 .../hadoop/hbase/client/AsyncHBaseAdmin.java    | 2085 +---------------
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2278 ++++++++++++++++++
 .../hbase/shaded/protobuf/RequestConverter.java |   21 +-
 .../hadoop/hbase/client/TestAsyncAdminBase.java |   80 +-
 .../hbase/client/TestAsyncBalancerAdminApi.java |    3 +
 .../client/TestAsyncNamespaceAdminApi.java      |    3 +
 .../client/TestAsyncProcedureAdminApi.java      |    4 +-
 .../hbase/client/TestAsyncQuotaAdminApi.java    |   37 +-
 .../hbase/client/TestAsyncRegionAdminApi.java   |  737 +++---
 .../client/TestAsyncReplicationAdminApi.java    |   30 +-
 .../hbase/client/TestAsyncSnapshotAdminApi.java |   24 +-
 .../hbase/client/TestAsyncTableAdminApi.java    |  818 +++----
 15 files changed, 3288 insertions(+), 2920 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 3b022f4..ff35d46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -47,11 +47,6 @@ import org.apache.hadoop.hbase.util.Pair;
 public interface AsyncAdmin {
 
   /**
-   * @return Async Connection used by this object.
-   */
-  AsyncConnectionImpl getConnection();
-
-  /**
    * @param tableName Table to check.
    * @return True if table exists already. The return value will be wrapped by a
    *         {@link CompletableFuture}.
@@ -105,7 +100,9 @@ public interface AsyncAdmin {
    * Creates a new table.
    * @param desc table descriptor for table
    */
-  CompletableFuture<Void> createTable(TableDescriptor desc);
+  default CompletableFuture<Void> createTable(TableDescriptor desc) {
+    return createTable(desc, Optional.empty());
+  }
 
   /**
    * Creates a new table with the specified number of regions. The start key specified will become
@@ -128,7 +125,7 @@ public interface AsyncAdmin {
    * @param desc table descriptor for table
    * @param splitKeys array of split keys for the initial regions of the table
    */
-  CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
+  CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys);
 
   /**
    * Deletes a table.
@@ -188,6 +185,13 @@ public interface AsyncAdmin {
 
   /**
    * @param tableName name of table to check
+   * @return true if table is on-line. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> isTableEnabled(TableName tableName);
+
+  /**
+   * @param tableName name of table to check
    * @return true if table is off-line. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
@@ -198,7 +202,9 @@ public interface AsyncAdmin {
    * @return true if all regions of the table are available. The return value will be wrapped by a
    *         {@link CompletableFuture}.
    */
-  CompletableFuture<Boolean> isTableAvailable(TableName tableName);
+  default CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
+    return isTableAvailable(tableName, null);
+  }
 
   /**
    * Use this api to check if the table has been created with the specified number of splitkeys
@@ -275,13 +281,6 @@ public interface AsyncAdmin {
   CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors();
 
   /**
-   * @param tableName name of table to check
-   * @return true if table is on-line. The return value will be wrapped by a
-   *         {@link CompletableFuture}.
-   */
-  CompletableFuture<Boolean> isTableEnabled(TableName tableName);
-
-  /**
    * Turn the load balancer on or off.
    * @param on
    * @return Previous balancer value wrapped by a {@link CompletableFuture}.
@@ -330,7 +329,7 @@ public interface AsyncAdmin {
   /**
    * Get all the online regions on a region server.
    */
-  CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn);
+  CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
 
   /**
    * Flush a table.
@@ -422,15 +421,15 @@ public interface AsyncAdmin {
 
   /**
    * Compact all regions on the region server.
-   * @param sn the region server name
+   * @param serverName the region server name
    */
-  CompletableFuture<Void> compactRegionServer(ServerName sn);
+  CompletableFuture<Void> compactRegionServer(ServerName serverName);
 
   /**
    * Compact all regions on the region server.
-   * @param sn the region server name
+   * @param serverName the region server name
    */
-  CompletableFuture<Void> majorCompactRegionServer(ServerName sn);
+  CompletableFuture<Void> majorCompactRegionServer(ServerName serverName);
 
   /**
    * Merge two regions.
@@ -563,18 +562,18 @@ public interface AsyncAdmin {
 
   /**
    * Append the replicable table-cf config of the specified peer
-   * @param id a short that identifies the cluster
+   * @param peerId a short that identifies the cluster
    * @param tableCfs A map from tableName to column family names
    */
-  CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+  CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs);
 
   /**
    * Remove some table-cfs from config of the specified peer
-   * @param id a short name that identifies the cluster
+   * @param peerId a short name that identifies the cluster
    * @param tableCfs A map from tableName to column family names
    */
-  CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+  CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
       Map<TableName, ? extends Collection<String>> tableCfs);
 
   /**
@@ -613,7 +612,9 @@ public interface AsyncAdmin {
    * @param snapshotName name of the snapshot to be created
    * @param tableName name of the table for which snapshot is created
    */
-  CompletableFuture<Void> snapshot(String snapshotName, TableName tableName);
+  default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
+    return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
+  }
 
   /**
    * Create typed snapshot of the table. Snapshots are considered unique based on <b>the name of the
@@ -627,8 +628,10 @@ public interface AsyncAdmin {
    * @param tableName name of the table to snapshot
    * @param type type of snapshot to take
    */
-  CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
-      SnapshotType type);
+  default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
+      SnapshotType type) {
+    return snapshot(new SnapshotDescription(snapshotName, tableName, type));
+  }
 
   /**
    * Take a snapshot and wait for the server to complete that snapshot asynchronously. Only a single
@@ -695,14 +698,16 @@ public interface AsyncAdmin {
    * @return a list of snapshot descriptors for completed snapshots wrapped by a
    *         {@link CompletableFuture}
    */
-  CompletableFuture<List<SnapshotDescription>> listSnapshots();
+  default CompletableFuture<List<SnapshotDescription>> listSnapshots() {
+    return listSnapshots(Optional.empty());
+  }
 
   /**
    * List all the completed snapshots matching the given pattern.
    * @param pattern The compiled regular expression to match against
    * @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture}
    */
-  CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern);
+  CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern);
 
   /**
    * List all the completed snapshots matching the given table name regular expression and snapshot
@@ -725,7 +730,9 @@ public interface AsyncAdmin {
    * Delete existing snapshots whose names match the pattern passed.
    * @param pattern pattern for names of the snapshot to match
    */
-  CompletableFuture<Void> deleteSnapshots(Pattern pattern);
+  default CompletableFuture<Void> deleteSnapshots(Pattern pattern) {
+    return deleteTableSnapshots(null, pattern);
+  }
 
   /**
    * Delete all existing snapshots matching the given table name regular expression and snapshot

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 65005fa..22ed064 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -96,11 +96,17 @@ public interface AsyncConnection extends Closeable {
   AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
 
   /**
-   * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin
-   * is not guaranteed to be thread-safe. A new instance should be created for each using thread.
-   * This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not
-   * recommended.
+   * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned
+   * {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so
+   * typically you should not do any time consuming work inside these methods.
    * @return an AsyncAdmin instance for cluster administration
    */
   AsyncAdmin getAdmin();
+
+  /**
+   * Retrieve an AsyncAdmin implementation to administer an HBase cluster.
+   * @param pool the thread pool to use for executing callback
+   * @return an AsyncAdmin instance for cluster administration
+   */
+  AsyncAdmin getAdmin(ExecutorService pool);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 776498a..c170bce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -279,6 +279,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncAdmin getAdmin() {
-    return new AsyncHBaseAdmin(this);
+    return new RawAsyncHBaseAdmin(this);
+  }
+
+  @Override
+  public AsyncAdmin getAdmin(ExecutorService pool) {
+    return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool);
   }
 }
\ No newline at end of file


[2/5] hbase git commit: HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 39ae6a5..dff9116 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -1271,19 +1273,26 @@ public final class RequestConverter {
       final byte [][] splitKeys,
       final long nonceGroup,
       final long nonce) {
+    return buildCreateTableRequest(hTableDesc, Optional.ofNullable(splitKeys), nonceGroup, nonce);
+  }
+
+  /**
+   * Creates a protocol buffer CreateTableRequest
+   * @param hTableDesc
+   * @param splitKeys
+   * @return a CreateTableRequest
+   */
+  public static CreateTableRequest buildCreateTableRequest(TableDescriptor hTableDesc,
+      Optional<byte[][]> splitKeys, long nonceGroup, long nonce) {
     CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
     builder.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDesc));
-    if (splitKeys != null) {
-      for (byte [] splitKey : splitKeys) {
-        builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(splitKey));
-      }
-    }
+    splitKeys.ifPresent(keys -> Arrays.stream(keys).forEach(
+      key -> builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(key))));
     builder.setNonceGroup(nonceGroup);
     builder.setNonce(nonce);
     return builder.build();
   }
 
-
   /**
    * Creates a protocol buffer ModifyTableRequest
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
index cdb5433..b182563 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
@@ -19,15 +19,32 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Class to test AsyncAdmin.
@@ -36,13 +53,34 @@ public abstract class TestAsyncAdminBase {
 
   protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class);
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  protected static byte[] FAMILY = Bytes.toBytes("testFamily");
+  protected static final byte[] FAMILY = Bytes.toBytes("testFamily");
   protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0");
   protected static final byte[] FAMILY_1 = Bytes.toBytes("cf1");
 
   protected static AsyncConnection ASYNC_CONN;
   protected AsyncAdmin admin;
 
+  @Parameter
+  public Supplier<AsyncAdmin> getAdmin;
+
+  private static AsyncAdmin getRawAsyncAdmin() {
+    return ASYNC_CONN.getAdmin();
+  }
+
+  private static AsyncAdmin getAsyncAdmin() {
+    return ASYNC_CONN.getAdmin(ForkJoinPool.commonPool());
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBase::getRawAsyncAdmin },
+      new Supplier<?>[] { TestAsyncAdminBase::getAsyncAdmin });
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+  protected TableName tableName;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@@ -60,7 +98,43 @@ public abstract class TestAsyncAdminBase {
   }
 
   @Before
-  public void setUp() throws Exception {
-    this.admin = ASYNC_CONN.getAdmin();
+  public void setUp() {
+    admin = ASYNC_CONN.getAdmin();
+    String methodName = testName.getMethodName();
+    tableName = TableName.valueOf(methodName.substring(0, methodName.length() - 3));
+  }
+
+  @After
+  public void tearDown() {
+    admin.listTableNames(Optional.of(Pattern.compile(tableName.getNameAsString() + ".*")), false)
+        .whenCompleteAsync((tables, err) -> {
+          if (tables != null) {
+            tables.forEach(table -> {
+              try {
+                admin.disableTable(table).join();
+              } catch (Exception e) {
+                LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
+              }
+              admin.deleteTable(table).join();
+            });
+          }
+        }, ForkJoinPool.commonPool()).join();
+  }
+
+  protected void createTableWithDefaultConf(TableName tableName) {
+    createTableWithDefaultConf(tableName, Optional.empty());
+  }
+
+  protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys) {
+    createTableWithDefaultConf(tableName, splitKeys, FAMILY);
+  }
+
+  protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys,
+      byte[]... families) {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    for (byte[] family : families) {
+      builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+    }
+    admin.createTable(builder.build(), splitKeys).join();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
index 00303e2..995e0aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
@@ -23,7 +23,10 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncBalancerAdminApi extends TestAsyncAdminBase {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
index eccff3f..dd3655e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
@@ -42,10 +42,13 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous namespace admin operations.
  */
+@RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncNamespaceAdminApi extends TestAsyncAdminBase {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
index 832bcbe..12c699b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
@@ -66,7 +66,7 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
 
   @Test
   public void testExecProcedure() throws Exception {
-    TableName tableName = TableName.valueOf("testExecProcedure");
+    String snapshotString = "offlineTableSnapshot";
     try {
       Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
       for (int i = 0; i < 100; i++) {
@@ -74,13 +74,13 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
         table.put(put);
       }
       // take a snapshot of the enabled table
-      String snapshotString = "offlineTableSnapshot";
       Map<String, String> props = new HashMap<>();
       props.put("table", tableName.getNameAsString());
       admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, snapshotString,
         props).get();
       LOG.debug("Snapshot completed.");
     } finally {
+      admin.deleteSnapshot(snapshotString).join();
       TEST_UTIL.deleteTable(tableName);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
index ac9bc16..c39a582 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.quotas.QuotaCache;
@@ -34,50 +30,35 @@ import org.apache.hadoop.hbase.quotas.ThrottleType;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+@RunWith(Parameterized.class)
 @Category({ ClientTests.class, MediumTests.class })
-public class TestAsyncQuotaAdminApi {
-  private static final Log LOG = LogFactory.getLog(TestAsyncQuotaAdminApi.class);
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  protected static AsyncConnection ASYNC_CONN;
-  protected AsyncAdmin admin;
+public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
-    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
-    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    IOUtils.closeQuietly(ASYNC_CONN);
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    this.admin = ASYNC_CONN.getAdmin();
-  }
-
   @Test
   public void testThrottleType() throws Exception {
     String userName = User.getCurrent().getShortName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index a3afabc..7c8b236 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -31,11 +32,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -51,36 +51,29 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous region admin operations.
  */
+@RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
 
   public static Random RANDOM = new Random(System.currentTimeMillis());
 
-  private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
-    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
-    HColumnDescriptor hcd = new HColumnDescriptor("value");
-    htd.addFamily(hcd);
-
-    admin.createTable(htd, null).get();
-  }
-
   @Test
   public void testCloseRegion() throws Exception {
-    TableName TABLENAME = TableName.valueOf("TestHBACloseRegion");
-    createTableWithDefaultConf(TABLENAME);
+    createTableWithDefaultConf(tableName);
 
     HRegionInfo info = null;
-    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME);
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
     List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
     for (HRegionInfo regionInfo : onlineRegions) {
       if (!regionInfo.getTable().isSystemTable()) {
@@ -102,16 +95,14 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
 
   @Test
   public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
-    final String name = "TestHBACloseRegion1";
-    byte[] TABLENAME = Bytes.toBytes(name);
-    createTableWithDefaultConf(TableName.valueOf(TABLENAME));
+    createTableWithDefaultConf(tableName);
 
     HRegionInfo info = null;
-    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
     List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
     for (HRegionInfo regionInfo : onlineRegions) {
       if (!regionInfo.isMetaTable()) {
-        if (regionInfo.getRegionNameAsString().contains(name)) {
+        if (regionInfo.getRegionNameAsString().contains(tableName.getNameAsString())) {
           info = regionInfo;
           boolean catchNotServingException = false;
           try {
@@ -132,10 +123,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
 
   @Test
   public void testCloseRegionWhenServerNameIsEmpty() throws Exception {
-    byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegionWhenServerNameIsEmpty");
-    createTableWithDefaultConf(TableName.valueOf(TABLENAME));
+    createTableWithDefaultConf(tableName);
 
-    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
     List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
     for (HRegionInfo regionInfo : onlineRegions) {
       if (!regionInfo.isMetaTable()) {
@@ -147,166 +137,61 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
   }
 
   @Test
-  public void testGetRegion() throws Exception {
-    AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin;
-
-    final TableName tableName = TableName.valueOf("testGetRegion");
-    LOG.info("Started " + tableName);
+  public void testGetRegionLocation() throws Exception {
+    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
     TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY);
-
-    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
-      HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm"));
-      HRegionInfo region = regionLocation.getRegionInfo();
-      byte[] regionName = region.getRegionName();
-      HRegionLocation location = rawAdmin.getRegionLocation(regionName).get();
-      assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
-      location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get();
-      assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
-    }
-  }
-
-  @Test
-  public void testMergeRegions() throws Exception {
-    final TableName tableName = TableName.valueOf("testMergeRegions");
-    HColumnDescriptor cd = new HColumnDescriptor("d");
-    HTableDescriptor td = new HTableDescriptor(tableName);
-    td.addFamily(cd);
-    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
-    Admin syncAdmin = TEST_UTIL.getAdmin();
-    try {
-      TEST_UTIL.createTable(td, splitRows);
-      TEST_UTIL.waitTableAvailable(tableName);
-
-      List<HRegionInfo> tableRegions;
-      HRegionInfo regionA;
-      HRegionInfo regionB;
-
-      // merge with full name
-      tableRegions = syncAdmin.getTableRegions(tableName);
-      assertEquals(3, syncAdmin.getTableRegions(tableName).size());
-      regionA = tableRegions.get(0);
-      regionB = tableRegions.get(1);
-      admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
-
-      assertEquals(2, syncAdmin.getTableRegions(tableName).size());
-
-      // merge with encoded name
-      tableRegions = syncAdmin.getTableRegions(tableName);
-      regionA = tableRegions.get(0);
-      regionB = tableRegions.get(1);
-      admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
-
-      assertEquals(1, syncAdmin.getTableRegions(tableName).size());
-    } finally {
-      syncAdmin.disableTable(tableName);
-      syncAdmin.deleteTable(tableName);
-    }
+    AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(tableName);
+    HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")).get();
+    HRegionInfo region = regionLocation.getRegionInfo();
+    byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+    HRegionLocation location = rawAdmin.getRegionLocation(regionName).get();
+    assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
+    location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get();
+    assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
   }
 
   @Test
-  public void testSplitTable() throws Exception {
-    splitTests(TableName.valueOf("testSplitTable"), 3000, false, null);
-    splitTests(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
-    splitTests(TableName.valueOf("testSplitRegion"), 3000, true, null);
-    splitTests(TableName.valueOf("testSplitRegionWithSplitPoint"), 3000, true, Bytes.toBytes("3"));
-  }
-
-  private void splitTests(TableName tableName, int rowCount, boolean isSplitRegion,
-      byte[] splitPoint) throws Exception {
-    int count = 0;
-    // create table
-    HColumnDescriptor cd = new HColumnDescriptor("d");
-    HTableDescriptor td = new HTableDescriptor(tableName);
-    td.addFamily(cd);
-    Table table = TEST_UTIL.createTable(td, null);
-    TEST_UTIL.waitTableAvailable(tableName);
-
-    List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tableName);
-    assertEquals(regions.size(), 1);
-
-    List<Put> puts = new ArrayList<>();
-    for (int i = 0; i < rowCount; i++) {
-      Put put = new Put(Bytes.toBytes(i));
-      put.addColumn(Bytes.toBytes("d"), null, Bytes.toBytes("value" + i));
-      puts.add(put);
-    }
-    table.put(puts);
-
-    if (isSplitRegion) {
-      admin.splitRegion(regions.get(0).getRegionName(), Optional.ofNullable(splitPoint)).get();
-    } else {
-      if (splitPoint == null) {
-        admin.split(tableName).get();
-      } else {
-        admin.split(tableName, splitPoint).get();
-      }
-    }
+  public void testAssignRegionAndUnassignRegion() throws Exception {
+    createTableWithDefaultConf(tableName);
 
-    for (int i = 0; i < 45; i++) {
-      try {
-        List<HRegionInfo> hRegionInfos = TEST_UTIL.getAdmin().getTableRegions(tableName);
-        count = hRegionInfos.size();
-        if (count >= 2) {
-          break;
-        }
-        Thread.sleep(1000L);
-      } catch (Exception e) {
-        LOG.error(e);
-      }
+    // assign region.
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    AssignmentManager am = master.getAssignmentManager();
+    HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
+
+    // assert region on server
+    RegionStates regionStates = am.getRegionStates();
+    ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+    TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
+    assertTrue(regionStates.getRegionState(hri).isOpened());
+
+    // Region is assigned now. Let's assign it again.
+    // Master should not abort, and region should stay assigned.
+    admin.assign(hri.getRegionName()).get();
+    try {
+      am.waitForAssignment(hri);
+      fail("Expected NoSuchProcedureException");
+    } catch (NoSuchProcedureException e) {
+      // Expected
     }
+    assertTrue(regionStates.getRegionState(hri).isOpened());
 
-    assertEquals(count, 2);
-  }
-
-  @Test
-  public void testAssignRegionAndUnassignRegion() throws Exception {
-    final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion");
+    // unassign region
+    admin.unassign(hri.getRegionName(), true).get();
     try {
-      // create test table
-      HTableDescriptor desc = new HTableDescriptor(tableName);
-      desc.addFamily(new HColumnDescriptor(FAMILY));
-      admin.createTable(desc).get();
-
-      // assign region.
-      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
-      AssignmentManager am = master.getAssignmentManager();
-      HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
-
-      // assert region on server
-      RegionStates regionStates = am.getRegionStates();
-      ServerName serverName = regionStates.getRegionServerOfRegion(hri);
-      TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
-      assertTrue(regionStates.getRegionState(hri).isOpened());
-
-      // Region is assigned now. Let's assign it again.
-      // Master should not abort, and region should stay assigned.
-      admin.assign(hri.getRegionName()).get();
-      try {
-        am.waitForAssignment(hri);
-        fail("Expected NoSuchProcedureException");
-      } catch (NoSuchProcedureException e) {
-        // Expected
-      }
-      assertTrue(regionStates.getRegionState(hri).isOpened());
-
-      // unassign region
-      admin.unassign(hri.getRegionName(), true).get();
-      try {
-        am.waitForAssignment(hri);
-        fail("Expected NoSuchProcedureException");
-      } catch (NoSuchProcedureException e) {
-        // Expected
-      }
-      assertTrue(regionStates.getRegionState(hri).isClosed());
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
+      am.waitForAssignment(hri);
+      fail("Expected NoSuchProcedureException");
+    } catch (NoSuchProcedureException e) {
+      // Expected
     }
+    assertTrue(regionStates.getRegionState(hri).isClosed());
   }
 
   HRegionInfo createTableAndGetOneRegion(final TableName tableName)
       throws IOException, InterruptedException, ExecutionException {
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(FAMILY));
+    TableDescriptor desc =
+        TableDescriptorBuilder.newBuilder(tableName)
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build();
     admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
 
     // wait till the table is assigned
@@ -333,280 +218,341 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
   // Will cause the Master to tell the regionserver to shut itself down because
   // regionserver is reporting the state as OPEN.
   public void testOfflineRegion() throws Exception {
-    final TableName tableName = TableName.valueOf("testOfflineRegion");
-    try {
-      HRegionInfo hri = createTableAndGetOneRegion(tableName);
+    HRegionInfo hri = createTableAndGetOneRegion(tableName);
 
-      RegionStates regionStates =
-          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
-      admin.offline(hri.getRegionName()).get();
+    RegionStates regionStates =
+        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+    admin.offline(hri.getRegionName()).get();
 
-      long timeoutTime = System.currentTimeMillis() + 3000;
-      while (true) {
-        if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
-            .contains(hri))
-          break;
-        long now = System.currentTimeMillis();
-        if (now > timeoutTime) {
-          fail("Failed to offline the region in time");
-          break;
-        }
-        Thread.sleep(10);
+    long timeoutTime = System.currentTimeMillis() + 3000;
+    while (true) {
+      if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
+          .contains(hri)) break;
+      long now = System.currentTimeMillis();
+      if (now > timeoutTime) {
+        fail("Failed to offline the region in time");
+        break;
       }
-      RegionState regionState = regionStates.getRegionState(hri);
-      assertTrue(regionState.isOffline());
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
+      Thread.sleep(10);
     }
+    RegionState regionState = regionStates.getRegionState(hri);
+    assertTrue(regionState.isOffline());
   }
 
   @Test
   public void testGetRegionByStateOfTable() throws Exception {
-    final TableName tableName = TableName.valueOf("testGetRegionByStateOfTable");
-    try {
-      HRegionInfo hri = createTableAndGetOneRegion(tableName);
-
-      RegionStates regionStates =
-          TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
-      assertTrue(regionStates.getRegionByStateOfTable(tableName)
-              .get(RegionState.State.OPEN)
-              .contains(hri));
-      assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
-              .get(RegionState.State.OPEN)
-              .contains(hri));
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
+    HRegionInfo hri = createTableAndGetOneRegion(tableName);
+
+    RegionStates regionStates =
+        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+    assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
+        .contains(hri));
+    assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
+        .get(RegionState.State.OPEN).contains(hri));
   }
 
   @Test
   public void testMoveRegion() throws Exception {
-    final TableName tableName = TableName.valueOf("testMoveRegion");
-    try {
-      HRegionInfo hri = createTableAndGetOneRegion(tableName);
-
-      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
-      RegionStates regionStates = master.getAssignmentManager().getRegionStates();
-      ServerName serverName = regionStates.getRegionServerOfRegion(hri);
-      ServerManager serverManager = master.getServerManager();
-      ServerName destServerName = null;
-      List<JVMClusterUtil.RegionServerThread> regionServers =
-          TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
-      for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
-        HRegionServer destServer = regionServer.getRegionServer();
-        destServerName = destServer.getServerName();
-        if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
-          break;
-        }
+    admin.setBalancerOn(false).join();
+
+    HRegionInfo hri = createTableAndGetOneRegion(tableName);
+    RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
+    ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    ServerManager serverManager = master.getServerManager();
+    ServerName destServerName = null;
+    List<JVMClusterUtil.RegionServerThread> regionServers =
+        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
+    for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
+      HRegionServer destServer = regionServer.getRegionServer();
+      destServerName = destServer.getServerName();
+      if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
+        break;
       }
-      assertTrue(destServerName != null && !destServerName.equals(serverName));
-      admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
-
-      long timeoutTime = System.currentTimeMillis() + 30000;
-      while (true) {
-        ServerName sn = regionStates.getRegionServerOfRegion(hri);
-        if (sn != null && sn.equals(destServerName)) {
-          TEST_UTIL.assertRegionOnServer(hri, sn, 200);
-          break;
-        }
-        long now = System.currentTimeMillis();
-        if (now > timeoutTime) {
-          fail("Failed to move the region in time: " + regionStates.getRegionState(hri));
-        }
-        regionStates.wait(50);
+    }
+
+    assertTrue(destServerName != null && !destServerName.equals(serverName));
+    admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
+
+    long timeoutTime = System.currentTimeMillis() + 30000;
+    while (true) {
+      ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
+      if (sn != null && sn.equals(destServerName)) {
+        break;
       }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
+      long now = System.currentTimeMillis();
+      if (now > timeoutTime) {
+        fail("Failed to move the region in time: " + hri);
+      }
+      Thread.sleep(100);
     }
+    admin.setBalancerOn(true).join();
   }
 
   @Test
   public void testGetOnlineRegions() throws Exception {
-    final TableName tableName = TableName.valueOf("testGetOnlineRegions");
-    try {
-      createTableAndGetOneRegion(tableName);
-      AtomicInteger regionServerCount = new AtomicInteger(0);
-      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
-          .map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName -> {
+    createTableAndGetOneRegion(tableName);
+    AtomicInteger regionServerCount = new AtomicInteger(0);
+    TEST_UTIL
+        .getHBaseCluster()
+        .getLiveRegionServerThreads()
+        .stream()
+        .map(rsThread -> rsThread.getRegionServer())
+        .forEach(
+          rs -> {
+            ServerName serverName = rs.getServerName();
             try {
-              Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(),
-                TEST_UTIL.getAdmin().getOnlineRegions(serverName).size());
+              Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(), rs
+                  .getOnlineRegions().size());
             } catch (Exception e) {
               fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
             }
             regionServerCount.incrementAndGet();
           });
-      Assert.assertEquals(regionServerCount.get(), 2);
-    } catch (Exception e) {
-      LOG.info("Exception", e);
-      throw e;
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
+    Assert.assertEquals(regionServerCount.get(), 2);
   }
 
   @Test
   public void testFlushTableAndRegion() throws Exception {
-    final TableName tableName = TableName.valueOf("testFlushRegion");
-    try {
-      HRegionInfo hri = createTableAndGetOneRegion(tableName);
-      ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
-          .getRegionStates().getRegionServerOfRegion(hri);
-      HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
-          .map(rsThread -> rsThread.getRegionServer())
-          .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
-      // write a put into the specific region
-      try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-        table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")));
-      }
-      Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
-      // flush region and wait flush operation finished.
-      LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
-      admin.flushRegion(hri.getRegionName()).get();
-      LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
-      Threads.sleepWithoutInterrupt(500);
-      while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
-        Threads.sleep(50);
-      }
-      // check the memstore.
-      Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+    HRegionInfo hri = createTableAndGetOneRegion(tableName);
+    ServerName serverName =
+        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+            .getRegionServerOfRegion(hri);
+    HRegionServer regionServer =
+        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+            .map(rsThread -> rsThread.getRegionServer())
+            .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
+
+    // write a put into the specific region
+    ASYNC_CONN.getRawTable(tableName)
+        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
+        .join();
+    Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
+    // flush region and wait flush operation finished.
+    LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
+    admin.flushRegion(hri.getRegionName()).get();
+    LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
+    Threads.sleepWithoutInterrupt(500);
+    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
+      Threads.sleep(50);
+    }
+    // check the memstore.
+    Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+
+    // write another put into the specific region
+    ASYNC_CONN.getRawTable(tableName)
+        .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
+        .join();
+    Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
+    admin.flush(tableName).get();
+    Threads.sleepWithoutInterrupt(500);
+    while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
+      Threads.sleep(50);
+    }
+    // check the memstore.
+    Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+  }
 
-      // write another put into the specific region
-      try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
-        table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")));
-      }
-      Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
-      admin.flush(tableName).get();
-      Threads.sleepWithoutInterrupt(500);
-      while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
-        Threads.sleep(50);
+  @Test
+  public void testMergeRegions() throws Exception {
+    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
+    createTableWithDefaultConf(tableName, Optional.of(splitRows));
+
+    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+    List<HRegionLocation> regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    HRegionInfo regionA;
+    HRegionInfo regionB;
+
+    // merge with full name
+    assertEquals(3, regionLocations.size());
+    regionA = regionLocations.get(0).getRegionInfo();
+    regionB = regionLocations.get(1).getRegionInfo();
+    admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
+
+    regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    assertEquals(2, regionLocations.size());
+    // merge with encoded name
+    regionA = regionLocations.get(0).getRegionInfo();
+    regionB = regionLocations.get(1).getRegionInfo();
+    admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
+
+    regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    assertEquals(1, regionLocations.size());
+  }
+
+  @Test
+  public void testSplitTable() throws Exception {
+    splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
+    splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
+    splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
+    splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+  }
+
+  private void
+      splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
+          throws Exception {
+    // create table
+    createTableWithDefaultConf(tableName);
+
+    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+    List<HRegionLocation> regionLocations =
+        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+    assertEquals(1, regionLocations.size());
+
+    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < rowCount; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(FAMILY, null, Bytes.toBytes("value" + i));
+      puts.add(put);
+    }
+    table.putAll(puts).join();
+
+    if (isSplitRegion) {
+      admin.splitRegion(regionLocations.get(0).getRegionInfo().getRegionName(),
+        Optional.ofNullable(splitPoint)).get();
+    } else {
+      if (splitPoint == null) {
+        admin.split(tableName).get();
+      } else {
+        admin.split(tableName, splitPoint).get();
       }
-      // check the memstore.
-      Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
     }
-  }
 
-  @Test(timeout = 600000)
-  public void testCompactRpcAPI() throws Exception {
-    String tableName = "testCompactRpcAPI";
-    compactionTest(tableName, 8, CompactionState.MAJOR, false);
-    compactionTest(tableName, 15, CompactionState.MINOR, false);
-    compactionTest(tableName, 8, CompactionState.MAJOR, true);
-    compactionTest(tableName, 15, CompactionState.MINOR, true);
+    int count = 0;
+    for (int i = 0; i < 45; i++) {
+      try {
+        regionLocations =
+            AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+                .get();
+        count = regionLocations.size();
+        if (count >= 2) {
+          break;
+        }
+        Thread.sleep(1000L);
+      } catch (Exception e) {
+        LOG.error(e);
+      }
+    }
+    assertEquals(count, 2);
   }
 
-  @Test(timeout = 600000)
+  @Test
   public void testCompactRegionServer() throws Exception {
-    TableName table = TableName.valueOf("testCompactRegionServer");
     byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
-    Table ht = null;
-    try {
-      ht = TEST_UTIL.createTable(table, families);
-      loadData(ht, families, 3000, 8);
-      List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
-          .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
-      List<Region> regions = new ArrayList<>();
-      rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table)));
-      Assert.assertEquals(regions.size(), 1);
-      int countBefore = countStoreFilesInFamilies(regions, families);
-      Assert.assertTrue(countBefore > 0);
-      // Minor compaction for all region servers.
-      for (HRegionServer rs : rsList)
-        admin.compactRegionServer(rs.getServerName()).get();
-      Thread.sleep(5000);
-      int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
-      Assert.assertTrue(countAfterMinorCompaction < countBefore);
-      // Major compaction for all region servers.
-      for (HRegionServer rs : rsList)
-        admin.majorCompactRegionServer(rs.getServerName()).get();
-      Thread.sleep(5000);
-      int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
-      Assert.assertEquals(countAfterMajorCompaction, 3);
-    } finally {
-      if (ht != null) {
-        TEST_UTIL.deleteTable(table);
-      }
-    }
+    createTableWithDefaultConf(tableName, Optional.empty(), families);
+    loadData(tableName, families, 3000, 8);
+
+    List<HRegionServer> rsList =
+        TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+            .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
+    List<Region> regions = new ArrayList<>();
+    rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(tableName)));
+    Assert.assertEquals(regions.size(), 1);
+    int countBefore = countStoreFilesInFamilies(regions, families);
+    Assert.assertTrue(countBefore > 0);
+
+    // Minor compaction for all region servers.
+    for (HRegionServer rs : rsList)
+      admin.compactRegionServer(rs.getServerName()).get();
+    Thread.sleep(5000);
+    int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
+    Assert.assertTrue(countAfterMinorCompaction < countBefore);
+
+    // Major compaction for all region servers.
+    for (HRegionServer rs : rsList)
+      admin.majorCompactRegionServer(rs.getServerName()).get();
+    Thread.sleep(5000);
+    int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
+    Assert.assertEquals(countAfterMajorCompaction, 3);
   }
 
-  private void compactionTest(final String tableName, final int flushes,
+  @Test
+  public void testCompact() throws Exception {
+    compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
+    compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
+    compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
+    compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
+  }
+
+  private void compactionTest(final TableName tableName, final int flushes,
       final CompactionState expectedState, boolean singleFamily) throws Exception {
     // Create a table with regions
-    final TableName table = TableName.valueOf(tableName);
     byte[] family = Bytes.toBytes("family");
     byte[][] families =
         { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
-    Table ht = null;
-    try {
-      ht = TEST_UTIL.createTable(table, families);
-      loadData(ht, families, 3000, flushes);
-      List<Region> regions = new ArrayList<>();
-      TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
-          .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table)));
-      Assert.assertEquals(regions.size(), 1);
-      int countBefore = countStoreFilesInFamilies(regions, families);
-      int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
-      assertTrue(countBefore > 0); // there should be some data files
-      if (expectedState == CompactionState.MINOR) {
-        if (singleFamily) {
-          admin.compact(table, Optional.of(family)).get();
-        } else {
-          admin.compact(table, Optional.empty()).get();
-        }
+    createTableWithDefaultConf(tableName, Optional.empty(), families);
+    loadData(tableName, families, 3000, flushes);
+
+    List<Region> regions = new ArrayList<>();
+    TEST_UTIL
+        .getHBaseCluster()
+        .getLiveRegionServerThreads()
+        .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(tableName)));
+    Assert.assertEquals(regions.size(), 1);
+
+    int countBefore = countStoreFilesInFamilies(regions, families);
+    int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
+    assertTrue(countBefore > 0); // there should be some data files
+    if (expectedState == CompactionState.MINOR) {
+      if (singleFamily) {
+        admin.compact(tableName, Optional.of(family)).get();
       } else {
-        if (singleFamily) {
-          admin.majorCompact(table, Optional.of(family)).get();
-        } else {
-          admin.majorCompact(table, Optional.empty()).get();
-        }
+        admin.compact(tableName, Optional.empty()).get();
       }
-      long curt = System.currentTimeMillis();
-      long waitTime = 5000;
-      long endt = curt + waitTime;
-      CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table);
-      while (state == CompactionState.NONE && curt < endt) {
-        Thread.sleep(10);
-        state = TEST_UTIL.getAdmin().getCompactionState(table);
-        curt = System.currentTimeMillis();
-      }
-      // Now, should have the right compaction state,
-      // otherwise, the compaction should have already been done
-      if (expectedState != state) {
-        for (Region region : regions) {
-          state = CompactionState.valueOf(region.getCompactionState().toString());
-          assertEquals(CompactionState.NONE, state);
-        }
+    } else {
+      if (singleFamily) {
+        admin.majorCompact(tableName, Optional.of(family)).get();
       } else {
-        // Wait until the compaction is done
-        state = TEST_UTIL.getAdmin().getCompactionState(table);
-        while (state != CompactionState.NONE && curt < endt) {
-          Thread.sleep(10);
-          state = TEST_UTIL.getAdmin().getCompactionState(table);
-        }
-        // Now, compaction should be done.
+        admin.majorCompact(tableName, Optional.empty()).get();
+      }
+    }
+
+    long curt = System.currentTimeMillis();
+    long waitTime = 5000;
+    long endt = curt + waitTime;
+    CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+    while (state == CompactionState.NONE && curt < endt) {
+      Thread.sleep(10);
+      state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+      curt = System.currentTimeMillis();
+    }
+    // Now, should have the right compaction state,
+    // otherwise, the compaction should have already been done
+    if (expectedState != state) {
+      for (Region region : regions) {
+        state = CompactionState.valueOf(region.getCompactionState().toString());
         assertEquals(CompactionState.NONE, state);
       }
-      int countAfter = countStoreFilesInFamilies(regions, families);
-      int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
-      assertTrue(countAfter < countBefore);
-      if (!singleFamily) {
-        if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
-        else assertTrue(families.length < countAfter);
-      } else {
-        int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
-        // assert only change was to single column family
-        assertTrue(singleFamDiff == (countBefore - countAfter));
-        if (expectedState == CompactionState.MAJOR) {
-          assertTrue(1 == countAfterSingleFamily);
-        } else {
-          assertTrue(1 < countAfterSingleFamily);
-        }
+    } else {
+      // Wait until the compaction is done
+      state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+      while (state != CompactionState.NONE && curt < endt) {
+        Thread.sleep(10);
+        state = TEST_UTIL.getAdmin().getCompactionState(tableName);
       }
-    } finally {
-      if (ht != null) {
-        TEST_UTIL.deleteTable(table);
+      // Now, compaction should be done.
+      assertEquals(CompactionState.NONE, state);
+    }
+
+    int countAfter = countStoreFilesInFamilies(regions, families);
+    int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
+    assertTrue(countAfter < countBefore);
+    if (!singleFamily) {
+      if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
+      else assertTrue(families.length < countAfter);
+    } else {
+      int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
+      // assert only change was to single column family
+      assertTrue(singleFamDiff == (countBefore - countAfter));
+      if (expectedState == CompactionState.MAJOR) {
+        assertTrue(1 == countAfterSingleFamily);
+      } else {
+        assertTrue(1 < countAfterSingleFamily);
       }
     }
   }
@@ -623,8 +569,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
     return count;
   }
 
-  private static void loadData(final Table ht, final byte[][] families, final int rows,
+  private static void loadData(final TableName tableName, final byte[][] families, final int rows,
       final int flushes) throws IOException {
+    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
     List<Put> puts = new ArrayList<>(rows);
     byte[] qualifier = Bytes.toBytes("val");
     for (int i = 0; i < flushes; i++) {
@@ -636,7 +583,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
         }
         puts.add(p);
       }
-      ht.put(puts);
+      table.putAll(puts).join();
       TEST_UTIL.flush();
       puts.clear();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 9c46be9..3e577bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -45,10 +45,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Class to test asynchronous replication admin operations.
  */
+@RunWith(Parameterized.class)
 @Category({LargeTests.class, ClientTests.class})
 public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
@@ -57,9 +60,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
   private final String ID_SECOND = "2";
   private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
 
-  @Rule
-  public TestName name = new TestName();
-
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@@ -142,12 +142,12 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
   public void testAppendPeerTableCFs() throws Exception {
     ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
     rpc1.setClusterKey(KEY_ONE);
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
-    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
-    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
-    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
-    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
+    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
+    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
+    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
+    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
+    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
+    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
 
     // Add a valid peer
     admin.addReplicationPeer(ID_ONE, rpc1).join();
@@ -244,10 +244,10 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
   public void testRemovePeerTableCFs() throws Exception {
     ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
     rpc1.setClusterKey(KEY_ONE);
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
-    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
-    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
+    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
+    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
+    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
     // Add a valid peer
     admin.addReplicationPeer(ID_ONE, rpc1).join();
     Map<TableName, List<String>> tableCFs = new HashMap<>();
@@ -360,8 +360,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
   public void testNamespacesAndTableCfsConfigConflict() throws Exception {
     String ns1 = "ns1";
     String ns2 = "ns2";
-    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
-    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
+    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
+    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
index 3e0c261..a646287 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
@@ -29,12 +29,16 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
+@RunWith(Parameterized.class)
 @Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
 
@@ -42,15 +46,6 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
   String snapshotName2 = "snapshotName2";
   String snapshotName3 = "snapshotName3";
 
-  @Rule
-  public TestName testName = new TestName();
-  TableName tableName;
-
-  @Before
-  public void setup() {
-    tableName = TableName.valueOf(testName.getMethodName());
-  }
-
   @After
   public void cleanup() throws Exception {
     admin.deleteSnapshots(Pattern.compile(".*")).get();
@@ -175,10 +170,13 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
     admin.snapshot(snapshotName3, tableName).get();
     Assert.assertEquals(admin.listSnapshots().get().size(), 3);
 
-    Assert.assertEquals(admin.listSnapshots(Pattern.compile("(.*)")).get().size(), 3);
-    Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName(\\d+)")).get().size(), 3);
-    Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName[1|3]")).get().size(), 2);
-    Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshot(.*)")).get().size(), 3);
+    Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("(.*)"))).get().size(), 3);
+    Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName(\\d+)")))
+        .get().size(), 3);
+    Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName[1|3]")))
+        .get().size(), 2);
+    Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshot(.*)"))).get()
+        .size(), 3);
     Assert.assertEquals(
       admin.listTableSnapshots(Pattern.compile("testListSnapshots"), Pattern.compile("s(.*)")).get()
           .size(),