You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/07/04 02:14:04 UTC
[1/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
Repository: hbase
Updated Branches:
refs/heads/master 8318a092a -> 14f0423b5
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index 6586a03..f75c346 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -32,11 +33,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder.ModifyableTableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -51,23 +53,20 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Class to test asynchronous table admin operations.
*/
+@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
- @Rule
- public TestName name = new TestName();
-
@Test
public void testTableExist() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
boolean exist;
exist = admin.tableExists(tableName).get();
assertEquals(false, exist);
@@ -81,12 +80,12 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
@Test
public void testListTables() throws Exception {
int numTables = admin.listTables().get().size();
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
- final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
+ final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "1");
+ final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
+ final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "3");
TableName[] tables = new TableName[] { tableName1, tableName2, tableName3 };
for (int i = 0; i < tables.length; i++) {
- TEST_UTIL.createTable(tables[i], FAMILY);
+ createTableWithDefaultConf(tables[i]);
}
List<TableDescriptor> tableDescs = admin.listTables().get();
@@ -118,7 +117,8 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
}
for (int i = 0; i < tables.length; i++) {
- TEST_UTIL.deleteTable(tables[i]);
+ admin.disableTable(tables[i]).join();
+ admin.deleteTable(tables[i]).join();
}
tableDescs = admin.listTables(Optional.empty(), true).get();
@@ -127,27 +127,25 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
assertTrue("Not found system tables", tableNames.size() > 0);
}
- @Test(timeout = 300000)
+ @Test
public void testGetTableDescriptor() throws Exception {
- HColumnDescriptor fam1 = new HColumnDescriptor("fam1");
- HColumnDescriptor fam2 = new HColumnDescriptor("fam2");
- HColumnDescriptor fam3 = new HColumnDescriptor("fam3");
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
- htd.addFamily(fam1);
- htd.addFamily(fam2);
- htd.addFamily(fam3);
- admin.createTable(htd).join();
- TableDescriptor confirmedHtd = admin.getTableDescriptor(htd.getTableName()).get();
- assertEquals(htd.compareTo(new HTableDescriptor(confirmedHtd)), 0);
+ byte[][] families = { FAMILY, FAMILY_0, FAMILY_1 };
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ for (byte[] family : families) {
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+ }
+ TableDescriptor desc = builder.build();
+ admin.createTable(desc).join();
+ ModifyableTableDescriptor modifyableDesc = ((ModifyableTableDescriptor) desc);
+ TableDescriptor confirmedHtd = admin.getTableDescriptor(tableName).get();
+ assertEquals(modifyableDesc.compareTo((ModifyableTableDescriptor) confirmedHtd), 0);
}
- @Test(timeout = 300000)
+ @Test
public void testCreateTable() throws Exception {
List<TableDescriptor> tables = admin.listTables().get();
int numTables = tables.size();
- final TableName tableName = TableName.valueOf(name.getMethodName());
- admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY)))
- .join();
+ createTableWithDefaultConf(tableName);
tables = admin.listTables().get();
assertEquals(numTables + 1, tables.size());
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
@@ -162,118 +160,102 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
return state.get().getState();
}
- @Test(timeout = 300000)
+ @Test
public void testCreateTableNumberOfRegions() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc).join();
- List<HRegionLocation> regions;
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- regions = l.getAllRegionLocations();
- assertEquals("Table should have only 1 region", 1, regions.size());
- }
+ RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+
+ createTableWithDefaultConf(tableName);
+ List<HRegionLocation> regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ assertEquals("Table should have only 1 region", 1, regionLocations.size());
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
- desc = new HTableDescriptor(tableName2);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, new byte[][] { new byte[] { 42 } }).join();
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName2)) {
- regions = l.getAllRegionLocations();
- assertEquals("Table should have only 2 region", 2, regions.size());
- }
+ createTableWithDefaultConf(tableName2, Optional.of(new byte[][] { new byte[] { 42 } }));
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get();
+ assertEquals("Table should have only 2 region", 2, regionLocations.size());
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
- desc = new HTableDescriptor(tableName3);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, "a".getBytes(), "z".getBytes(), 3).join();
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName3)) {
- regions = l.getAllRegionLocations();
- assertEquals("Table should have only 3 region", 3, regions.size());
- }
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName3);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+ admin.createTable(builder.build(), "a".getBytes(), "z".getBytes(), 3).join();
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get();
+ assertEquals("Table should have only 3 region", 3, regionLocations.size());
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
- desc = new HTableDescriptor(tableName4);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+ builder = TableDescriptorBuilder.newBuilder(tableName4);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
try {
- admin.createTable(desc, "a".getBytes(), "z".getBytes(), 2).join();
+ admin.createTable(builder.build(), "a".getBytes(), "z".getBytes(), 2).join();
fail("Should not be able to create a table with only 2 regions using this API.");
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
}
final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "_5");
- desc = new HTableDescriptor(tableName5);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, new byte[] { 1 }, new byte[] { 127 }, 16).join();
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName5)) {
- regions = l.getAllRegionLocations();
- assertEquals("Table should have 16 region", 16, regions.size());
- }
+ builder = TableDescriptorBuilder.newBuilder(tableName5);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+ admin.createTable(builder.build(), new byte[] { 1 }, new byte[] { 127 }, 16).join();
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName5)).get();
+ assertEquals("Table should have 16 region", 16, regionLocations.size());
}
- @Test(timeout = 300000)
+ @Test
public void testCreateTableWithRegions() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
-
byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, };
int expectedRegions = splitKeys.length + 1;
-
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, splitKeys).join();
+ createTableWithDefaultConf(tableName, Optional.of(splitKeys));
boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
assertTrue("Table should be created with splitKyes + 1 rows in META", tableAvailable);
- List<HRegionLocation> regions;
- Iterator<HRegionLocation> hris;
+ RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+ List<HRegionLocation> regions =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ Iterator<HRegionLocation> hris = regions.iterator();
+
+ assertEquals(
+ "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+ expectedRegions, regions.size());
+ System.err.println("Found " + regions.size() + " regions");
+
HRegionInfo hri;
- ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- regions = l.getAllRegionLocations();
-
- assertEquals(
- "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
- expectedRegions, regions.size());
- System.err.println("Found " + regions.size() + " regions");
-
- hris = regions.iterator();
- hri = hris.next().getRegionInfo();
- assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7]));
- assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8]));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
- assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
-
- verifyRoundRobinDistribution(conn, l, expectedRegions);
- }
+ hris = regions.iterator();
+ hri = hris.next().getRegionInfo();
+ assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7]));
+ assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8]));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
+ assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
+ verifyRoundRobinDistribution(regions, expectedRegions);
// Now test using start/end with a number of regions
@@ -283,99 +265,86 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
// Splitting into 10 regions, we expect (null,1) ... (9, null)
// with (1,2) (2,3) (3,4) (4,5) (5,6) (6,7) (7,8) (8,9) in the middle
-
expectedRegions = 10;
-
final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "_2");
-
- desc = new HTableDescriptor(tableName2);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, startKey, endKey, expectedRegions).join();
-
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName2)) {
- regions = l.getAllRegionLocations();
- assertEquals(
- "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
- expectedRegions, regions.size());
- System.err.println("Found " + regions.size() + " regions");
-
- hris = regions.iterator();
- hri = hris.next().getRegionInfo();
- assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
- assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
- hri = hris.next().getRegionInfo();
- assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
- assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
-
- verifyRoundRobinDistribution(conn, l, expectedRegions);
- }
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName2);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+ admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
+
+ regions =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName2)).get();
+ assertEquals(
+ "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+ expectedRegions, regions.size());
+ System.err.println("Found " + regions.size() + " regions");
+
+ hris = regions.iterator();
+ hri = hris.next().getRegionInfo();
+ assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
+ assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
+ hri = hris.next().getRegionInfo();
+ assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
+ assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
+ verifyRoundRobinDistribution(regions, expectedRegions);
// Try once more with something that divides into something infinite
-
startKey = new byte[] { 0, 0, 0, 0, 0, 0 };
endKey = new byte[] { 1, 0, 0, 0, 0, 0 };
expectedRegions = 5;
-
final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "_3");
-
- desc = new HTableDescriptor(tableName3);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, startKey, endKey, expectedRegions).join();
-
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName3)) {
- regions = l.getAllRegionLocations();
- assertEquals(
- "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
- expectedRegions, regions.size());
- System.err.println("Found " + regions.size() + " regions");
-
- verifyRoundRobinDistribution(conn, l, expectedRegions);
- }
+ builder = TableDescriptorBuilder.newBuilder(tableName3);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+ admin.createTable(builder.build(), startKey, endKey, expectedRegions).join();
+
+ regions =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName3)).get();
+ assertEquals(
+ "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+ expectedRegions, regions.size());
+ System.err.println("Found " + regions.size() + " regions");
+ verifyRoundRobinDistribution(regions, expectedRegions);
// Try an invalid case where there are duplicate split keys
splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 },
new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } };
-
- final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");
- desc = new HTableDescriptor(tableName4);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+ final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "_4");;
try {
- admin.createTable(desc, splitKeys).join();
+ createTableWithDefaultConf(tableName4, Optional.of(splitKeys));
fail("Should not be able to create this table because of " + "duplicate split keys");
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
- private void verifyRoundRobinDistribution(ClusterConnection c, RegionLocator regionLocator,
- int expectedRegions) throws IOException {
- int numRS = c.getCurrentNrHRS();
- List<HRegionLocation> regions = regionLocator.getAllRegionLocations();
+ private void verifyRoundRobinDistribution(List<HRegionLocation> regions, int expectedRegions)
+ throws IOException {
+ int numRS = ((ClusterConnection) TEST_UTIL.getConnection()).getCurrentNrHRS();
+
Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<>();
regions.stream().forEach((loc) -> {
ServerName server = loc.getServerName();
@@ -394,103 +363,93 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
});
}
- @Test(timeout = 300000)
- public void testCreateTableWithOnlyEmptyStartRow() throws IOException {
- byte[] tableName = Bytes.toBytes(name.getMethodName());
+ @Test
+ public void testCreateTableWithOnlyEmptyStartRow() throws Exception {
byte[][] splitKeys = new byte[1][];
splitKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- desc.addFamily(new HColumnDescriptor("col"));
try {
- admin.createTable(desc, splitKeys).join();
+ createTableWithDefaultConf(tableName, Optional.of(splitKeys));
fail("Test case should fail as empty split key is passed.");
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
- @Test(timeout = 300000)
- public void testCreateTableWithEmptyRowInTheSplitKeys() throws IOException {
- byte[] tableName = Bytes.toBytes(name.getMethodName());
+ @Test
+ public void testCreateTableWithEmptyRowInTheSplitKeys() throws Exception {
byte[][] splitKeys = new byte[3][];
splitKeys[0] = "region1".getBytes();
splitKeys[1] = HConstants.EMPTY_BYTE_ARRAY;
splitKeys[2] = "region2".getBytes();
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- desc.addFamily(new HColumnDescriptor("col"));
try {
- admin.createTable(desc, splitKeys).join();
+ createTableWithDefaultConf(tableName, Optional.of(splitKeys));
fail("Test case should fail as empty split key is passed.");
} catch (CompletionException e) {
assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
- @Test(timeout = 300000)
+ @Test
public void testDeleteTable() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY))).join();
+ createTableWithDefaultConf(tableName);
assertTrue(admin.tableExists(tableName).get());
TEST_UTIL.getAdmin().disableTable(tableName);
admin.deleteTable(tableName).join();
assertFalse(admin.tableExists(tableName).get());
}
- @Test(timeout = 300000)
+ @Test
public void testDeleteTables() throws Exception {
- TableName[] tables = { TableName.valueOf(name.getMethodName() + "1"),
- TableName.valueOf(name.getMethodName() + "2"), TableName.valueOf(name.getMethodName() + "3") };
- Arrays.stream(tables).map(HTableDescriptor::new)
- .map((table) -> table.addFamily(new HColumnDescriptor(FAMILY))).forEach((table) -> {
- admin.createTable(table).join();
- admin.tableExists(table.getTableName()).thenAccept((exist) -> assertTrue(exist)).join();
- try {
- TEST_UTIL.getAdmin().disableTable(table.getTableName());
- } catch (Exception e) {
- }
- });
- List<TableDescriptor> failed = admin.deleteTables(Pattern.compile("testDeleteTables.*")).get();
+ TableName[] tables =
+ { TableName.valueOf(tableName.getNameAsString() + "1"),
+ TableName.valueOf(tableName.getNameAsString() + "2"),
+ TableName.valueOf(tableName.getNameAsString() + "3") };
+ Arrays.stream(tables).forEach((table) -> {
+ createTableWithDefaultConf(table);
+ admin.tableExists(table).thenAccept((exist) -> assertTrue(exist)).join();
+ admin.disableTable(table).join();
+ });
+ List<TableDescriptor> failed =
+ admin.deleteTables(Pattern.compile(tableName.getNameAsString() + ".*")).get();
assertEquals(0, failed.size());
Arrays.stream(tables).forEach((table) -> {
admin.tableExists(table).thenAccept((exist) -> assertFalse(exist)).join();
});
}
- @Test(timeout = 300000)
- public void testTruncateTable() throws IOException {
- testTruncateTable(TableName.valueOf(name.getMethodName()), false);
+ @Test
+ public void testTruncateTable() throws Exception {
+ testTruncateTable(tableName, false);
}
- @Test(timeout = 300000)
- public void testTruncateTablePreservingSplits() throws IOException {
- testTruncateTable(TableName.valueOf(name.getMethodName()), true);
+ @Test
+ public void testTruncateTablePreservingSplits() throws Exception {
+ testTruncateTable(tableName, true);
}
private void testTruncateTable(final TableName tableName, boolean preserveSplits)
- throws IOException {
+ throws Exception {
byte[][] splitKeys = new byte[2][];
splitKeys[0] = Bytes.toBytes(4);
splitKeys[1] = Bytes.toBytes(8);
// Create & Fill the table
- Table table = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY, splitKeys);
- try {
- TEST_UTIL.loadNumericRows(table, HConstants.CATALOG_FAMILY, 0, 10);
- assertEquals(10, TEST_UTIL.countRows(table));
- } finally {
- table.close();
- }
+ createTableWithDefaultConf(tableName, Optional.of(splitKeys));
+ RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+ int expectedRows = 10;
+ for (int i = 0; i < expectedRows; i++) {
+ byte[] data = Bytes.toBytes(String.valueOf(i));
+ Put put = new Put(data);
+ put.addColumn(FAMILY, null, data);
+ table.put(put).join();
+ }
+ assertEquals(10, table.scanAll(new Scan()).get().size());
assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
// Truncate & Verify
- TEST_UTIL.getAdmin().disableTable(tableName);
+ admin.disableTable(tableName).join();
admin.truncateTable(tableName, preserveSplits).join();
- table = TEST_UTIL.getConnection().getTable(tableName);
- try {
- assertEquals(0, TEST_UTIL.countRows(table));
- } finally {
- table.close();
- }
+ assertEquals(0, table.scanAll(new Scan()).get().size());
if (preserveSplits) {
assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
} else {
@@ -498,149 +457,147 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
}
}
- @Test(timeout = 300000)
+ @Test
public void testDisableAndEnableTable() throws Exception {
+ createTableWithDefaultConf(tableName);
+ RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
final byte[] row = Bytes.toBytes("row");
final byte[] qualifier = Bytes.toBytes("qualifier");
final byte[] value = Bytes.toBytes("value");
- final TableName tableName = TableName.valueOf(name.getMethodName());
- Table ht = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
- put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
- ht.put(put);
+ put.addColumn(FAMILY, qualifier, value);
+ table.put(put).join();
Get get = new Get(row);
- get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
- ht.get(get);
+ get.addColumn(FAMILY, qualifier);
+ table.get(get).get();
- this.admin.disableTable(ht.getName()).join();
+ this.admin.disableTable(tableName).join();
assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(ht.getName(), TableState.State.DISABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.DISABLED));
assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName));
// Test that table is disabled
get = new Get(row);
- get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+ get.addColumn(FAMILY, qualifier);
boolean ok = false;
try {
- ht.get(get);
- } catch (TableNotEnabledException e) {
+ table.get(get).get();
+ } catch (ExecutionException e) {
ok = true;
}
ok = false;
// verify that scan encounters correct exception
- Scan scan = new Scan();
try {
- ResultScanner scanner = ht.getScanner(scan);
- Result res = null;
- do {
- res = scanner.next();
- } while (res != null);
- } catch (TableNotEnabledException e) {
+ table.scanAll(new Scan()).get();
+ } catch (ExecutionException e) {
ok = true;
}
assertTrue(ok);
this.admin.enableTable(tableName).join();
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
- .getTableStateManager().isTableState(ht.getName(), TableState.State.ENABLED));
+ .getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
// Test that table is enabled
try {
- ht.get(get);
- } catch (RetriesExhaustedException e) {
+ table.get(get).get();
+ } catch (Exception e) {
ok = false;
}
assertTrue(ok);
- ht.close();
}
- @Test(timeout = 300000)
+ @Test
public void testDisableAndEnableTables() throws Exception {
+ final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "1");
+ final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
+ createTableWithDefaultConf(tableName1);
+ createTableWithDefaultConf(tableName2);
+ RawAsyncTable table1 = ASYNC_CONN.getRawTable(tableName1);
+ RawAsyncTable table2 = ASYNC_CONN.getRawTable(tableName1);
+
final byte[] row = Bytes.toBytes("row");
final byte[] qualifier = Bytes.toBytes("qualifier");
final byte[] value = Bytes.toBytes("value");
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName());
- Table ht1 = TEST_UTIL.createTable(tableName1, HConstants.CATALOG_FAMILY);
- Table ht2 = TEST_UTIL.createTable(tableName2, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
- put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
- ht1.put(put);
- ht2.put(put);
+ put.addColumn(FAMILY, qualifier, value);
+ table1.put(put).join();
+ table2.put(put).join();
Get get = new Get(row);
- get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
- ht1.get(get);
- ht2.get(get);
+ get.addColumn(FAMILY, qualifier);
+ table1.get(get).get();
+ table2.get(get).get();
- this.admin.disableTables(Pattern.compile("testDisableAndEnableTable.*")).join();
+ this.admin.disableTables(Pattern.compile(tableName.getNameAsString() + ".*")).join();
// Test that tables are disabled
get = new Get(row);
- get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
+ get.addColumn(FAMILY, qualifier);
boolean ok = false;
try {
- ht1.get(get);
- ht2.get(get);
- } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
+ table1.get(get).get();
+ } catch (ExecutionException e) {
ok = true;
}
+ assertTrue(ok);
+ ok = false;
+ try {
+ table2.get(get).get();
+ } catch (ExecutionException e) {
+ ok = true;
+ }
+ assertTrue(ok);
assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName1));
assertEquals(TableState.State.DISABLED, getStateFromMeta(tableName2));
- assertTrue(ok);
- this.admin.enableTables(Pattern.compile("testDisableAndEnableTable.*")).join();
+ this.admin.enableTables(Pattern.compile("testDisableAndEnableTables.*")).join();
// Test that tables are enabled
try {
- ht1.get(get);
- } catch (IOException e) {
+ table1.get(get).get();
+ } catch (Exception e) {
ok = false;
}
try {
- ht2.get(get);
- } catch (IOException e) {
+ table2.get(get).get();
+ } catch (Exception e) {
ok = false;
}
assertTrue(ok);
-
- ht1.close();
- ht2.close();
-
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName1));
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName2));
}
- @Test(timeout = 300000)
+ @Test
public void testEnableTableRetainAssignment() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
- new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
- new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } };
+ byte[][] splitKeys =
+ { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
+ new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
+ new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } };
int expectedRegions = splitKeys.length + 1;
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
- admin.createTable(desc, splitKeys).join();
-
- try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- List<HRegionLocation> regions = l.getAllRegionLocations();
-
- assertEquals(
- "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
- expectedRegions, regions.size());
- // Disable table.
- admin.disableTable(tableName).join();
- // Enable table, use retain assignment to assign regions.
- admin.enableTable(tableName).join();
- List<HRegionLocation> regions2 = l.getAllRegionLocations();
-
- // Check the assignment.
- assertEquals(regions.size(), regions2.size());
- assertTrue(regions2.containsAll(regions));
- }
+ createTableWithDefaultConf(tableName, Optional.of(splitKeys));
+
+ RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+ List<HRegionLocation> regions =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ assertEquals(
+ "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
+ expectedRegions, regions.size());
+
+ // Disable table.
+ admin.disableTable(tableName).join();
+ // Enable table, use retain assignment to assign regions.
+ admin.enableTable(tableName).join();
+
+ List<HRegionLocation> regions2 =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ // Check the assignment.
+ assertEquals(regions.size(), regions2.size());
+ assertTrue(regions2.containsAll(regions));
}
- @Test(timeout = 300000)
+ @Test
public void testDisableCatalogTable() throws Exception {
try {
this.admin.disableTable(TableName.META_TABLE_NAME).join();
@@ -649,188 +606,149 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
}
// Before the fix for HBASE-6146, the below table creation was failing as the hbase:meta table
// actually getting disabled by the disableTable() call.
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName().getBytes()));
- HColumnDescriptor hcd = new HColumnDescriptor("cf1".getBytes());
- htd.addFamily(hcd);
- admin.createTable(htd).join();
+ createTableWithDefaultConf(tableName);
}
@Test
- public void testAddColumnFamily() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ public void testAddColumnFamily() throws Exception {
// Create a table with two families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
- admin.createTable(baseHtd).join();
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build());
+ admin.createTable(builder.build()).join();
admin.disableTable(tableName).join();
- try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0);
-
- // Modify the table removing one family and verify the descriptor
- admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
- verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
- } finally {
- admin.deleteTable(tableName);
- }
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0);
+
+ // Modify the table removing one family and verify the descriptor
+ admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build())
+ .join();
+ verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
}
@Test
public void testAddSameColumnFamilyTwice() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
// Create a table with one families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
- admin.createTable(baseHtd).join();
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build());
+ admin.createTable(builder.build()).join();
admin.disableTable(tableName).join();
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0);
+
+ // Modify the table removing one family and verify the descriptor
+ admin.addColumnFamily(tableName, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build())
+ .join();
+ verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+
try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0);
-
- // Modify the table removing one family and verify the descriptor
- this.admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
- verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
-
- try {
- // Add same column family again - expect failure
- this.admin.addColumnFamily(tableName, new HColumnDescriptor(FAMILY_1)).join();
- Assert.fail("Delete a non-exist column family should fail");
- } catch (Exception e) {
- // Expected.
- }
- } finally {
- admin.deleteTable(tableName).join();
+ // Add same column family again - expect failure
+ this.admin.addColumnFamily(tableName,
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build()).join();
+ Assert.fail("Delete a non-exist column family should fail");
+ } catch (Exception e) {
+ // Expected.
}
}
@Test
public void testModifyColumnFamily() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
-
- HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_0);
- int blockSize = cfDescriptor.getBlocksize();
- // Create a table with one families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(cfDescriptor);
- admin.createTable(baseHtd).join();
+ TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build();
+ int blockSize = cfd.getBlocksize();
+ admin.createTable(tdBuilder.addColumnFamily(cfd).build()).join();
admin.disableTable(tableName).join();
- try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0);
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0);
- int newBlockSize = 2 * blockSize;
- cfDescriptor.setBlocksize(newBlockSize);
+ int newBlockSize = 2 * blockSize;
+ cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).setBlocksize(newBlockSize).build();
+ // Modify colymn family
+ admin.modifyColumnFamily(tableName, cfd).join();
- // Modify colymn family
- admin.modifyColumnFamily(tableName, cfDescriptor).join();
-
- TableDescriptor htd = admin.getTableDescriptor(tableName).get();
- ColumnFamilyDescriptor hcfd = htd.getColumnFamily(FAMILY_0);
- assertTrue(hcfd.getBlocksize() == newBlockSize);
- } finally {
- admin.deleteTable(tableName).join();
- }
+ TableDescriptor htd = admin.getTableDescriptor(tableName).get();
+ ColumnFamilyDescriptor hcfd = htd.getColumnFamily(FAMILY_0);
+ assertTrue(hcfd.getBlocksize() == newBlockSize);
}
@Test
- public void testModifyNonExistingColumnFamily() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
-
- HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_1);
- int blockSize = cfDescriptor.getBlocksize();
- // Create a table with one families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
- admin.createTable(baseHtd).join();
+ public void testModifyNonExistingColumnFamily() throws Exception {
+ TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build();
+ int blockSize = cfd.getBlocksize();
+ admin.createTable(tdBuilder.addColumnFamily(cfd).build()).join();
admin.disableTable(tableName).join();
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0);
+
+ int newBlockSize = 2 * blockSize;
+ cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).setBlocksize(newBlockSize).build();
+
+ // Modify a column family that is not in the table.
try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0);
-
- int newBlockSize = 2 * blockSize;
- cfDescriptor.setBlocksize(newBlockSize);
-
- // Modify a column family that is not in the table.
- try {
- admin.modifyColumnFamily(tableName, cfDescriptor).join();
- Assert.fail("Modify a non-exist column family should fail");
- } catch (Exception e) {
- // Expected.
- }
- } finally {
- admin.deleteTable(tableName).join();
+ admin.modifyColumnFamily(tableName, cfd).join();
+ Assert.fail("Modify a non-exist column family should fail");
+ } catch (Exception e) {
+ // Expected.
}
}
@Test
- public void testDeleteColumnFamily() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ public void testDeleteColumnFamily() throws Exception {
// Create a table with two families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
- admin.createTable(baseHtd).join();
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build())
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build());
+ admin.createTable(builder.build()).join();
admin.disableTable(tableName).join();
- try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
- // Modify the table removing one family and verify the descriptor
- admin.deleteColumnFamily(tableName, FAMILY_1).join();
- verifyTableDescriptor(tableName, FAMILY_0);
- } finally {
- admin.deleteTable(tableName).join();
- }
+ // Modify the table removing one family and verify the descriptor
+ admin.deleteColumnFamily(tableName, FAMILY_1).join();
+ verifyTableDescriptor(tableName, FAMILY_0);
}
@Test
- public void testDeleteSameColumnFamilyTwice() throws IOException {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ public void testDeleteSameColumnFamilyTwice() throws Exception {
// Create a table with two families
- HTableDescriptor baseHtd = new HTableDescriptor(tableName);
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
- baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
- admin.createTable(baseHtd).join();
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_0).build())
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_1).build());
+ admin.createTable(builder.build()).join();
admin.disableTable(tableName).join();
- try {
- // Verify the table descriptor
- verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+ // Verify the table descriptor
+ verifyTableDescriptor(tableName, FAMILY_0, FAMILY_1);
+
+ // Modify the table removing one family and verify the descriptor
+ admin.deleteColumnFamily(tableName, FAMILY_1).join();
+ verifyTableDescriptor(tableName, FAMILY_0);
- // Modify the table removing one family and verify the descriptor
+ try {
+ // Delete again - expect failure
admin.deleteColumnFamily(tableName, FAMILY_1).join();
- verifyTableDescriptor(tableName, FAMILY_0);
-
- try {
- // Delete again - expect failure
- admin.deleteColumnFamily(tableName, FAMILY_1).join();
- Assert.fail("Delete a non-exist column family should fail");
- } catch (Exception e) {
- // Expected.
- }
- } finally {
- admin.deleteTable(tableName).join();
+ Assert.fail("Delete a non-exist column family should fail");
+ } catch (Exception e) {
+ // Expected.
}
}
private void verifyTableDescriptor(final TableName tableName, final byte[]... families)
- throws IOException {
- Admin admin = TEST_UTIL.getAdmin();
-
+ throws Exception {
// Verify descriptor from master
- HTableDescriptor htd = admin.getTableDescriptor(tableName);
+ TableDescriptor htd = admin.getTableDescriptor(tableName).get();
verifyTableDescriptor(htd, tableName, families);
// Verify descriptor from HDFS
MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
- HTableDescriptor td = FSTableDescriptors
- .getTableDescriptorFromFs(mfs.getFileSystem(), tableDir);
+ HTableDescriptor td =
+ FSTableDescriptors.getTableDescriptorFromFs(mfs.getFileSystem(), tableDir);
verifyTableDescriptor(td, tableName, families);
}
- private void verifyTableDescriptor(final HTableDescriptor htd, final TableName tableName,
+ private void verifyTableDescriptor(final TableDescriptor htd, final TableName tableName,
final byte[]... families) {
- Set<byte[]> htdFamilies = htd.getFamiliesKeys();
+ Set<byte[]> htdFamilies = htd.getColumnFamilyNames();
assertEquals(tableName, htd.getTableName());
assertEquals(families.length, htdFamilies.size());
for (byte[] familyName : families) {
@@ -840,28 +758,20 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
@Test
public void testIsTableEnabledAndDisabled() throws Exception {
- final TableName table = TableName.valueOf("testIsTableEnabledAndDisabled");
- HTableDescriptor desc = new HTableDescriptor(table);
- desc.addFamily(new HColumnDescriptor(FAMILY));
- admin.createTable(desc).join();
- assertTrue(admin.isTableEnabled(table).get());
- assertFalse(admin.isTableDisabled(table).get());
- admin.disableTable(table).join();
- assertFalse(admin.isTableEnabled(table).get());
- assertTrue(admin.isTableDisabled(table).get());
- admin.deleteTable(table).join();
+ createTableWithDefaultConf(tableName);
+ assertTrue(admin.isTableEnabled(tableName).get());
+ assertFalse(admin.isTableDisabled(tableName).get());
+ admin.disableTable(tableName).join();
+ assertFalse(admin.isTableEnabled(tableName).get());
+ assertTrue(admin.isTableDisabled(tableName).get());
}
@Test
public void testTableAvailableWithRandomSplitKeys() throws Exception {
- TableName tableName = TableName.valueOf("testTableAvailableWithRandomSplitKeys");
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor("col"));
+ createTableWithDefaultConf(tableName);
byte[][] splitKeys = new byte[1][];
splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 } };
- admin.createTable(desc).join();
boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
assertFalse("Table should be created with 1 row in META", tableAvailable);
}
-
}
[4/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 1da660c..36fd60d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -17,174 +17,27 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
+import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.stream.Stream;
-
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
-import org.apache.hadoop.hbase.TableNotDisabledException;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.Scan.ReadType;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
-import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
-import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -192,2110 +45,404 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.Private
public class AsyncHBaseAdmin implements AsyncAdmin {
- public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
- private final AsyncConnectionImpl connection;
-
- private final RawAsyncTable metaTable;
-
- private final long rpcTimeoutNs;
-
- private final long operationTimeoutNs;
-
- private final long pauseNs;
-
- private final int maxAttempts;
-
- private final int startLogErrorsCnt;
+ private final RawAsyncHBaseAdmin rawAdmin;
- private final NonceGenerator ng;
-
- AsyncHBaseAdmin(AsyncConnectionImpl connection) {
- this.connection = connection;
- this.metaTable = connection.getRawTable(META_TABLE_NAME);
- this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
- this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
- this.pauseNs = connection.connConf.getPauseNs();
- this.maxAttempts = connection.connConf.getMaxRetries();
- this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
- this.ng = connection.getNonceGenerator();
- }
-
- private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
- return this.connection.callerFactory.<T> masterRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
- }
-
- private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
- return this.connection.callerFactory.<T> adminRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
- }
-
- @FunctionalInterface
- private interface MasterRpcCall<RESP, REQ> {
- void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback<RESP> done);
- }
-
- @FunctionalInterface
- private interface AdminRpcCall<RESP, REQ> {
- void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback<RESP> done);
- }
+ private final ExecutorService pool;
- @FunctionalInterface
- private interface Converter<D, S> {
- D convert(S src) throws IOException;
+ AsyncHBaseAdmin(RawAsyncHBaseAdmin rawAdmin, ExecutorService pool) {
+ this.rawAdmin = rawAdmin;
+ this.pool = pool;
}
- private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
- MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
- Converter<RESP, PRESP> respConverter) {
- CompletableFuture<RESP> future = new CompletableFuture<>();
- rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
- @Override
- public void run(PRESP resp) {
- if (controller.failed()) {
- future.completeExceptionally(controller.getFailed());
- } else {
- try {
- future.complete(respConverter.convert(resp));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
- }
- });
- return future;
- }
-
- private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
- AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
- Converter<RESP, PRESP> respConverter) {
-
- CompletableFuture<RESP> future = new CompletableFuture<>();
- rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
-
- @Override
- public void run(PRESP resp) {
- if (controller.failed()) {
- future.completeExceptionally(new IOException(controller.errorText()));
- } else {
- try {
- future.complete(respConverter.convert(resp));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
+ private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
+ CompletableFuture<T> asyncFuture = new CompletableFuture<>();
+ future.whenCompleteAsync((r, e) -> {
+ if (e != null) {
+ asyncFuture.completeExceptionally(e);
+ } else {
+ asyncFuture.complete(r);
}
- });
- return future;
- }
-
- private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
- MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
- ProcedureBiConsumer consumer) {
- CompletableFuture<Long> procFuture = this
- .<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
- respConverter)).call();
- return waitProcedureResult(procFuture).whenComplete(consumer);
- }
-
- @FunctionalInterface
- private interface TableOperator {
- CompletableFuture<Void> operate(TableName table);
- }
-
- private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
- TableOperator operator, String operationType) {
- CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
- List<TableDescriptor> failed = new LinkedList<>();
- listTables(Optional.ofNullable(pattern), false).whenComplete(
- (tables, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- CompletableFuture[] futures =
- tables.stream()
- .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
- if (ex != null) {
- LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
- failed.add(table);
- }
- })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
- CompletableFuture.allOf(futures).thenAccept((v) -> {
- future.complete(failed);
- });
- });
- return future;
- }
-
- @Override
- public AsyncConnectionImpl getConnection() {
- return this.connection;
+ }, pool);
+ return asyncFuture;
}
@Override
public CompletableFuture<Boolean> tableExists(TableName tableName) {
- return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+ return wrap(rawAdmin.tableExists(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
boolean includeSysTables) {
- return this.<List<TableDescriptor>> newMasterCaller()
- .action((controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
- controller, stub,
- RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
- (s, c, req, done) -> s.getTableDescriptors(c, req, done),
- (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
- .call();
+ return wrap(rawAdmin.listTables(pattern, includeSysTables));
}
@Override
public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
boolean includeSysTables) {
- return this.<List<TableName>> newMasterCaller()
- .action((controller, stub) -> this
- .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
- RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
- (s, c, req, done) -> s.getTableNames(c, req, done),
- (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
- .call();
+ return wrap(rawAdmin.listTableNames(pattern, includeSysTables));
}
@Override
public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
- CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
- this.<List<TableSchema>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
- controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
- c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
- .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!tableSchemas.isEmpty()) {
- future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Void> createTable(TableDescriptor desc) {
- return createTable(desc, null);
+ return wrap(rawAdmin.getTableDescriptor(tableName));
}
@Override
public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
int numRegions) {
- try {
- return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
- if (desc.getTableName() == null) {
- return failedFuture(new IllegalArgumentException("TableName cannot be null"));
- }
- if (splitKeys != null && splitKeys.length > 0) {
- Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
- // Verify there are no duplicate split keys
- byte[] lastKey = null;
- for (byte[] splitKey : splitKeys) {
- if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
- return failedFuture(new IllegalArgumentException(
- "Empty split key must not be passed in the split keys."));
- }
- if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
- return failedFuture(new IllegalArgumentException("All split keys must be unique, "
- + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", "
- + Bytes.toStringBinary(lastKey)));
- }
- lastKey = splitKey;
- }
- }
+ return wrap(rawAdmin.createTable(desc, startKey, endKey, numRegions));
+ }
- return this.<CreateTableRequest, CreateTableResponse> procedureCall(
- RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
- new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+ return wrap(rawAdmin.createTable(desc, splitKeys));
}
@Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
- return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
- .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
- new DeleteTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.deleteTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+ return wrap(rawAdmin.deleteTables(pattern));
}
@Override
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
- return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
- RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
- (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.truncateTable(tableName, preserveSplits));
}
@Override
public CompletableFuture<Void> enableTable(TableName tableName) {
- return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
- .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
- new EnableTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.enableTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+ return wrap(rawAdmin.enableTables(pattern));
}
@Override
public CompletableFuture<Void> disableTable(TableName tableName) {
- return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
- .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
- (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
- new DisableTableProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.disableTable(tableName));
}
@Override
public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
- return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+ return wrap(rawAdmin.disableTables(pattern));
}
@Override
public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (state.isPresent()) {
- future.complete(state.get().inStates(TableState.State.ENABLED));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName));
- }
- });
- return future;
+ return wrap(rawAdmin.isTableEnabled(tableName));
}
@Override
public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (state.isPresent()) {
- future.complete(state.get().inStates(TableState.State.DISABLED));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName));
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
- return isTableAvailable(tableName, null);
+ return wrap(rawAdmin.isTableDisabled(tableName));
}
@Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- isTableEnabled(tableName).whenComplete(
- (enabled, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!enabled) {
- future.complete(false);
- } else {
- AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
- .whenComplete(
- (locations, error1) -> {
- if (error1 != null) {
- future.completeExceptionally(error1);
- return;
- }
- int notDeployed = 0;
- int regionCount = 0;
- for (HRegionLocation location : locations) {
- HRegionInfo info = location.getRegionInfo();
- if (location.getServerName() == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has not deployed region "
- + info.getEncodedName());
- }
- notDeployed++;
- } else if (splitKeys != null
- && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
- for (byte[] splitKey : splitKeys) {
- // Just check if the splitkey is available
- if (Bytes.equals(info.getStartKey(), splitKey)) {
- regionCount++;
- break;
- }
- }
- } else {
- // Always empty start row should be counted
- regionCount++;
- }
- }
- if (notDeployed > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
- }
- future.complete(false);
- } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " expected to have "
- + (splitKeys.length + 1) + " regions, but only " + regionCount
- + " available");
- }
- future.complete(false);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " should be available");
- }
- future.complete(true);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.isTableAvailable(tableName, splitKeys));
}
@Override
public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
- return this
- .<Pair<Integer, Integer>>newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
- controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
- c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
- resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+ return wrap(rawAdmin.getAlterStatus(tableName));
}
@Override
- public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
- return this.<AddColumnRequest, AddColumnResponse> procedureCall(
- RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
- new AddColumnFamilyProcedureBiConsumer(this, tableName));
+ public CompletableFuture<Void> addColumnFamily(TableName tableName,
+ ColumnFamilyDescriptor columnFamily) {
+ return wrap(rawAdmin.addColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
- return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
- RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
- (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.deleteColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
ColumnFamilyDescriptor columnFamily) {
- return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
- RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
- ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
- (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+ return wrap(rawAdmin.modifyColumnFamily(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
- return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
- RequestConverter.buildCreateNamespaceRequest(descriptor),
- (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
- new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ return wrap(rawAdmin.createNamespace(descriptor));
}
@Override
public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
- return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
- RequestConverter.buildModifyNamespaceRequest(descriptor),
- (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
- new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ return wrap(rawAdmin.modifyNamespace(descriptor));
}
@Override
public CompletableFuture<Void> deleteNamespace(String name) {
- return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
- RequestConverter.buildDeleteNamespaceRequest(name),
- (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
- new DeleteNamespaceProcedureBiConsumer(this, name));
+ return wrap(rawAdmin.deleteNamespace(name));
}
@Override
public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
- return this
- .<NamespaceDescriptor> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
- controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
- req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+ return wrap(rawAdmin.getNamespaceDescriptor(name));
}
@Override
public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
- return this
- .<List<NamespaceDescriptor>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
- controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
- done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
- .toNamespaceDescriptorList(resp))).call();
+ return wrap(rawAdmin.listNamespaceDescriptors());
}
@Override
- public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
- stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
- (s, c, req, done) -> s.setBalancerRunning(c, req, done),
- (resp) -> resp.getPrevBalanceValue())).call();
+ public CompletableFuture<Boolean> setBalancerOn(boolean on) {
+ return wrap(rawAdmin.setBalancerOn(on));
}
@Override
public CompletableFuture<Boolean> balance(boolean forcible) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
- stub, RequestConverter.buildBalanceRequest(forcible),
- (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+ return wrap(rawAdmin.balance(forcible));
}
@Override
public CompletableFuture<Boolean> isBalancerOn() {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
- controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
- (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
- .call();
+ return wrap(rawAdmin.isBalancerOn());
}
@Override
public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete((location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
- if (server == null) {
- future.completeExceptionally(new NotServingRegionException(regionName));
- } else {
- closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(result);
- }
- });
- }
- });
- return future;
- }
-
- private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
- return this
- .<Boolean> newAdminCaller()
- .action(
- (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
- controller, stub,
- ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
- (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
- .serverName(serverName).call();
+ return wrap(rawAdmin.closeRegion(regionName, serverName));
}
@Override
- public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
- return this.<List<HRegionInfo>> newAdminCaller()
- .action((controller, stub) -> this
- .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
- controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
- (s, c, req, done) -> s.getOnlineRegion(c, req, done),
- resp -> ProtobufUtil.getRegionInfos(resp)))
- .serverName(sn).call();
+ public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
+ return wrap(rawAdmin.getOnlineRegions(serverName));
}
@Override
public CompletableFuture<Void> flush(TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (!exists) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- } else {
- isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!tableEnabled) {
- future.completeExceptionally(new TableNotEnabledException(tableName));
- } else {
- execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
- new HashMap<>()).whenComplete((ret, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- future.complete(ret);
- }
- });
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.flush(tableName));
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
-
- HRegionInfo regionInfo = location.getRegionInfo();
- this.<Void> newAdminCaller()
- .serverName(serverName)
- .action(
- (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
- controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
- resp -> null)).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.flushRegion(regionName));
}
@Override
public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
- return compact(tableName, columnFamily, false, CompactType.NORMAL);
+ return wrap(rawAdmin.compact(tableName, columnFamily));
}
@Override
public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
- return compactRegion(regionName, columnFamily, false);
+ return wrap(rawAdmin.compactRegion(regionName, columnFamily));
}
@Override
public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
- return compact(tableName, columnFamily, true, CompactType.NORMAL);
+ return wrap(rawAdmin.majorCompact(tableName, columnFamily));
}
@Override
- public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
- return compactRegion(regionName, columnFamily, true);
+ public CompletableFuture<Void>
+ majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return wrap(rawAdmin.majorCompactRegion(regionName, columnFamily));
}
@Override
- public CompletableFuture<Void> compactRegionServer(ServerName sn) {
- return compactRegionServer(sn, false);
+ public CompletableFuture<Void> compactRegionServer(ServerName serverName) {
+ return wrap(rawAdmin.compactRegionServer(serverName));
}
@Override
- public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
- return compactRegionServer(sn, true);
- }
-
- private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
- if (hRegionInfos != null) {
- hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
- }
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
- boolean major) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- /**
- * List all region locations for the specific table.
- */
- private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
- if (TableName.META_TABLE_NAME.equals(tableName)) {
- CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- // For meta table, we use zk to fetch all locations.
- AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
- registry.getMetaRegionLocation().whenComplete(
- (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty()
- || metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- // close the registry.
- IOUtils.closeQuietly(registry);
- });
- return future;
- } else {
- // For non-meta table, we fetch all locations by scanning hbase:meta table
- return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
- }
- }
-
- /**
- * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
- */
- private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
- final boolean major, CompactType compactType) {
- if (CompactType.MOB.equals(compactType)) {
- // TODO support MOB compact.
- return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
- }
- CompletableFuture<Void> future = new CompletableFuture<>();
- getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
- for (HRegionLocation location : locations) {
- if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
- if (location.getServerName() == null) continue;
- compactFutures
- .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
- }
- // future complete unless all of the compact futures are completed.
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- /**
- * Compact the region at specific region server.
- */
- private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
- final boolean major, Optional<byte[]> columnFamily) {
- return this
- .<Void> newAdminCaller()
- .serverName(sn)
- .action(
- (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
- controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
- major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
- resp -> null)).call();
- }
-
- private byte[] toEncodeRegionName(byte[] regionName) {
- try {
- return HRegionInfo.isEncodedRegionName(regionName) ? regionName
- : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
- } catch (IOException e) {
- return regionName;
- }
- }
-
- private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
- CompletableFuture<TableName> result) {
- getRegionLocation(encodeRegionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- return;
- }
- HRegionInfo regionInfo = location.getRegionInfo();
- if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- result.completeExceptionally(new IllegalArgumentException(
- "Can't invoke merge on non-default regions directly"));
- return;
- }
- if (!tableName.compareAndSet(null, regionInfo.getTable())) {
- if (!tableName.get().equals(regionInfo.getTable())) {
- // tables of this two region should be same.
- result.completeExceptionally(new IllegalArgumentException(
- "Cannot merge regions from two different tables " + tableName.get() + " and "
- + regionInfo.getTable()));
- } else {
- result.complete(tableName.get());
- }
- }
- });
- }
-
- private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
- byte[] encodeRegionNameB) {
- AtomicReference<TableName> tableNameRef = new AtomicReference<>();
- CompletableFuture<TableName> future = new CompletableFuture<>();
-
- checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
- checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
- return future;
+ public CompletableFuture<Void> majorCompactRegionServer(ServerName serverName) {
+ return wrap(rawAdmin.majorCompactRegionServer(serverName));
}
@Override
public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
boolean forcible) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
- final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
-
- checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
- .whenComplete((tableName, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
-
- MergeTableRegionsRequest request = null;
- try {
- request = RequestConverter.buildMergeTableRegionsRequest(
- new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
- ng.newNonce());
- } catch (DeserializationException e) {
- future.completeExceptionally(e);
- return;
- }
-
- this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
- (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
- new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
-
- });
- return future;
+ return wrap(rawAdmin.mergeRegions(nameOfRegionA, nameOfRegionB, forcible));
}
@Override
public CompletableFuture<Void> split(TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exist, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!exist) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- return;
- }
- metaTable
- .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
- .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
- .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
- .whenComplete((results, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (results != null && !results.isEmpty()) {
- List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
- for (Result r : results) {
- if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
- RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
- if (rl != null) {
- for (HRegionLocation h : rl.getRegionLocations()) {
- if (h != null && h.getServerName() != null) {
- HRegionInfo hri = h.getRegionInfo();
- if (hri == null || hri.isSplitParent()
- || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
- continue;
- splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
- }
- }
- }
- }
- CompletableFuture
- .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
- .whenComplete((ret, exception) -> {
- if (exception != null) {
- future.completeExceptionally(exception);
- return;
- }
- future.complete(ret);
- });
- } else {
- future.complete(null);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.split(tableName));
}
@Override
public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
- CompletableFuture<Void> result = new CompletableFuture<>();
- if (splitPoint == null) {
- return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
- }
- connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
- .whenComplete((loc, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- } else if (loc == null || loc.getRegionInfo() == null) {
- result.completeExceptionally(new IllegalArgumentException(
- "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
- } else {
- splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- result.completeExceptionally(err2);
- } else {
- result.complete(ret);
- }
-
- });
- }
- });
- return result;
+ return wrap(rawAdmin.split(tableName, splitPoint));
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- HRegionInfo regionInfo = location.getRegionInfo();
- if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
- }
-
- private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
- Optional<byte[]> splitPoint) {
- if (hri.getStartKey() != null && splitPoint.isPresent()
- && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
- return failedFuture(new IllegalArgumentException(
- "should not give a splitkey which equals to startkey!"));
- }
- return this
- .<Void> newAdminCaller()
- .action(
- (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
- controller, stub,
- ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
- (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
- .serverName(sn).call();
+ return wrap(rawAdmin.splitRegion(regionName, splitPoint));
}
@Override
public CompletableFuture<Void> assign(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
- controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.assign(regionName));
}
@Override
public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this
- .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
- RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
- (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.unassign(regionName, forcible));
}
@Override
public CompletableFuture<Void> offline(byte[] regionName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
- controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.offline(regionName));
}
@Override
public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
- controller, stub, RequestConverter.buildMoveRegionRequest(
- regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
- .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
- return future;
+ return wrap(rawAdmin.move(regionName, destServerName));
}
@Override
public CompletableFuture<Void> setQuota(QuotaSettings quota) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
- stub, QuotaSettings.buildSetQuotaRequestProto(quota),
- (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.setQuota(quota));
}
@Override
public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
- CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
- Scan scan = QuotaTableUtil.makeScan(filter);
- this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
- .scan(scan, new RawScanResultConsumer() {
- List<QuotaSettings> settings = new ArrayList<>();
-
- @Override
- public void onNext(Result[] results, ScanController controller) {
- for (Result result : results) {
- try {
- QuotaTableUtil.parseResultToCollection(result, settings);
- } catch (IOException e) {
- controller.terminate();
- future.completeExceptionally(e);
- }
- }
- }
-
- @Override
- public void onError(Throwable error) {
- future.completeExceptionally(error);
- }
-
- @Override
- public void onComplete() {
- future.complete(settings);
- }
- });
- return future;
- }
-
- public CompletableFuture<Void> addReplicationPeer(String peerId,
- ReplicationPeerConfig peerConfig) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
- RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
- done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.getQuota(filter));
+ }
+
+ @Override
+ public CompletableFuture<Void>
+ addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
+ return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
}
@Override
public CompletableFuture<Void> removeReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
- stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.removeReplicationPeer(peerId));
}
@Override
public CompletableFuture<Void> enableReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
- stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
- (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+ return wrap(rawAdmin.enableReplicationPeer(peerId));
}
@Override
public CompletableFuture<Void> disableReplicationPeer(String peerId) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
- controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
- c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
- .call();
+ return wrap(rawAdmin.disableReplicationPeer(peerId));
}
+ @Override
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
- return this
- .<ReplicationPeerConfig> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
- controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
- s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
- (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+ return wrap(rawAdmin.getReplicationPeerConfig(peerId));
}
@Override
public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
- controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
- peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
- resp) -> null)).call();
+ return wrap(rawAdmin.updateReplicationPeerConfig(peerId, peerConfig));
}
@Override
- public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+ public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return failedFuture(new ReplicationException("tableCfs is null"));
- }
-
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
- updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
}
@Override
- public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+ public CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
- if (tableCfs == null) {
- return failedFuture(new ReplicationException("tableCfs is null"));
- }
-
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- try {
- ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
- } catch (ReplicationException e) {
- future.completeExceptionally(e);
- return;
- }
- updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
- }
- });
- return future;
+ return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs));
}
@Override
- public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
- return this
- .<List<ReplicationPeerDescription>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
- controller,
- stub,
- RequestConverter.buildListReplicationPeersRequest(pattern),
- (s, c, req, done) -> s.listReplicationPeers(c, req, done),
- (resp) -> resp.getPeerDescList().stream()
- .map(ReplicationSerDeHelper::toReplicationPeerDescription)
- .collect(Collectors.toList()))).call();
+ public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(
+ Optional<Pattern> pattern) {
+ return wrap(rawAdmin.listReplicationPeers(pattern));
}
@Override
public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
- CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
- listTables().whenComplete(
- (tables, error) -> {
- if (!completeExceptionally(future, error)) {
- List<TableCFs> replicatedTableCFs = new ArrayList<>();
- tables.forEach(table -> {
- Map<String, Integer> cfs = new HashMap<>();
- Stream.of(table.getColumnFamilies())
- .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
- .forEach(column -> {
- cfs.put(column.getNameAsString(), column.getScope());
- });
- if (!cfs.isEmpty()) {
- replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
- }
- });
- future.complete(replicatedTableCFs);
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
- return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
- }
-
- @Override
- public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
- SnapshotType type) {
- return snapshot(new SnapshotDescription(snapshotName, tableName, type));
- }
-
- @Override
- public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
- SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
- .createHBaseProtosSnapshotDesc(snapshotDesc);
- try {
- ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- CompletableFuture<Void> future = new CompletableFuture<>();
- final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- this.<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils.getPauseTime(
- TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER
- .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
- }
- } );
- } else {
- future.completeExceptionally(new SnapshotCreationException("Snapshot '"
- + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
- + " ms", snapshotDesc));
- }
- }
- };
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
- return future;
+ return wrap(rawAdmin.listReplicatedTableCFs());
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
+ return wrap(rawAdmin.snapshot(snapshot));
}
@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
- return this
- .<Boolean> newMasterCaller()
- .action(
- (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
- controller,
- stub,
- IsSnapshotDoneRequest.newBuilder()
- .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
- req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+ return wrap(rawAdmin.isSnapshotFinished(snapshot));
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
- boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
- HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
- return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+ return wrap(rawAdmin.restoreSnapshot(snapshotName));
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshots(Pattern.compile(snapshotName)).whenComplete(
- (snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TableName tableName = null;
- if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
- for (SnapshotDescription snap : snapshotDescriptions) {
- if (snap.getName().equals(snapshotName)) {
- tableName = snap.getTableName();
- break;
- }
- }
- }
- if (tableName == null) {
- future.completeExceptionally(new RestoreSnapshotException(
- "Unable to find the table name for snapshot=" + snapshotName));
- return;
- }
- final TableName finalTableName = tableName;
- tableExists(finalTableName)
- .whenComplete((exists, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!exists) {
- // if table does not exist, then just clone snapshot into new table.
- completeConditionalOnFuture(future,
- internalRestoreSnapshot(snapshotName, finalTableName));
- } else {
- isTableDisabled(finalTableName).whenComplete(
- (disabled, err4) -> {
- if (err4 != null) {
- future.completeExceptionally(err4);
- } else if (!disabled) {
- future.completeExceptionally(new TableNotDisabledException(finalTableName));
- } else {
- completeConditionalOnFuture(future,
- restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
- }
- });
- }
- } );
- });
- return future;
- }
-
- private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
- boolean takeFailSafeSnapshot) {
- if (takeFailSafeSnapshot) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- // Step.1 Take a snapshot of the current state
- String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
- HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
- final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
- .replace("{snapshot.name}", snapshotName)
- .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
- .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
- LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else {
- // Step.2 Restore snapshot
- internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
- if (err2 != null) {
- // Step.3.a Something went wrong during the restore and try to rollback.
- internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
- (void3, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
- + failSafeSnapshotSnapshotName + " succeeded.";
- future.completeExceptionally(new RestoreSnapshotException(msg));
- }
- });
- } else {
- // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
- LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
- (ret3, err3) -> {
- if (err3 != null) {
- LOG.error(
- "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
- future.completeExceptionally(err3);
- } else {
- future.complete(ret3);
- }
- });
- }
- } );
- }
- } );
- return future;
- } else {
- return internalRestoreSnapshot(snapshotName, tableName);
- }
- }
-
- private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
- CompletableFuture<T> parentFuture) {
- parentFuture.whenComplete((res, err) -> {
- if (err != null) {
- dependentFuture.completeExceptionally(err);
- } else {
- dependentFuture.complete(res);
- }
- });
+ return wrap(rawAdmin.restoreSnapshot(snapshotName, takeFailSafeSnapshot));
}
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (exists) {
- future.completeExceptionally(new TableExistsException(tableName));
- } else {
- completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
- }
- });
- return future;
- }
-
- private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
- SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
- .setName(snapshotName).setTable(tableName.getNameAsString()).build();
- try {
- ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- return waitProcedureResult(this
- .<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
- controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
- .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
- done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
- }
-
- @Override
- public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
- return this
- .<List<SnapshotDescription>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(
- controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
- done) -> s.getCompletedSnapshots(c, req, done), resp -> resp.getSnapshotsList()
- .stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList())))
- .call();
- }
-
- @Override
- public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
- CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listSnapshots()
- .whenComplete(
- (snapshotDescList, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> pattern.matcher(snap.getName()).matches())
- .collect(Collectors.toList()));
- });
- return future;
+ return wrap(rawAdmin.cloneSnapshot(snapshotName, tableName));
}
@Override
- public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
- Pattern snapshotNamePattern) {
- CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
- (tableNames, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (tableNames == null || tableNames.size() <= 0) {
- future.complete(Collections.emptyList());
- return;
- }
- listSnapshots(snapshotNamePattern).whenComplete(
- (snapshotDescList, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
- .collect(Collectors.toList()));
- });
- });
- return future;
+ public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+ return wrap(rawAdmin.listSnapshots(pattern));
}
@Override
- public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
- return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+ public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ return wrap(rawAdmin.listTableSnapshots(tableNamePattern, snapshotNamePattern));
}
@Override
- public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
- return deleteTableSnapshots(null, snapshotNamePattern);
+ public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+ return wrap(rawAdmin.deleteSnapshot(snapshotName));
}
@Override
public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
- ((snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
- future.complete(null);
- return;
- }
- List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
- snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
- .add(internalDeleteSnapshot(snapDesc)));
- CompletableFuture.allOf(
- deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
- .thenAccept(v -> future.complete(v));
- }));
- return future;
- }
-
- private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
- return this
- .<Void> newMasterCaller()
- .action(
- (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
- controller,
- stub,
- DeleteSnapshotRequest.newBuilder()
- .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
- req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+ return wrap(rawAdmin.deleteTableSnapshots(tableNamePattern, snapshotNamePattern));
}
@Override
public CompletableFuture<Void> execProcedure(String signature, String instance,
Map<String, String> props) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- ProcedureDescription procDesc =
- ProtobufUtil.buildProcedureDescription(signature, instance, props);
- this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
- controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
- (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
- .call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- lon
<TRUNCATED>
[3/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
new file mode 100644
index 0000000..fcfdf93
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -0,0 +1,2278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.quotas.QuotaFilter;
+import org.apache.hadoop.hbase.quotas.QuotaSettings;
+import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The implementation of AsyncAdmin.
+ */
+@InterfaceAudience.Private
+public class RawAsyncHBaseAdmin implements AsyncAdmin {
+ public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
+
+ private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
+
+ private final AsyncConnectionImpl connection;
+
+ private final RawAsyncTable metaTable;
+
+ private final long rpcTimeoutNs;
+
+ private final long operationTimeoutNs;
+
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final int startLogErrorsCnt;
+
+ private final NonceGenerator ng;
+
+ RawAsyncHBaseAdmin(AsyncConnectionImpl connection) {
+ this.connection = connection;
+ this.metaTable = connection.getRawTable(META_TABLE_NAME);
+ this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
+ this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
+ this.pauseNs = connection.connConf.getPauseNs();
+ this.maxAttempts = connection.connConf.getMaxRetries();
+ this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+ this.ng = connection.getNonceGenerator();
+ }
+
+ private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
+ return this.connection.callerFactory.<T> masterRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
+
+ private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
+ return this.connection.callerFactory.<T> adminRequest()
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
+ }
+
+ @FunctionalInterface
+ private interface MasterRpcCall<RESP, REQ> {
+ void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ @FunctionalInterface
+ private interface AdminRpcCall<RESP, REQ> {
+ void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ @FunctionalInterface
+ private interface Converter<D, S> {
+ D convert(S src) throws IOException;
+ }
+
+ private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
+ MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, PRESP> respConverter) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(respConverter.convert(resp));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
+ AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, PRESP> respConverter) {
+
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(new IOException(controller.errorText()));
+ } else {
+ try {
+ future.complete(respConverter.convert(resp));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ ProcedureBiConsumer consumer) {
+ CompletableFuture<Long> procFuture = this
+ .<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
+ respConverter)).call();
+ return waitProcedureResult(procFuture).whenComplete(consumer);
+ }
+
+ @FunctionalInterface
+ private interface TableOperator {
+ CompletableFuture<Void> operate(TableName table);
+ }
+
+ private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern,
+ TableOperator operator, String operationType) {
+ CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>();
+ List<TableDescriptor> failed = new LinkedList<>();
+ listTables(Optional.ofNullable(pattern), false).whenComplete(
+ (tables, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ CompletableFuture[] futures =
+ tables.stream()
+ .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
+ failed.add(table);
+ }
+ })).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
+ CompletableFuture.allOf(futures).thenAccept((v) -> {
+ future.complete(failed);
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> tableExists(TableName tableName) {
+ return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern,
+ boolean includeSysTables) {
+ return this.<List<TableDescriptor>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call(
+ controller, stub,
+ RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables),
+ (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+ (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern,
+ boolean includeSysTables) {
+ return this.<List<TableName>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub,
+ RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables),
+ (s, c, req, done) -> s.getTableNames(c, req, done),
+ (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<TableDescriptor> getTableDescriptor(TableName tableName) {
+ CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
+ this.<List<TableSchema>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+ controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
+ c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
+ .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!tableSchemas.isEmpty()) {
+ future.complete(ProtobufUtil.convertToTableDesc(tableSchemas.get(0)));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
+ int numRegions) {
+ try {
+ return createTable(desc, Optional.of(getSplitKeys(startKey, endKey, numRegions)));
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys) {
+ if (desc.getTableName() == null) {
+ return failedFuture(new IllegalArgumentException("TableName cannot be null"));
+ }
+ try {
+ splitKeys.ifPresent(keys -> verifySplitKeys(keys));
+ return this.<CreateTableRequest, CreateTableResponse> procedureCall(RequestConverter
+ .buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()), (s, c, req,
+ done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
+ new CreateTableProcedureBiConsumer(this, desc.getTableName()));
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTable(TableName tableName) {
+ return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
+ .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
+ new DeleteTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
+ }
+
+ @Override
+ public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
+ return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
+ RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
+ (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> enableTable(TableName tableName) {
+ return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
+ .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
+ new EnableTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
+ }
+
+ @Override
+ public CompletableFuture<Void> disableTable(TableName tableName) {
+ return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
+ .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
+ (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
+ new DisableTableProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) {
+ return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (state.isPresent()) {
+ future.complete(state.get().inStates(TableState.State.ENABLED));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (state.isPresent()) {
+ future.complete(state.get().inStates(TableState.State.DISABLED));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
+ return isTableAvailable(tableName, null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ isTableEnabled(tableName).whenComplete(
+ (enabled, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!enabled) {
+ future.complete(false);
+ } else {
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+ .whenComplete(
+ (locations, error1) -> {
+ if (error1 != null) {
+ future.completeExceptionally(error1);
+ return;
+ }
+ int notDeployed = 0;
+ int regionCount = 0;
+ for (HRegionLocation location : locations) {
+ HRegionInfo info = location.getRegionInfo();
+ if (location.getServerName() == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has not deployed region "
+ + info.getEncodedName());
+ }
+ notDeployed++;
+ } else if (splitKeys != null
+ && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+ for (byte[] splitKey : splitKeys) {
+ // Just check if the splitkey is available
+ if (Bytes.equals(info.getStartKey(), splitKey)) {
+ regionCount++;
+ break;
+ }
+ }
+ } else {
+ // Always empty start row should be counted
+ regionCount++;
+ }
+ }
+ if (notDeployed > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
+ }
+ future.complete(false);
+ } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " expected to have "
+ + (splitKeys.length + 1) + " regions, but only " + regionCount
+ + " available");
+ }
+ future.complete(false);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " should be available");
+ }
+ future.complete(true);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
+ return this
+ .<Pair<Integer, Integer>>newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
+ controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
+ c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
+ resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) {
+ return this.<AddColumnRequest, AddColumnResponse> procedureCall(
+ RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
+ new AddColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
+ return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
+ RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
+ (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
+ ColumnFamilyDescriptor columnFamily) {
+ return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
+ RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
+ ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
+ (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
+ return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
+ RequestConverter.buildCreateNamespaceRequest(descriptor),
+ (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new CreateNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ }
+
+ @Override
+ public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
+ return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
+ RequestConverter.buildModifyNamespaceRequest(descriptor),
+ (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new ModifyNamespaceProcedureBiConsumer(this, descriptor.getName()));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteNamespace(String name) {
+ return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
+ RequestConverter.buildDeleteNamespaceRequest(name),
+ (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
+ new DeleteNamespaceProcedureBiConsumer(this, name));
+ }
+
+ @Override
+ public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
+ return this
+ .<NamespaceDescriptor> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetNamespaceDescriptorRequest, GetNamespaceDescriptorResponse, NamespaceDescriptor> call(
+ controller, stub, RequestConverter.buildGetNamespaceDescriptorRequest(name), (s, c,
+ req, done) -> s.getNamespaceDescriptor(c, req, done), (resp) -> ProtobufUtil
+ .toNamespaceDescriptor(resp.getNamespaceDescriptor()))).call();
+ }
+
+ @Override
+ public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
+ return this
+ .<List<NamespaceDescriptor>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
+ controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
+ done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil
+ .toNamespaceDescriptorList(resp))).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller,
+ stub, RequestConverter.buildSetBalancerRunningRequest(on, true),
+ (s, c, req, done) -> s.setBalancerRunning(c, req, done),
+ (resp) -> resp.getPrevBalanceValue())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> balance(boolean forcible) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller,
+ stub, RequestConverter.buildBalanceRequest(forcible),
+ (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isBalancerOn() {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call(
+ controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
+ (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete((location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName();
+ if (server == null) {
+ future.completeExceptionally(new NotServingRegionException(regionName));
+ } else {
+ closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) {
+ return this
+ .<Boolean> newAdminCaller()
+ .action(
+ (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall(
+ controller, stub,
+ ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()),
+ (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed()))
+ .serverName(serverName).call();
+ }
+
+ @Override
+ public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
+ return this.<List<HRegionInfo>> newAdminCaller()
+ .action((controller, stub) -> this
+ .<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
+ controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
+ (s, c, req, done) -> s.getOnlineRegion(c, req, done),
+ resp -> ProtobufUtil.getRegionInfos(resp)))
+ .serverName(sn).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> flush(TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exists, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (!exists) {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ } else {
+ isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!tableEnabled) {
+ future.completeExceptionally(new TableNotEnabledException(tableName));
+ } else {
+ execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+ new HashMap<>()).whenComplete((ret, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret);
+ }
+ });
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> flushRegion(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+
+ HRegionInfo regionInfo = location.getRegionInfo();
+ this.<Void> newAdminCaller()
+ .serverName(serverName)
+ .action(
+ (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
+ controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
+ resp -> null)).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) {
+ return compact(tableName, columnFamily, false, CompactType.NORMAL);
+ }
+
+ @Override
+ public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return compactRegion(regionName, columnFamily, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) {
+ return compact(tableName, columnFamily, true, CompactType.NORMAL);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) {
+ return compactRegion(regionName, columnFamily, true);
+ }
+
+ @Override
+ public CompletableFuture<Void> compactRegionServer(ServerName sn) {
+ return compactRegionServer(sn, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
+ return compactRegionServer(sn, true);
+ }
+
+ private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+ if (hRegionInfos != null) {
+ hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty())));
+ }
+ CompletableFuture
+ .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily,
+ boolean major) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+ compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ /**
+ * List all region locations for the specific table.
+ */
+ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
+ if (TableName.META_TABLE_NAME.equals(tableName)) {
+ CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
+ // For meta table, we use zk to fetch all locations.
+ AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
+ registry.getMetaRegionLocation().whenComplete(
+ (metaRegions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (metaRegions == null || metaRegions.isEmpty()
+ || metaRegions.getDefaultRegionLocation() == null) {
+ future.completeExceptionally(new IOException("meta region does not found"));
+ } else {
+ future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+ }
+ // close the registry.
+ IOUtils.closeQuietly(registry);
+ });
+ return future;
+ } else {
+ // For non-meta table, we fetch all locations by scanning hbase:meta table
+ return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
+ }
+ }
+
+ /**
+ * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
+ */
+ private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily,
+ final boolean major, CompactType compactType) {
+ if (CompactType.MOB.equals(compactType)) {
+ // TODO support MOB compact.
+ return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
+ for (HRegionLocation location : locations) {
+ if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
+ if (location.getServerName() == null) continue;
+ compactFutures
+ .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
+ }
+ // future complete unless all of the compact futures are completed.
+ CompletableFuture
+ .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ /**
+ * Compact the region at specific region server.
+ */
+ private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
+ final boolean major, Optional<byte[]> columnFamily) {
+ return this
+ .<Void> newAdminCaller()
+ .serverName(sn)
+ .action(
+ (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(
+ controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
+ major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done),
+ resp -> null)).call();
+ }
+
+ private byte[] toEncodeRegionName(byte[] regionName) {
+ try {
+ return HRegionInfo.isEncodedRegionName(regionName) ? regionName
+ : Bytes.toBytes(HRegionInfo.encodeRegionName(regionName));
+ } catch (IOException e) {
+ return regionName;
+ }
+ }
+
+ private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
+ CompletableFuture<TableName> result) {
+ getRegionLocation(encodeRegionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ return;
+ }
+ HRegionInfo regionInfo = location.getRegionInfo();
+ if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Can't invoke merge on non-default regions directly"));
+ return;
+ }
+ if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+ if (!tableName.get().equals(regionInfo.getTable())) {
+ // tables of this two region should be same.
+ result.completeExceptionally(new IllegalArgumentException(
+ "Cannot merge regions from two different tables " + tableName.get() + " and "
+ + regionInfo.getTable()));
+ } else {
+ result.complete(tableName.get());
+ }
+ }
+ });
+ }
+
+ private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
+ byte[] encodeRegionNameB) {
+ AtomicReference<TableName> tableNameRef = new AtomicReference<>();
+ CompletableFuture<TableName> future = new CompletableFuture<>();
+
+ checkAndGetTableName(encodeRegionNameA, tableNameRef, future);
+ checkAndGetTableName(encodeRegionNameB, tableNameRef, future);
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
+ boolean forcible) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
+ final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
+
+ checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
+ .whenComplete((tableName, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+
+ MergeTableRegionsRequest request = null;
+ try {
+ request = RequestConverter.buildMergeTableRegionsRequest(
+ new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+ ng.newNonce());
+ } catch (DeserializationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+
+ this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
+ (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
+ new MergeTableRegionProcedureBiConsumer(this, tableName)).whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> split(TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exist, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!exist) {
+ future.completeExceptionally(new TableNotFoundException(tableName));
+ return;
+ }
+ metaTable
+ .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
+ .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+ .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
+ .whenComplete((results, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (results != null && !results.isEmpty()) {
+ List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+ for (Result r : results) {
+ if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+ if (rl != null) {
+ for (HRegionLocation h : rl.getRegionLocations()) {
+ if (h != null && h.getServerName() != null) {
+ HRegionInfo hri = h.getRegionInfo();
+ if (hri == null || hri.isSplitParent()
+ || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID)
+ continue;
+ splitFutures.add(split(h.getServerName(), hri, Optional.empty()));
+ }
+ }
+ }
+ }
+ CompletableFuture
+ .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
+ .whenComplete((ret, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ return;
+ }
+ future.complete(ret);
+ });
+ } else {
+ future.complete(null);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ if (splitPoint == null) {
+ return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
+ }
+ connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
+ .whenComplete((loc, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ } else if (loc == null || loc.getRegionInfo() == null) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+ } else {
+ splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint))
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ result.completeExceptionally(err2);
+ } else {
+ result.complete(ret);
+ }
+
+ });
+ }
+ });
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ HRegionInfo regionInfo = location.getRegionInfo();
+ if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+ future.completeExceptionally(new IllegalArgumentException(
+ "Can't split replicas directly. "
+ + "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+ split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri,
+ Optional<byte[]> splitPoint) {
+ if (hri.getStartKey() != null && splitPoint.isPresent()
+ && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
+ return failedFuture(new IllegalArgumentException(
+ "should not give a splitkey which equals to startkey!"));
+ }
+ return this
+ .<Void> newAdminCaller()
+ .action(
+ (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall(
+ controller, stub,
+ ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
+ (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null))
+ .serverName(sn).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> assign(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
+ resp -> null))).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this
+ .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+ (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
+ .whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> offline(byte[] regionName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
+ .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
+ resp -> null))).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ getRegionInfo(regionName).whenComplete(
+ (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ this.<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildMoveRegionRequest(
+ regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s
+ .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> setQuota(QuotaSettings quota) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
+ stub, QuotaSettings.buildSetQuotaRequestProto(quota),
+ (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
+ CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
+ Scan scan = QuotaTableUtil.makeScan(filter);
+ this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
+ .scan(scan, new RawScanResultConsumer() {
+ List<QuotaSettings> settings = new ArrayList<>();
+
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ for (Result result : results) {
+ try {
+ QuotaTableUtil.parseResultToCollection(result, settings);
+ } catch (IOException e) {
+ controller.terminate();
+ future.completeExceptionally(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ future.completeExceptionally(error);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(settings);
+ }
+ });
+ return future;
+ }
+
+ public CompletableFuture<Void> addReplicationPeer(String peerId,
+ ReplicationPeerConfig peerConfig) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
+ RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
+ done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller,
+ stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> enableReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller,
+ stub, RequestConverter.buildEnableReplicationPeerRequest(peerId),
+ (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> disableReplicationPeer(String peerId) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call(
+ controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s,
+ c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null))
+ .call();
+ }
+
+ public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
+ return this
+ .<ReplicationPeerConfig> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
+ controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
+ s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
+ (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
+ ReplicationPeerConfig peerConfig) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call(
+ controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId,
+ peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), (
+ resp) -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return failedFuture(new ReplicationException("tableCfs is null"));
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
+ getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return failedFuture(new ReplicationException("tableCfs is null"));
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
+ getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ try {
+ ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+ } catch (ReplicationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+ updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) {
+ return this
+ .<List<ReplicationPeerDescription>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListReplicationPeersRequest, ListReplicationPeersResponse, List<ReplicationPeerDescription>> call(
+ controller,
+ stub,
+ RequestConverter.buildListReplicationPeersRequest(pattern),
+ (s, c, req, done) -> s.listReplicationPeers(c, req, done),
+ (resp) -> resp.getPeerDescList().stream()
+ .map(ReplicationSerDeHelper::toReplicationPeerDescription)
+ .collect(Collectors.toList()))).call();
+ }
+
+ @Override
+ public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
+ CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
+ listTables().whenComplete(
+ (tables, error) -> {
+ if (!completeExceptionally(future, error)) {
+ List<TableCFs> replicatedTableCFs = new ArrayList<>();
+ tables.forEach(table -> {
+ Map<String, Integer> cfs = new HashMap<>();
+ Stream.of(table.getColumnFamilies())
+ .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+ .forEach(column -> {
+ cfs.put(column.getNameAsString(), column.getScope());
+ });
+ if (!cfs.isEmpty()) {
+ replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+ }
+ });
+ future.complete(replicatedTableCFs);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
+ SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
+ .createHBaseProtosSnapshotDesc(snapshotDesc);
+ try {
+ ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
+ this.<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+ stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+ resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime = ConnectionUtils.getPauseTime(
+ TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER
+ .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+ }
+ } );
+ } else {
+ future.completeExceptionally(new SnapshotCreationException("Snapshot '"
+ + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
+ + " ms", snapshotDesc));
+ }
+ }
+ };
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(
+ controller,
+ stub,
+ IsSnapshotDoneRequest.newBuilder()
+ .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
+ req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
+ boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
+ HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
+ return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
+ }
+
+ @Override
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ listSnapshots(Optional.of(Pattern.compile(snapshotName))).whenComplete(
+ (snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TableName tableName = null;
+ if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+ for (SnapshotDescription snap : snapshotDescriptions) {
+ if (snap.getName().equals(snapshotName)) {
+ tableName = snap.getTableName();
+ break;
+ }
+ }
+ }
+ if (tableName == null) {
+ future.completeExceptionally(new RestoreSnapshotException(
+ "Unable to find the table name for snapshot=" + snapshotName));
+ return;
+ }
+ final TableName finalTableName = tableName;
+ tableExists(finalTableName)
+ .whenComplete((exists, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!exists) {
+ // if table does not exist, then just clone snapshot into new table.
+ completeConditionalOnFuture(future,
+ internalRestoreSnapshot(snapshotName, finalTableName));
+ } else {
+ isTableDisabled(finalTableName).whenComplete(
+ (disabled, err4) -> {
+ if (err4 != null) {
+ future.completeExceptionally(err4);
+ } else if (!disabled) {
+ future.completeExceptionally(new TableNotDisabledException(finalTableName));
+ } else {
+ completeConditionalOnFuture(future,
+ restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
+ }
+ });
+ }
+ } );
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
+ boolean takeFailSafeSnapshot) {
+ if (takeFailSafeSnapshot) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ // Step.1 Take a snapshot of the current state
+ String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
+ HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+ final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
+ .replace("{snapshot.name}", snapshotName)
+ .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
+ .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
+ LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else {
+ // Step.2 Restore snapshot
+ internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
+ if (err2 != null) {
+ // Step.3.a Something went wrong during the restore and try to rollback.
+ internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
+ (void3, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
+ + failSafeSnapshotSnapshotName + " succeeded.";
+ future.completeExceptionally(new RestoreSnapshotException(msg));
+ }
+ });
+ } else {
+ // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+ LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
+ (ret3, err3) -> {
+ if (err3 != null) {
+ LOG.error(
+ "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret3);
+ }
+ });
+ }
+ } );
+ }
+ } );
+ return future;
+ } else {
+ return internalRestoreSnapshot(snapshotName, tableName);
+ }
+ }
+
+ private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
+ CompletableFuture<T> parentFuture) {
+ parentFuture.whenComplete((res, err) -> {
+ if (err != null) {
+ dependentFuture.completeExceptionally(err);
+ } else {
+ dependentFuture.complete(res);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ tableExists(tableName).whenComplete((exists, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (exists) {
+ future.completeExceptionally(new TableExistsException(tableName));
+ } else {
+ completeConditionalOnFuture(future, internalRestoreSnapshot(snapshotName, tableName));
+ }
+ });
+ return future;
+ }
+
+ private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) {
+ SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
+ .setName(snapshotName).setTable(tableName.getNameAsString()).build();
+ try {
+ ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ return waitProcedureResult(this
+ .<Long> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(
+ controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
+ .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
+ done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call());
+ }
+
+ @Override
+ public CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern) {
+ CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+ this.<GetCompletedSnapshotsResponse> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, GetCompletedSnapshotsResponse> call(
+ controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
+ done) -> s.getCompletedSnapshots(c, req, done), resp -> resp))
+ .call()
+ .whenComplete(
+ (resp, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ future.complete(resp
+ .getSnapshotsList()
+ .stream()
+ .map(ProtobufUtil::createSnapshotDesc)
+ .filter(
+ snap -> pattern.isPresent() ? pattern.get().matcher(snap.getName()).matches()
+ : true).collect(Collectors.toList()));
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
+ listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
+ (tableNames, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (tableNames == null || tableNames.size() <= 0) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ listSnapshots(Optional.ofNullable(snapshotNamePattern)).whenComplete(
+ (snapshotDescList, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ future.complete(snapshotDescList.stream()
+ .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+ .collect(Collectors.toList()));
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
+ return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
+ return deleteTableSnapshots(null, snapshotNamePattern);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
+ Pattern snapshotNamePattern) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
+ ((snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
+ future.complete(null);
+ return;
+ }
+ List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
+ snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
+ .add(internalDeleteSnapshot(snapDesc)));
+ CompletableFuture.allOf(
+ deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
+ .thenAccept(v -> future.complete(v));
+ }));
+ return future;
+ }
+
+ private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
+ return this
+ .<Void> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
+ controller,
+ stub,
+ DeleteSnapshotRequest.newBuilder()
+ .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c,
+ req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call();
+ }
+
+ @Override
+ public CompletableFuture<Void> execProcedure(String signature, String instance,
+ Map<String, String> props) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProcedureDescription procDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+ (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+ .call().whenComplete((expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime = ConnectionUtils
+ .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MICROSECONDS);
+ }
+ });
+ } else {
+ future.completeExceptionally(new IOException("Procedure '" + signature + " : "
+ + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
+ }
+ }
+ };
+ // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> execProcedureWithRet(String signature, String instance,
+ Map<String, String> props) {
+ ProcedureDescription proDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ return this.<byte[]> newMasterCaller()
+ .action(
+ (controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
+ (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
+ resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
+ Map<String, String> props) {
+ ProcedureDescription proDesc =
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ return this.<Boolean> newMasterCaller()
+ .action((controller, stub) -> this
+ .<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(controller, stub,
+ IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
+ (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
+ return this.<Boolean> newMasterCaller().action(
+ (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
+ controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
+ (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<List<ProcedureInfo>> listProcedures() {
+ return this
+ .<List<ProcedureInfo>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<ListProceduresRequest, ListProceduresResponse, List<ProcedureInfo>> call(
+ controller, stub, ListProceduresRequest.newBuilder().build(),
+ (s, c, req, done) -> s.listProcedures(c, req, done),
+ resp -> resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo)
+ .collect(Collectors.toList()))).call();
+ }
+
+ /**
+ * Get the region location for the passed region name. The region name may be a full region name
+ * or encoded region name. If the region does not found, then it'll throw an
+ * UnknownRegionException wrapped by a {@link CompletableFuture}
+ * @param regionNameOrEncodedRegionName
+ * @return region location, wrapped by a {@link CompletableFuture}
+ */
+ @VisibleForTesting
+ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
+ if (regionNameOrEncodedRegionName == null) {
+ return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+ }
+ try {
+ CompletableFuture<Optional<HRegionLocation>> future;
+ if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
+ future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
+ regionNameOrEncodedRegionName);
+ } else {
+ future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
+ }
+
+ CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
+ future.whenComplete((location, err) -> {
+ if (err != null) {
+ returnedFuture.completeExceptionally(err);
+ return;
+ }
+ LOG.info("location is " + location);
+ if (!location.isPresent() || location.get().getRegionInfo() == null) {
+ LOG.info("unknown location is " + location);
+ returnedFuture.completeExceptionally(new UnknownRegionException(
+ "Invalid region name or encoded region name: "
+ + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+ } else {
+ returnedFuture.complete(location.get());
+ }
+ });
+ return returnedFuture;
+ } catch (IOException e) {
+ return failedFuture(e);
+ }
+ }
+
+ /**
+ * Get the region info for the passed region name. The region name may be a full region name or
+ * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
+ * wrapped by a {@link CompletableFuture}
+ * @param regionNameOrEncodedRegionName
+ * @return region info, wrapped by a {@link CompletableFuture}
+ */
+ private CompletableFuture<HRegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
+ if (regionNameOrEncodedRegionName == null) {
+ return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
+ }
+
+ if (Bytes.equals(regionNameOrEncodedRegionName,
+ HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+ || Bytes.equals(regionNameOrEncodedRegionName,
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
+ return
<TRUNCATED>
[5/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
Posted by zg...@apache.org.
HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14f0423b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14f0423b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14f0423b
Branch: refs/heads/master
Commit: 14f0423b58256d872a1ddde1712a1fc60b4c5671
Parents: 8318a09
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Jul 4 09:51:41 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Jul 4 09:51:41 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/AsyncAdmin.java | 67 +-
.../hadoop/hbase/client/AsyncConnection.java | 14 +-
.../hbase/client/AsyncConnectionImpl.java | 7 +-
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 2085 +---------------
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 2278 ++++++++++++++++++
.../hbase/shaded/protobuf/RequestConverter.java | 21 +-
.../hadoop/hbase/client/TestAsyncAdminBase.java | 80 +-
.../hbase/client/TestAsyncBalancerAdminApi.java | 3 +
.../client/TestAsyncNamespaceAdminApi.java | 3 +
.../client/TestAsyncProcedureAdminApi.java | 4 +-
.../hbase/client/TestAsyncQuotaAdminApi.java | 37 +-
.../hbase/client/TestAsyncRegionAdminApi.java | 737 +++---
.../client/TestAsyncReplicationAdminApi.java | 30 +-
.../hbase/client/TestAsyncSnapshotAdminApi.java | 24 +-
.../hbase/client/TestAsyncTableAdminApi.java | 818 +++----
15 files changed, 3288 insertions(+), 2920 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 3b022f4..ff35d46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -47,11 +47,6 @@ import org.apache.hadoop.hbase.util.Pair;
public interface AsyncAdmin {
/**
- * @return Async Connection used by this object.
- */
- AsyncConnectionImpl getConnection();
-
- /**
* @param tableName Table to check.
* @return True if table exists already. The return value will be wrapped by a
* {@link CompletableFuture}.
@@ -105,7 +100,9 @@ public interface AsyncAdmin {
* Creates a new table.
* @param desc table descriptor for table
*/
- CompletableFuture<Void> createTable(TableDescriptor desc);
+ default CompletableFuture<Void> createTable(TableDescriptor desc) {
+ return createTable(desc, Optional.empty());
+ }
/**
* Creates a new table with the specified number of regions. The start key specified will become
@@ -128,7 +125,7 @@ public interface AsyncAdmin {
* @param desc table descriptor for table
* @param splitKeys array of split keys for the initial regions of the table
*/
- CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
+ CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys);
/**
* Deletes a table.
@@ -188,6 +185,13 @@ public interface AsyncAdmin {
/**
* @param tableName name of table to check
+ * @return true if table is on-line. The return value will be wrapped by a
+ * {@link CompletableFuture}.
+ */
+ CompletableFuture<Boolean> isTableEnabled(TableName tableName);
+
+ /**
+ * @param tableName name of table to check
* @return true if table is off-line. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
@@ -198,7 +202,9 @@ public interface AsyncAdmin {
* @return true if all regions of the table are available. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
- CompletableFuture<Boolean> isTableAvailable(TableName tableName);
+ default CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
+ return isTableAvailable(tableName, null);
+ }
/**
* Use this api to check if the table has been created with the specified number of splitkeys
@@ -275,13 +281,6 @@ public interface AsyncAdmin {
CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors();
/**
- * @param tableName name of table to check
- * @return true if table is on-line. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- CompletableFuture<Boolean> isTableEnabled(TableName tableName);
-
- /**
* Turn the load balancer on or off.
* @param on
* @return Previous balancer value wrapped by a {@link CompletableFuture}.
@@ -330,7 +329,7 @@ public interface AsyncAdmin {
/**
* Get all the online regions on a region server.
*/
- CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn);
+ CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
/**
* Flush a table.
@@ -422,15 +421,15 @@ public interface AsyncAdmin {
/**
* Compact all regions on the region server.
- * @param sn the region server name
+ * @param serverName the region server name
*/
- CompletableFuture<Void> compactRegionServer(ServerName sn);
+ CompletableFuture<Void> compactRegionServer(ServerName serverName);
/**
* Compact all regions on the region server.
- * @param sn the region server name
+ * @param serverName the region server name
*/
- CompletableFuture<Void> majorCompactRegionServer(ServerName sn);
+ CompletableFuture<Void> majorCompactRegionServer(ServerName serverName);
/**
* Merge two regions.
@@ -563,18 +562,18 @@ public interface AsyncAdmin {
/**
* Append the replicable table-cf config of the specified peer
- * @param id a short that identifies the cluster
+ * @param peerId a short that identifies the cluster
* @param tableCfs A map from tableName to column family names
*/
- CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
+ CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs);
/**
* Remove some table-cfs from config of the specified peer
- * @param id a short name that identifies the cluster
+ * @param peerId a short name that identifies the cluster
* @param tableCfs A map from tableName to column family names
*/
- CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
+ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs);
/**
@@ -613,7 +612,9 @@ public interface AsyncAdmin {
* @param snapshotName name of the snapshot to be created
* @param tableName name of the table for which snapshot is created
*/
- CompletableFuture<Void> snapshot(String snapshotName, TableName tableName);
+ default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
+ return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
+ }
/**
* Create typed snapshot of the table. Snapshots are considered unique based on <b>the name of the
@@ -627,8 +628,10 @@ public interface AsyncAdmin {
* @param tableName name of the table to snapshot
* @param type type of snapshot to take
*/
- CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
- SnapshotType type);
+ default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
+ SnapshotType type) {
+ return snapshot(new SnapshotDescription(snapshotName, tableName, type));
+ }
/**
* Take a snapshot and wait for the server to complete that snapshot asynchronously. Only a single
@@ -695,14 +698,16 @@ public interface AsyncAdmin {
* @return a list of snapshot descriptors for completed snapshots wrapped by a
* {@link CompletableFuture}
*/
- CompletableFuture<List<SnapshotDescription>> listSnapshots();
+ default CompletableFuture<List<SnapshotDescription>> listSnapshots() {
+ return listSnapshots(Optional.empty());
+ }
/**
* List all the completed snapshots matching the given pattern.
* @param pattern The compiled regular expression to match against
* @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture}
*/
- CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern);
+ CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern);
/**
* List all the completed snapshots matching the given table name regular expression and snapshot
@@ -725,7 +730,9 @@ public interface AsyncAdmin {
* Delete existing snapshots whose names match the pattern passed.
* @param pattern pattern for names of the snapshot to match
*/
- CompletableFuture<Void> deleteSnapshots(Pattern pattern);
+ default CompletableFuture<Void> deleteSnapshots(Pattern pattern) {
+ return deleteTableSnapshots(null, pattern);
+ }
/**
* Delete all existing snapshots matching the given table name regular expression and snapshot
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 65005fa..22ed064 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -96,11 +96,17 @@ public interface AsyncConnection extends Closeable {
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
/**
- * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin
- * is not guaranteed to be thread-safe. A new instance should be created for each using thread.
- * This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not
- * recommended.
+ * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned
+ * {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so
+ * typically you should not do any time consuming work inside these methods.
* @return an AsyncAdmin instance for cluster administration
*/
AsyncAdmin getAdmin();
+
+ /**
+ * Retrieve an AsyncAdmin implementation to administer an HBase cluster.
+ * @param pool the thread pool to use for executing callback
+ * @return an AsyncAdmin instance for cluster administration
+ */
+ AsyncAdmin getAdmin(ExecutorService pool);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 776498a..c170bce 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -279,6 +279,11 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncAdmin getAdmin() {
- return new AsyncHBaseAdmin(this);
+ return new RawAsyncHBaseAdmin(this);
+ }
+
+ @Override
+ public AsyncAdmin getAdmin(ExecutorService pool) {
+ return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool);
}
}
\ No newline at end of file
[2/5] hbase git commit: HBASE-18283 Provide a construct method which
accept a thread pool for AsyncAdmin
Posted by zg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 39ae6a5..dff9116 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
+
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -1271,19 +1273,26 @@ public final class RequestConverter {
final byte [][] splitKeys,
final long nonceGroup,
final long nonce) {
+ return buildCreateTableRequest(hTableDesc, Optional.ofNullable(splitKeys), nonceGroup, nonce);
+ }
+
+ /**
+ * Creates a protocol buffer CreateTableRequest
+ * @param hTableDesc
+ * @param splitKeys
+ * @return a CreateTableRequest
+ */
+ public static CreateTableRequest buildCreateTableRequest(TableDescriptor hTableDesc,
+ Optional<byte[][]> splitKeys, long nonceGroup, long nonce) {
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDesc));
- if (splitKeys != null) {
- for (byte [] splitKey : splitKeys) {
- builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(splitKey));
- }
- }
+ splitKeys.ifPresent(keys -> Arrays.stream(keys).forEach(
+ key -> builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(key))));
builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce);
return builder.build();
}
-
/**
* Creates a protocol buffer ModifyTableRequest
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
index cdb5433..b182563 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBase.java
@@ -19,15 +19,32 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
/**
* Class to test AsyncAdmin.
@@ -36,13 +53,34 @@ public abstract class TestAsyncAdminBase {
protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- protected static byte[] FAMILY = Bytes.toBytes("testFamily");
+ protected static final byte[] FAMILY = Bytes.toBytes("testFamily");
protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0");
protected static final byte[] FAMILY_1 = Bytes.toBytes("cf1");
protected static AsyncConnection ASYNC_CONN;
protected AsyncAdmin admin;
+ @Parameter
+ public Supplier<AsyncAdmin> getAdmin;
+
+ private static AsyncAdmin getRawAsyncAdmin() {
+ return ASYNC_CONN.getAdmin();
+ }
+
+ private static AsyncAdmin getAsyncAdmin() {
+ return ASYNC_CONN.getAdmin(ForkJoinPool.commonPool());
+ }
+
+ @Parameters
+ public static List<Object[]> params() {
+ return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBase::getRawAsyncAdmin },
+ new Supplier<?>[] { TestAsyncAdminBase::getAsyncAdmin });
+ }
+
+ @Rule
+ public TestName testName = new TestName();
+ protected TableName tableName;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@@ -60,7 +98,43 @@ public abstract class TestAsyncAdminBase {
}
@Before
- public void setUp() throws Exception {
- this.admin = ASYNC_CONN.getAdmin();
+ public void setUp() {
+ admin = ASYNC_CONN.getAdmin();
+ String methodName = testName.getMethodName();
+ tableName = TableName.valueOf(methodName.substring(0, methodName.length() - 3));
+ }
+
+ @After
+ public void tearDown() {
+ admin.listTableNames(Optional.of(Pattern.compile(tableName.getNameAsString() + ".*")), false)
+ .whenCompleteAsync((tables, err) -> {
+ if (tables != null) {
+ tables.forEach(table -> {
+ try {
+ admin.disableTable(table).join();
+ } catch (Exception e) {
+ LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
+ }
+ admin.deleteTable(table).join();
+ });
+ }
+ }, ForkJoinPool.commonPool()).join();
+ }
+
+ protected void createTableWithDefaultConf(TableName tableName) {
+ createTableWithDefaultConf(tableName, Optional.empty());
+ }
+
+ protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys) {
+ createTableWithDefaultConf(tableName, splitKeys, FAMILY);
+ }
+
+ protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys,
+ byte[]... families) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ for (byte[] family : families) {
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build());
+ }
+ admin.createTable(builder.build(), splitKeys).join();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
index 00303e2..995e0aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBalancerAdminApi.java
@@ -23,7 +23,10 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBalancerAdminApi extends TestAsyncAdminBase {
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
index eccff3f..dd3655e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNamespaceAdminApi.java
@@ -42,10 +42,13 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Class to test asynchronous namespace admin operations.
*/
+@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncNamespaceAdminApi extends TestAsyncAdminBase {
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
index 832bcbe..12c699b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcedureAdminApi.java
@@ -66,7 +66,7 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
@Test
public void testExecProcedure() throws Exception {
- TableName tableName = TableName.valueOf("testExecProcedure");
+ String snapshotString = "offlineTableSnapshot";
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
for (int i = 0; i < 100; i++) {
@@ -74,13 +74,13 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
table.put(put);
}
// take a snapshot of the enabled table
- String snapshotString = "offlineTableSnapshot";
Map<String, String> props = new HashMap<>();
props.put("table", tableName.getNameAsString());
admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, snapshotString,
props).get();
LOG.debug("Snapshot completed.");
} finally {
+ admin.deleteSnapshot(snapshotString).join();
TEST_UTIL.deleteTable(tableName);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
index ac9bc16..c39a582 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java
@@ -18,10 +18,6 @@
package org.apache.hadoop.hbase.client;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.QuotaCache;
@@ -34,50 +30,35 @@ import org.apache.hadoop.hbase.quotas.ThrottleType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
-public class TestAsyncQuotaAdminApi {
- private static final Log LOG = LogFactory.getLog(TestAsyncQuotaAdminApi.class);
-
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- protected static AsyncConnection ASYNC_CONN;
- protected AsyncAdmin admin;
+public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
- TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
- TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
- TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- IOUtils.closeQuietly(ASYNC_CONN);
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void setUp() throws Exception {
- this.admin = ASYNC_CONN.getAdmin();
- }
-
@Test
public void testThrottleType() throws Exception {
String userName = User.getCurrent().getShortName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index a3afabc..7c8b236 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -31,11 +32,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
@@ -51,36 +51,29 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Class to test asynchronous region admin operations.
*/
+@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
public static Random RANDOM = new Random(System.currentTimeMillis());
- private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
- HTableDescriptor htd = new HTableDescriptor(TABLENAME);
- HColumnDescriptor hcd = new HColumnDescriptor("value");
- htd.addFamily(hcd);
-
- admin.createTable(htd, null).get();
- }
-
@Test
public void testCloseRegion() throws Exception {
- TableName TABLENAME = TableName.valueOf("TestHBACloseRegion");
- createTableWithDefaultConf(TABLENAME);
+ createTableWithDefaultConf(tableName);
HRegionInfo info = null;
- HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME);
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.getTable().isSystemTable()) {
@@ -102,16 +95,14 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
@Test
public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
- final String name = "TestHBACloseRegion1";
- byte[] TABLENAME = Bytes.toBytes(name);
- createTableWithDefaultConf(TableName.valueOf(TABLENAME));
+ createTableWithDefaultConf(tableName);
HRegionInfo info = null;
- HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
- if (regionInfo.getRegionNameAsString().contains(name)) {
+ if (regionInfo.getRegionNameAsString().contains(tableName.getNameAsString())) {
info = regionInfo;
boolean catchNotServingException = false;
try {
@@ -132,10 +123,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
@Test
public void testCloseRegionWhenServerNameIsEmpty() throws Exception {
- byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegionWhenServerNameIsEmpty");
- createTableWithDefaultConf(TableName.valueOf(TABLENAME));
+ createTableWithDefaultConf(tableName);
- HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME));
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) {
@@ -147,166 +137,61 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
}
@Test
- public void testGetRegion() throws Exception {
- AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin;
-
- final TableName tableName = TableName.valueOf("testGetRegion");
- LOG.info("Started " + tableName);
+ public void testGetRegionLocation() throws Exception {
+ RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY);
-
- try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm"));
- HRegionInfo region = regionLocation.getRegionInfo();
- byte[] regionName = region.getRegionName();
- HRegionLocation location = rawAdmin.getRegionLocation(regionName).get();
- assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
- location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get();
- assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
- }
- }
-
- @Test
- public void testMergeRegions() throws Exception {
- final TableName tableName = TableName.valueOf("testMergeRegions");
- HColumnDescriptor cd = new HColumnDescriptor("d");
- HTableDescriptor td = new HTableDescriptor(tableName);
- td.addFamily(cd);
- byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
- Admin syncAdmin = TEST_UTIL.getAdmin();
- try {
- TEST_UTIL.createTable(td, splitRows);
- TEST_UTIL.waitTableAvailable(tableName);
-
- List<HRegionInfo> tableRegions;
- HRegionInfo regionA;
- HRegionInfo regionB;
-
- // merge with full name
- tableRegions = syncAdmin.getTableRegions(tableName);
- assertEquals(3, syncAdmin.getTableRegions(tableName).size());
- regionA = tableRegions.get(0);
- regionB = tableRegions.get(1);
- admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
-
- assertEquals(2, syncAdmin.getTableRegions(tableName).size());
-
- // merge with encoded name
- tableRegions = syncAdmin.getTableRegions(tableName);
- regionA = tableRegions.get(0);
- regionB = tableRegions.get(1);
- admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
-
- assertEquals(1, syncAdmin.getTableRegions(tableName).size());
- } finally {
- syncAdmin.disableTable(tableName);
- syncAdmin.deleteTable(tableName);
- }
+ AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(tableName);
+ HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")).get();
+ HRegionInfo region = regionLocation.getRegionInfo();
+ byte[] regionName = regionLocation.getRegionInfo().getRegionName();
+ HRegionLocation location = rawAdmin.getRegionLocation(regionName).get();
+ assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
+ location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get();
+ assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
}
@Test
- public void testSplitTable() throws Exception {
- splitTests(TableName.valueOf("testSplitTable"), 3000, false, null);
- splitTests(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
- splitTests(TableName.valueOf("testSplitRegion"), 3000, true, null);
- splitTests(TableName.valueOf("testSplitRegionWithSplitPoint"), 3000, true, Bytes.toBytes("3"));
- }
-
- private void splitTests(TableName tableName, int rowCount, boolean isSplitRegion,
- byte[] splitPoint) throws Exception {
- int count = 0;
- // create table
- HColumnDescriptor cd = new HColumnDescriptor("d");
- HTableDescriptor td = new HTableDescriptor(tableName);
- td.addFamily(cd);
- Table table = TEST_UTIL.createTable(td, null);
- TEST_UTIL.waitTableAvailable(tableName);
-
- List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tableName);
- assertEquals(regions.size(), 1);
-
- List<Put> puts = new ArrayList<>();
- for (int i = 0; i < rowCount; i++) {
- Put put = new Put(Bytes.toBytes(i));
- put.addColumn(Bytes.toBytes("d"), null, Bytes.toBytes("value" + i));
- puts.add(put);
- }
- table.put(puts);
-
- if (isSplitRegion) {
- admin.splitRegion(regions.get(0).getRegionName(), Optional.ofNullable(splitPoint)).get();
- } else {
- if (splitPoint == null) {
- admin.split(tableName).get();
- } else {
- admin.split(tableName, splitPoint).get();
- }
- }
+ public void testAssignRegionAndUnassignRegion() throws Exception {
+ createTableWithDefaultConf(tableName);
- for (int i = 0; i < 45; i++) {
- try {
- List<HRegionInfo> hRegionInfos = TEST_UTIL.getAdmin().getTableRegions(tableName);
- count = hRegionInfos.size();
- if (count >= 2) {
- break;
- }
- Thread.sleep(1000L);
- } catch (Exception e) {
- LOG.error(e);
- }
+ // assign region.
+ HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+ AssignmentManager am = master.getAssignmentManager();
+ HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
+
+ // assert region on server
+ RegionStates regionStates = am.getRegionStates();
+ ServerName serverName = regionStates.getRegionServerOfRegion(hri);
+ TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
+ assertTrue(regionStates.getRegionState(hri).isOpened());
+
+ // Region is assigned now. Let's assign it again.
+ // Master should not abort, and region should stay assigned.
+ admin.assign(hri.getRegionName()).get();
+ try {
+ am.waitForAssignment(hri);
+ fail("Expected NoSuchProcedureException");
+ } catch (NoSuchProcedureException e) {
+ // Expected
}
+ assertTrue(regionStates.getRegionState(hri).isOpened());
- assertEquals(count, 2);
- }
-
- @Test
- public void testAssignRegionAndUnassignRegion() throws Exception {
- final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion");
+ // unassign region
+ admin.unassign(hri.getRegionName(), true).get();
try {
- // create test table
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(FAMILY));
- admin.createTable(desc).get();
-
- // assign region.
- HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
- AssignmentManager am = master.getAssignmentManager();
- HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
-
- // assert region on server
- RegionStates regionStates = am.getRegionStates();
- ServerName serverName = regionStates.getRegionServerOfRegion(hri);
- TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
- assertTrue(regionStates.getRegionState(hri).isOpened());
-
- // Region is assigned now. Let's assign it again.
- // Master should not abort, and region should stay assigned.
- admin.assign(hri.getRegionName()).get();
- try {
- am.waitForAssignment(hri);
- fail("Expected NoSuchProcedureException");
- } catch (NoSuchProcedureException e) {
- // Expected
- }
- assertTrue(regionStates.getRegionState(hri).isOpened());
-
- // unassign region
- admin.unassign(hri.getRegionName(), true).get();
- try {
- am.waitForAssignment(hri);
- fail("Expected NoSuchProcedureException");
- } catch (NoSuchProcedureException e) {
- // Expected
- }
- assertTrue(regionStates.getRegionState(hri).isClosed());
- } finally {
- TEST_UTIL.deleteTable(tableName);
+ am.waitForAssignment(hri);
+ fail("Expected NoSuchProcedureException");
+ } catch (NoSuchProcedureException e) {
+ // Expected
}
+ assertTrue(regionStates.getRegionState(hri).isClosed());
}
HRegionInfo createTableAndGetOneRegion(final TableName tableName)
throws IOException, InterruptedException, ExecutionException {
- HTableDescriptor desc = new HTableDescriptor(tableName);
- desc.addFamily(new HColumnDescriptor(FAMILY));
+ TableDescriptor desc =
+ TableDescriptorBuilder.newBuilder(tableName)
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build();
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
// wait till the table is assigned
@@ -333,280 +218,341 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
// Will cause the Master to tell the regionserver to shut itself down because
// regionserver is reporting the state as OPEN.
public void testOfflineRegion() throws Exception {
- final TableName tableName = TableName.valueOf("testOfflineRegion");
- try {
- HRegionInfo hri = createTableAndGetOneRegion(tableName);
+ HRegionInfo hri = createTableAndGetOneRegion(tableName);
- RegionStates regionStates =
- TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
- admin.offline(hri.getRegionName()).get();
+ RegionStates regionStates =
+ TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+ admin.offline(hri.getRegionName()).get();
- long timeoutTime = System.currentTimeMillis() + 3000;
- while (true) {
- if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
- .contains(hri))
- break;
- long now = System.currentTimeMillis();
- if (now > timeoutTime) {
- fail("Failed to offline the region in time");
- break;
- }
- Thread.sleep(10);
+ long timeoutTime = System.currentTimeMillis() + 3000;
+ while (true) {
+ if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
+ .contains(hri)) break;
+ long now = System.currentTimeMillis();
+ if (now > timeoutTime) {
+ fail("Failed to offline the region in time");
+ break;
}
- RegionState regionState = regionStates.getRegionState(hri);
- assertTrue(regionState.isOffline());
- } finally {
- TEST_UTIL.deleteTable(tableName);
+ Thread.sleep(10);
}
+ RegionState regionState = regionStates.getRegionState(hri);
+ assertTrue(regionState.isOffline());
}
@Test
public void testGetRegionByStateOfTable() throws Exception {
- final TableName tableName = TableName.valueOf("testGetRegionByStateOfTable");
- try {
- HRegionInfo hri = createTableAndGetOneRegion(tableName);
-
- RegionStates regionStates =
- TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
- assertTrue(regionStates.getRegionByStateOfTable(tableName)
- .get(RegionState.State.OPEN)
- .contains(hri));
- assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
- .get(RegionState.State.OPEN)
- .contains(hri));
- } finally {
- TEST_UTIL.deleteTable(tableName);
- }
+ HRegionInfo hri = createTableAndGetOneRegion(tableName);
+
+ RegionStates regionStates =
+ TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+ assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
+ .contains(hri));
+ assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
+ .get(RegionState.State.OPEN).contains(hri));
}
@Test
public void testMoveRegion() throws Exception {
- final TableName tableName = TableName.valueOf("testMoveRegion");
- try {
- HRegionInfo hri = createTableAndGetOneRegion(tableName);
-
- HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
- RegionStates regionStates = master.getAssignmentManager().getRegionStates();
- ServerName serverName = regionStates.getRegionServerOfRegion(hri);
- ServerManager serverManager = master.getServerManager();
- ServerName destServerName = null;
- List<JVMClusterUtil.RegionServerThread> regionServers =
- TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
- for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
- HRegionServer destServer = regionServer.getRegionServer();
- destServerName = destServer.getServerName();
- if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
- break;
- }
+ admin.setBalancerOn(false).join();
+
+ HRegionInfo hri = createTableAndGetOneRegion(tableName);
+ RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
+ ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
+
+ HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+ ServerManager serverManager = master.getServerManager();
+ ServerName destServerName = null;
+ List<JVMClusterUtil.RegionServerThread> regionServers =
+ TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
+ for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
+ HRegionServer destServer = regionServer.getRegionServer();
+ destServerName = destServer.getServerName();
+ if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
+ break;
}
- assertTrue(destServerName != null && !destServerName.equals(serverName));
- admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
-
- long timeoutTime = System.currentTimeMillis() + 30000;
- while (true) {
- ServerName sn = regionStates.getRegionServerOfRegion(hri);
- if (sn != null && sn.equals(destServerName)) {
- TEST_UTIL.assertRegionOnServer(hri, sn, 200);
- break;
- }
- long now = System.currentTimeMillis();
- if (now > timeoutTime) {
- fail("Failed to move the region in time: " + regionStates.getRegionState(hri));
- }
- regionStates.wait(50);
+ }
+
+ assertTrue(destServerName != null && !destServerName.equals(serverName));
+ admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
+
+ long timeoutTime = System.currentTimeMillis() + 30000;
+ while (true) {
+ ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
+ if (sn != null && sn.equals(destServerName)) {
+ break;
}
- } finally {
- TEST_UTIL.deleteTable(tableName);
+ long now = System.currentTimeMillis();
+ if (now > timeoutTime) {
+ fail("Failed to move the region in time: " + hri);
+ }
+ Thread.sleep(100);
}
+ admin.setBalancerOn(true).join();
}
@Test
public void testGetOnlineRegions() throws Exception {
- final TableName tableName = TableName.valueOf("testGetOnlineRegions");
- try {
- createTableAndGetOneRegion(tableName);
- AtomicInteger regionServerCount = new AtomicInteger(0);
- TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
- .map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName -> {
+ createTableAndGetOneRegion(tableName);
+ AtomicInteger regionServerCount = new AtomicInteger(0);
+ TEST_UTIL
+ .getHBaseCluster()
+ .getLiveRegionServerThreads()
+ .stream()
+ .map(rsThread -> rsThread.getRegionServer())
+ .forEach(
+ rs -> {
+ ServerName serverName = rs.getServerName();
try {
- Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(),
- TEST_UTIL.getAdmin().getOnlineRegions(serverName).size());
+ Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(), rs
+ .getOnlineRegions().size());
} catch (Exception e) {
fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
}
regionServerCount.incrementAndGet();
});
- Assert.assertEquals(regionServerCount.get(), 2);
- } catch (Exception e) {
- LOG.info("Exception", e);
- throw e;
- } finally {
- TEST_UTIL.deleteTable(tableName);
- }
+ Assert.assertEquals(regionServerCount.get(), 2);
}
@Test
public void testFlushTableAndRegion() throws Exception {
- final TableName tableName = TableName.valueOf("testFlushRegion");
- try {
- HRegionInfo hri = createTableAndGetOneRegion(tableName);
- ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
- .getRegionStates().getRegionServerOfRegion(hri);
- HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
- .map(rsThread -> rsThread.getRegionServer())
- .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
- // write a put into the specific region
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")));
- }
- Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
- // flush region and wait flush operation finished.
- LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
- admin.flushRegion(hri.getRegionName()).get();
- LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
- Threads.sleepWithoutInterrupt(500);
- while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
- Threads.sleep(50);
- }
- // check the memstore.
- Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+ HRegionInfo hri = createTableAndGetOneRegion(tableName);
+ ServerName serverName =
+ TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+ .getRegionServerOfRegion(hri);
+ HRegionServer regionServer =
+ TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+ .map(rsThread -> rsThread.getRegionServer())
+ .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
+
+ // write a put into the specific region
+ ASYNC_CONN.getRawTable(tableName)
+ .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
+ .join();
+ Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
+ // flush region and wait flush operation finished.
+ LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
+ admin.flushRegion(hri.getRegionName()).get();
+ LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
+ Threads.sleepWithoutInterrupt(500);
+ while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
+ Threads.sleep(50);
+ }
+ // check the memstore.
+ Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+
+ // write another put into the specific region
+ ASYNC_CONN.getRawTable(tableName)
+ .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
+ .join();
+ Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
+ admin.flush(tableName).get();
+ Threads.sleepWithoutInterrupt(500);
+ while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
+ Threads.sleep(50);
+ }
+ // check the memstore.
+ Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
+ }
- // write another put into the specific region
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")));
- }
- Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
- admin.flush(tableName).get();
- Threads.sleepWithoutInterrupt(500);
- while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
- Threads.sleep(50);
+ @Test
+ public void testMergeRegions() throws Exception {
+ byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
+ createTableWithDefaultConf(tableName, Optional.of(splitRows));
+
+ RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+ List<HRegionLocation> regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ HRegionInfo regionA;
+ HRegionInfo regionB;
+
+ // merge with full name
+ assertEquals(3, regionLocations.size());
+ regionA = regionLocations.get(0).getRegionInfo();
+ regionB = regionLocations.get(1).getRegionInfo();
+ admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
+
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ assertEquals(2, regionLocations.size());
+ // merge with encoded name
+ regionA = regionLocations.get(0).getRegionInfo();
+ regionB = regionLocations.get(1).getRegionInfo();
+ admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
+
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ assertEquals(1, regionLocations.size());
+ }
+
+ @Test
+ public void testSplitTable() throws Exception {
+ splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
+ splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
+ splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
+ splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
+ }
+
+ private void
+ splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
+ throws Exception {
+ // create table
+ createTableWithDefaultConf(tableName);
+
+ RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
+ List<HRegionLocation> regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
+ assertEquals(1, regionLocations.size());
+
+ RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+ List<Put> puts = new ArrayList<>();
+ for (int i = 0; i < rowCount; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.addColumn(FAMILY, null, Bytes.toBytes("value" + i));
+ puts.add(put);
+ }
+ table.putAll(puts).join();
+
+ if (isSplitRegion) {
+ admin.splitRegion(regionLocations.get(0).getRegionInfo().getRegionName(),
+ Optional.ofNullable(splitPoint)).get();
+ } else {
+ if (splitPoint == null) {
+ admin.split(tableName).get();
+ } else {
+ admin.split(tableName, splitPoint).get();
}
- // check the memstore.
- Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
- } finally {
- TEST_UTIL.deleteTable(tableName);
}
- }
- @Test(timeout = 600000)
- public void testCompactRpcAPI() throws Exception {
- String tableName = "testCompactRpcAPI";
- compactionTest(tableName, 8, CompactionState.MAJOR, false);
- compactionTest(tableName, 15, CompactionState.MINOR, false);
- compactionTest(tableName, 8, CompactionState.MAJOR, true);
- compactionTest(tableName, 15, CompactionState.MINOR, true);
+ int count = 0;
+ for (int i = 0; i < 45; i++) {
+ try {
+ regionLocations =
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+ .get();
+ count = regionLocations.size();
+ if (count >= 2) {
+ break;
+ }
+ Thread.sleep(1000L);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+ assertEquals(count, 2);
}
- @Test(timeout = 600000)
+ @Test
public void testCompactRegionServer() throws Exception {
- TableName table = TableName.valueOf("testCompactRegionServer");
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
- Table ht = null;
- try {
- ht = TEST_UTIL.createTable(table, families);
- loadData(ht, families, 3000, 8);
- List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
- .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
- List<Region> regions = new ArrayList<>();
- rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table)));
- Assert.assertEquals(regions.size(), 1);
- int countBefore = countStoreFilesInFamilies(regions, families);
- Assert.assertTrue(countBefore > 0);
- // Minor compaction for all region servers.
- for (HRegionServer rs : rsList)
- admin.compactRegionServer(rs.getServerName()).get();
- Thread.sleep(5000);
- int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
- Assert.assertTrue(countAfterMinorCompaction < countBefore);
- // Major compaction for all region servers.
- for (HRegionServer rs : rsList)
- admin.majorCompactRegionServer(rs.getServerName()).get();
- Thread.sleep(5000);
- int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
- Assert.assertEquals(countAfterMajorCompaction, 3);
- } finally {
- if (ht != null) {
- TEST_UTIL.deleteTable(table);
- }
- }
+ createTableWithDefaultConf(tableName, Optional.empty(), families);
+ loadData(tableName, families, 3000, 8);
+
+ List<HRegionServer> rsList =
+ TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
+ .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
+ List<Region> regions = new ArrayList<>();
+ rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(tableName)));
+ Assert.assertEquals(regions.size(), 1);
+ int countBefore = countStoreFilesInFamilies(regions, families);
+ Assert.assertTrue(countBefore > 0);
+
+ // Minor compaction for all region servers.
+ for (HRegionServer rs : rsList)
+ admin.compactRegionServer(rs.getServerName()).get();
+ Thread.sleep(5000);
+ int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
+ Assert.assertTrue(countAfterMinorCompaction < countBefore);
+
+ // Major compaction for all region servers.
+ for (HRegionServer rs : rsList)
+ admin.majorCompactRegionServer(rs.getServerName()).get();
+ Thread.sleep(5000);
+ int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
+ Assert.assertEquals(countAfterMajorCompaction, 3);
}
- private void compactionTest(final String tableName, final int flushes,
+ @Test
+ public void testCompact() throws Exception {
+ compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
+ compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
+ compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
+ compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
+ }
+
+ private void compactionTest(final TableName tableName, final int flushes,
final CompactionState expectedState, boolean singleFamily) throws Exception {
// Create a table with regions
- final TableName table = TableName.valueOf(tableName);
byte[] family = Bytes.toBytes("family");
byte[][] families =
{ family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
- Table ht = null;
- try {
- ht = TEST_UTIL.createTable(table, families);
- loadData(ht, families, 3000, flushes);
- List<Region> regions = new ArrayList<>();
- TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
- .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table)));
- Assert.assertEquals(regions.size(), 1);
- int countBefore = countStoreFilesInFamilies(regions, families);
- int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
- assertTrue(countBefore > 0); // there should be some data files
- if (expectedState == CompactionState.MINOR) {
- if (singleFamily) {
- admin.compact(table, Optional.of(family)).get();
- } else {
- admin.compact(table, Optional.empty()).get();
- }
+ createTableWithDefaultConf(tableName, Optional.empty(), families);
+ loadData(tableName, families, 3000, flushes);
+
+ List<Region> regions = new ArrayList<>();
+ TEST_UTIL
+ .getHBaseCluster()
+ .getLiveRegionServerThreads()
+ .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(tableName)));
+ Assert.assertEquals(regions.size(), 1);
+
+ int countBefore = countStoreFilesInFamilies(regions, families);
+ int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
+ assertTrue(countBefore > 0); // there should be some data files
+ if (expectedState == CompactionState.MINOR) {
+ if (singleFamily) {
+ admin.compact(tableName, Optional.of(family)).get();
} else {
- if (singleFamily) {
- admin.majorCompact(table, Optional.of(family)).get();
- } else {
- admin.majorCompact(table, Optional.empty()).get();
- }
+ admin.compact(tableName, Optional.empty()).get();
}
- long curt = System.currentTimeMillis();
- long waitTime = 5000;
- long endt = curt + waitTime;
- CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table);
- while (state == CompactionState.NONE && curt < endt) {
- Thread.sleep(10);
- state = TEST_UTIL.getAdmin().getCompactionState(table);
- curt = System.currentTimeMillis();
- }
- // Now, should have the right compaction state,
- // otherwise, the compaction should have already been done
- if (expectedState != state) {
- for (Region region : regions) {
- state = CompactionState.valueOf(region.getCompactionState().toString());
- assertEquals(CompactionState.NONE, state);
- }
+ } else {
+ if (singleFamily) {
+ admin.majorCompact(tableName, Optional.of(family)).get();
} else {
- // Wait until the compaction is done
- state = TEST_UTIL.getAdmin().getCompactionState(table);
- while (state != CompactionState.NONE && curt < endt) {
- Thread.sleep(10);
- state = TEST_UTIL.getAdmin().getCompactionState(table);
- }
- // Now, compaction should be done.
+ admin.majorCompact(tableName, Optional.empty()).get();
+ }
+ }
+
+ long curt = System.currentTimeMillis();
+ long waitTime = 5000;
+ long endt = curt + waitTime;
+ CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ while (state == CompactionState.NONE && curt < endt) {
+ Thread.sleep(10);
+ state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ curt = System.currentTimeMillis();
+ }
+ // Now, should have the right compaction state,
+ // otherwise, the compaction should have already been done
+ if (expectedState != state) {
+ for (Region region : regions) {
+ state = CompactionState.valueOf(region.getCompactionState().toString());
assertEquals(CompactionState.NONE, state);
}
- int countAfter = countStoreFilesInFamilies(regions, families);
- int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
- assertTrue(countAfter < countBefore);
- if (!singleFamily) {
- if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
- else assertTrue(families.length < countAfter);
- } else {
- int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
- // assert only change was to single column family
- assertTrue(singleFamDiff == (countBefore - countAfter));
- if (expectedState == CompactionState.MAJOR) {
- assertTrue(1 == countAfterSingleFamily);
- } else {
- assertTrue(1 < countAfterSingleFamily);
- }
+ } else {
+ // Wait until the compaction is done
+ state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ while (state != CompactionState.NONE && curt < endt) {
+ Thread.sleep(10);
+ state = TEST_UTIL.getAdmin().getCompactionState(tableName);
}
- } finally {
- if (ht != null) {
- TEST_UTIL.deleteTable(table);
+ // Now, compaction should be done.
+ assertEquals(CompactionState.NONE, state);
+ }
+
+ int countAfter = countStoreFilesInFamilies(regions, families);
+ int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
+ assertTrue(countAfter < countBefore);
+ if (!singleFamily) {
+ if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
+ else assertTrue(families.length < countAfter);
+ } else {
+ int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
+ // assert only change was to single column family
+ assertTrue(singleFamDiff == (countBefore - countAfter));
+ if (expectedState == CompactionState.MAJOR) {
+ assertTrue(1 == countAfterSingleFamily);
+ } else {
+ assertTrue(1 < countAfterSingleFamily);
}
}
}
@@ -623,8 +569,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
return count;
}
- private static void loadData(final Table ht, final byte[][] families, final int rows,
+ private static void loadData(final TableName tableName, final byte[][] families, final int rows,
final int flushes) throws IOException {
+ RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
List<Put> puts = new ArrayList<>(rows);
byte[] qualifier = Bytes.toBytes("val");
for (int i = 0; i < flushes; i++) {
@@ -636,7 +583,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
}
puts.add(p);
}
- ht.put(puts);
+ table.putAll(puts).join();
TEST_UTIL.flush();
puts.clear();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 9c46be9..3e577bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -45,10 +45,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Class to test asynchronous replication admin operations.
*/
+@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class})
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
@@ -57,9 +60,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
private final String ID_SECOND = "2";
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
- @Rule
- public TestName name = new TestName();
-
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@@ -142,12 +142,12 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testAppendPeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
- final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
- final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
- final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
- final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
+ final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
+ final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
+ final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
+ final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
+ final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
+ final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
@@ -244,10 +244,10 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testRemovePeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
- final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
- final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+ final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
+ final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
+ final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
+ final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
Map<TableName, List<String>> tableCFs = new HashMap<>();
@@ -360,8 +360,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testNamespacesAndTableCfsConfigConflict() throws Exception {
String ns1 = "ns1";
String ns2 = "ns2";
- final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
- final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
+ final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
+ final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/14f0423b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
index 3e0c261..a646287 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSnapshotAdminApi.java
@@ -29,12 +29,16 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.regex.Pattern;
+@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
@@ -42,15 +46,6 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
String snapshotName2 = "snapshotName2";
String snapshotName3 = "snapshotName3";
- @Rule
- public TestName testName = new TestName();
- TableName tableName;
-
- @Before
- public void setup() {
- tableName = TableName.valueOf(testName.getMethodName());
- }
-
@After
public void cleanup() throws Exception {
admin.deleteSnapshots(Pattern.compile(".*")).get();
@@ -175,10 +170,13 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
admin.snapshot(snapshotName3, tableName).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 3);
- Assert.assertEquals(admin.listSnapshots(Pattern.compile("(.*)")).get().size(), 3);
- Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName(\\d+)")).get().size(), 3);
- Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName[1|3]")).get().size(), 2);
- Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshot(.*)")).get().size(), 3);
+ Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("(.*)"))).get().size(), 3);
+ Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName(\\d+)")))
+ .get().size(), 3);
+ Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName[1|3]")))
+ .get().size(), 2);
+ Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshot(.*)"))).get()
+ .size(), 3);
Assert.assertEquals(
admin.listTableSnapshots(Pattern.compile("testListSnapshots"), Pattern.compile("s(.*)")).get()
.size(),