You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:33 UTC
[07/15] hbase git commit: HBASE-17356 Add replica get support
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 4f73909..869a630 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
@@ -491,23 +492,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
- this.<List<TableSchema>> newMasterCaller()
- .action(
- (controller, stub) -> this
- .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
- controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
- c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
- .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!tableSchemas.isEmpty()) {
- future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
- } else {
- future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
- }
- });
+ addListener(this.<List<TableSchema>> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
+ controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName),
+ (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+ (resp) -> resp.getTableSchemaList()))
+ .call(), (tableSchemas, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!tableSchemas.isEmpty()) {
+ future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
+ } else {
+ future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
+ }
+ });
return future;
}
@@ -590,7 +591,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -607,7 +608,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
+ addListener(AsyncMetaTableAccessor.getTableState(metaTable, tableName), (state, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -636,40 +637,37 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Boolean> isTableAvailable(TableName tableName,
Optional<byte[][]> splitKeys) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- isTableEnabled(tableName).whenComplete(
- (enabled, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (!enabled) {
- future.complete(false);
- } else {
- AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
- .whenComplete(
- (locations, error1) -> {
- if (error1 != null) {
- future.completeExceptionally(error1);
- return;
- }
- List<HRegionLocation> notDeployedRegions =
- locations.stream().filter(loc -> loc.getServerName() == null)
- .collect(Collectors.toList());
- if (notDeployedRegions.size() > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table " + tableName + " has " + notDeployedRegions.size()
- + " regions");
- }
- future.complete(false);
- return;
- }
+ addListener(isTableEnabled(tableName), (enabled, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (!enabled) {
+ future.complete(false);
+ } else {
+ addListener(
+ AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
+ (locations, error1) -> {
+ if (error1 != null) {
+ future.completeExceptionally(error1);
+ return;
+ }
+ List<HRegionLocation> notDeployedRegions = locations.stream()
+ .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
+ if (notDeployedRegions.size() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
+ }
+ future.complete(false);
+ return;
+ }
- Optional<Boolean> available =
- splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
- future.complete(available.orElse(true));
- });
- }
- });
+ Optional<Boolean> available =
+ splitKeys.map(keys -> compareRegionsWithSplitKeys(locations, keys));
+ future.complete(available.orElse(true));
+ });
+ }
+ });
return future;
}
@@ -791,20 +789,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flush(TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
+ addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (!exists) {
future.completeExceptionally(new TableNotFoundException(tableName));
} else {
- isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
+ addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (!tableEnabled) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
- execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
- new HashMap<>()).whenComplete((ret, err3) -> {
+ addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
+ new HashMap<>()), (ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
@@ -821,27 +819,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- flush(serverName, location.getRegion())
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -859,7 +855,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegions(sn).whenComplete((hRegionInfos, err) -> {
+ addListener(getRegions(sn), (hRegionInfos, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -868,9 +864,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
}
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
+ addListener(CompletableFuture.allOf(
+ compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -943,7 +938,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegions(sn).whenComplete((hRegionInfos, err) -> {
+ addListener(getRegions(sn), (hRegionInfos, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -952,15 +947,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
}
- CompletableFuture
- .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(CompletableFuture.allOf(
+ compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
return future;
}
@@ -968,28 +962,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
boolean major) {
CompletableFuture<Void> future = new CompletableFuture<>();
-
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- compact(location.getServerName(), location.getRegion(), major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@@ -1001,19 +993,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// For meta table, we use zk to fetch all locations.
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
- registry.getMetaRegionLocation().whenComplete(
- (metaRegions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (metaRegions == null || metaRegions.isEmpty()
- || metaRegions.getDefaultRegionLocation() == null) {
- future.completeExceptionally(new IOException("meta region does not found"));
- } else {
- future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
- }
- // close the registry.
- IOUtils.closeQuietly(registry);
- });
+ addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (metaRegions == null || metaRegions.isEmpty() ||
+ metaRegions.getDefaultRegionLocation() == null) {
+ future.completeExceptionally(new IOException("meta region does not found"));
+ } else {
+ future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+ }
+ // close the registry.
+ IOUtils.closeQuietly(registry);
+ });
return future;
} else {
// For non-meta table, we fetch all locations by scanning hbase:meta table
@@ -1024,40 +1015,40 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
/**
* Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
*/
- private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
- boolean major, CompactType compactType) {
+ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
+ CompactType compactType) {
CompletableFuture<Void> future = new CompletableFuture<>();
switch (compactType) {
case MOB:
- connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+ addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
- compact(serverName, regionInfo, major, columnFamily)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
break;
case NORMAL:
- getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
+ addListener(getTableHRegionLocations(tableName), (locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
+ CompletableFuture<?>[] compactFutures =
+ locations.stream().filter(l -> l.getRegion() != null)
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
.toArray(CompletableFuture<?>[]::new);
// future complete unless all of the compact futures are completed.
- CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
+ addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@@ -1098,29 +1089,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
CompletableFuture<TableName> result) {
- getRegionLocation(encodeRegionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- return;
- }
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- result.completeExceptionally(new IllegalArgumentException(
- "Can't invoke merge on non-default regions directly"));
- return;
- }
- if (!tableName.compareAndSet(null, regionInfo.getTable())) {
- if (!tableName.get().equals(regionInfo.getTable())) {
- // tables of this two region should be same.
- result.completeExceptionally(new IllegalArgumentException(
- "Cannot merge regions from two different tables " + tableName.get() + " and "
- + regionInfo.getTable()));
- } else {
- result.complete(tableName.get());
- }
+ addListener(getRegionLocation(encodeRegionName), (location, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ return;
+ }
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ result.completeExceptionally(
+ new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
+ return;
+ }
+ if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+ if (!tableName.get().equals(regionInfo.getTable())) {
+ // tables of this two region should be same.
+ result.completeExceptionally(
+ new IllegalArgumentException("Cannot merge regions from two different tables " +
+ tableName.get() + " and " + regionInfo.getTable()));
+ } else {
+ result.complete(tableName.get());
}
- });
+ }
+ });
}
private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA,
@@ -1185,41 +1175,42 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
final byte[] encodeRegionNameA = toEncodeRegionName(nameOfRegionA);
final byte[] encodeRegionNameB = toEncodeRegionName(nameOfRegionB);
- checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB)
- .whenComplete((tableName, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(checkRegionsAndGetTableName(encodeRegionNameA, encodeRegionNameB),
+ (tableName, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
- MergeTableRegionsRequest request = null;
- try {
- request = RequestConverter.buildMergeTableRegionsRequest(
- new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
- ng.newNonce());
- } catch (DeserializationException e) {
- future.completeExceptionally(e);
- return;
- }
+ MergeTableRegionsRequest request = null;
+ try {
+ request = RequestConverter.buildMergeTableRegionsRequest(
+ new byte[][] { encodeRegionNameA, encodeRegionNameB }, forcible, ng.getNonceGroup(),
+ ng.newNonce());
+ } catch (DeserializationException e) {
+ future.completeExceptionally(e);
+ return;
+ }
+ addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
- new MergeTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
-
- });
+ new MergeTableRegionProcedureBiConsumer(tableName)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> split(TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exist, error) -> {
+ addListener(tableExists(tableName), (exist, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -1228,45 +1219,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.completeExceptionally(new TableNotFoundException(tableName));
return;
}
- metaTable
+ addListener(
+ metaTable
.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
- .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
- .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION)))
- .whenComplete((results, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (results != null && !results.isEmpty()) {
- List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
- for (Result r : results) {
- if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) continue;
- RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
- if (rl != null) {
- for (HRegionLocation h : rl.getRegionLocations()) {
- if (h != null && h.getServerName() != null) {
- RegionInfo hri = h.getRegion();
- if (hri == null || hri.isSplitParent()
- || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID)
- continue;
- splitFutures.add(split(hri, null));
+ .withStartRow(MetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REGION))
+ .withStopRow(MetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REGION))),
+ (results, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (results != null && !results.isEmpty()) {
+ List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
+ for (Result r : results) {
+ if (r.isEmpty() || MetaTableAccessor.getRegionInfo(r) == null) {
+ continue;
+ }
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
+ if (rl != null) {
+ for (HRegionLocation h : rl.getRegionLocations()) {
+ if (h != null && h.getServerName() != null) {
+ RegionInfo hri = h.getRegion();
+ if (hri == null || hri.isSplitParent() ||
+ hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ continue;
}
+ splitFutures.add(split(hri, null));
}
}
}
- CompletableFuture
- .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()]))
- .whenComplete((ret, exception) -> {
- if (exception != null) {
- future.completeExceptionally(exception);
- return;
- }
- future.complete(ret);
- });
- } else {
- future.complete(null);
}
- });
+ addListener(
+ CompletableFuture
+ .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
+ (ret, exception) -> {
+ if (exception != null) {
+ future.completeExceptionally(exception);
+ return;
+ }
+ future.complete(ret);
+ });
+ } else {
+ future.complete(null);
+ }
+ });
});
return future;
}
@@ -1277,54 +1273,52 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (splitPoint == null) {
return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
}
- connection.getRegionLocator(tableName).getRegionLocation(splitPoint)
- .whenComplete((loc, err) -> {
- if (err != null) {
- result.completeExceptionally(err);
- } else if (loc == null || loc.getRegion() == null) {
- result.completeExceptionally(new IllegalArgumentException(
- "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
- } else {
- splitRegion(loc.getRegion().getRegionName(), splitPoint)
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- result.completeExceptionally(err2);
- } else {
- result.complete(ret);
- }
+ addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint),
+ (loc, err) -> {
+ if (err != null) {
+ result.completeExceptionally(err);
+ } else if (loc == null || loc.getRegion() == null) {
+ result.completeExceptionally(new IllegalArgumentException(
+ "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
+ } else {
+ addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
+ if (err2 != null) {
+ result.completeExceptionally(err2);
+ } else {
+ result.complete(ret);
+ }
- });
- }
- });
+ });
+ }
+ });
return result;
}
@Override
public CompletableFuture<Void> splitRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ future
+ .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+ "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(split(regionInfo, null), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- split(regionInfo, null).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -1333,35 +1327,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Preconditions.checkNotNull(splitPoint,
"splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- RegionInfo regionInfo = location.getRegion();
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- future.completeExceptionally(new IllegalArgumentException(
- "Can't split replicas directly. "
- + "Replicas are auto-split when their primary is split."));
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
- if (regionInfo.getStartKey() != null
- && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
- future.completeExceptionally(new IllegalArgumentException(
- "should not give a splitkey which equals to startkey!"));
- return;
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ RegionInfo regionInfo = location.getRegion();
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ future
+ .completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " +
+ "Replicas are auto-split when their primary is split."));
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ if (regionInfo.getStartKey() != null &&
+ Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0) {
+ future.completeExceptionally(
+ new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
+ return;
+ }
+ addListener(split(regionInfo, splitPoint), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
}
- split(regionInfo, splitPoint).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
});
+ });
return future;
}
@@ -1370,121 +1363,119 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
TableName tableName = hri.getTable();
SplitTableRegionRequest request = null;
try {
- request = RequestConverter
- .buildSplitTableRegionRequest(hri, splitPoint,
- ng.getNonceGroup(), ng.newNonce());
+ request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
+ ng.newNonce());
} catch (DeserializationException e) {
future.completeExceptionally(e);
return future;
}
- this.<SplitTableRegionRequest, SplitTableRegionResponse>procedureCall(request,
- (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
- new SplitTableRegionProcedureBiConsumer(tableName)).whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(request,
+ (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
+ new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
return future;
}
@Override
public CompletableFuture<Void> assign(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
- controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(this.<Void> newMasterCaller()
+ .action(((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
+ controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
+ (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
+ .call(), (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this
- .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
- RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
- (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call()
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ .action(((controller, stub) -> this
+ .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible),
+ (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
+ .call(),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> offline(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
this.<Void> newMasterCaller()
- .action(
- ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
- controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo
- .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done),
- resp -> null))).call().whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ .action(((controller, stub) -> this
+ .<OfflineRegionRequest, OfflineRegionResponse, Void> call(controller, stub,
+ RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
+ (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
+ .call(),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@Override
public CompletableFuture<Void> move(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete(
- (regionInfo, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ addListener(
moveRegion(
- RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
- });
+ RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
+ });
return future;
}
@@ -1493,20 +1484,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Preconditions.checkNotNull(destServerName,
"destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
CompletableFuture<Void> future = new CompletableFuture<>();
- getRegionInfo(regionName).whenComplete((regionInfo, err) -> {
+ addListener(getRegionInfo(regionName), (regionInfo, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- moveRegion(
- RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName))
- .whenComplete((ret, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(ret);
- }
- });
+ addListener(moveRegion(RequestConverter
+ .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
+ (ret, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(ret);
+ }
+ });
});
return future;
}
@@ -1636,11 +1627,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
+ addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig =
- ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
- updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
+ ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
+ addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
}
@@ -1658,24 +1649,23 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getReplicationPeerConfig(id).whenComplete(
- (peerConfig, error) -> {
- if (!completeExceptionally(future, error)) {
- ReplicationPeerConfig newPeerConfig = null;
- try {
- newPeerConfig = ReplicationPeerConfigUtil
- .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
- } catch (ReplicationException e) {
- future.completeExceptionally(e);
- return;
- }
- updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
- if (!completeExceptionally(future, error)) {
- future.complete(result);
- }
- });
+ addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
+ if (!completeExceptionally(future, error)) {
+ ReplicationPeerConfig newPeerConfig = null;
+ try {
+ newPeerConfig = ReplicationPeerConfigUtil
+ .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
+ } catch (ReplicationException e) {
+ future.completeExceptionally(e);
+ return;
}
- });
+ addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
+ if (!completeExceptionally(future, error)) {
+ future.complete(result);
+ }
+ });
+ }
+ });
return future;
}
@@ -1710,31 +1700,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
- listTableDescriptors().whenComplete(
- (tables, error) -> {
- if (!completeExceptionally(future, error)) {
- List<TableCFs> replicatedTableCFs = new ArrayList<>();
- tables.forEach(table -> {
- Map<String, Integer> cfs = new HashMap<>();
- Stream.of(table.getColumnFamilies())
- .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
- .forEach(column -> {
- cfs.put(column.getNameAsString(), column.getScope());
- });
- if (!cfs.isEmpty()) {
- replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
- }
- });
- future.complete(replicatedTableCFs);
- }
- });
+ addListener(listTableDescriptors(), (tables, error) -> {
+ if (!completeExceptionally(future, error)) {
+ List<TableCFs> replicatedTableCFs = new ArrayList<>();
+ tables.forEach(table -> {
+ Map<String, Integer> cfs = new HashMap<>();
+ Stream.of(table.getColumnFamilies())
+ .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
+ .forEach(column -> {
+ cfs.put(column.getNameAsString(), column.getScope());
+ });
+ if (!cfs.isEmpty()) {
+ replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+ }
+ });
+ future.complete(replicatedTableCFs);
+ }
+ });
return future;
}
@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
- SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
- .createHBaseProtosSnapshotDesc(snapshotDesc);
+ SnapshotProtos.SnapshotDescription snapshot =
+ ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
try {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
} catch (IllegalArgumentException e) {
@@ -1742,47 +1731,47 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- this.<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils.getPauseTime(
- TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ addListener(this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
+ stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+ resp -> resp.getExpectedTimeout()))
+ .call(), (expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime =
+ ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER
- .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MILLISECONDS);
}
- } );
- } else {
- future.completeExceptionally(new SnapshotCreationException("Snapshot '"
- + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout
- + " ms", snapshotDesc));
- }
+ });
+ } else {
+ future.completeExceptionally(
+ new SnapshotCreationException("Snapshot '" + snapshot.getName() +
+ "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
}
- };
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
+ }
+ };
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
return future;
}
@@ -1808,52 +1797,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) {
+ public CompletableFuture<Void> restoreSnapshot(String snapshotName,
+ boolean takeFailSafeSnapshot) {
CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshots(Pattern.compile(snapshotName)).whenComplete(
- (snapshotDescriptions, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TableName tableName = null;
- if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
- for (SnapshotDescription snap : snapshotDescriptions) {
- if (snap.getName().equals(snapshotName)) {
- tableName = snap.getTableName();
- break;
- }
+ addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TableName tableName = null;
+ if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+ for (SnapshotDescription snap : snapshotDescriptions) {
+ if (snap.getName().equals(snapshotName)) {
+ tableName = snap.getTableName();
+ break;
}
}
- if (tableName == null) {
- future.completeExceptionally(new RestoreSnapshotException(
- "Unable to find the table name for snapshot=" + snapshotName));
- return;
- }
- final TableName finalTableName = tableName;
- tableExists(finalTableName)
- .whenComplete((exists, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!exists) {
- // if table does not exist, then just clone snapshot into new table.
- completeConditionalOnFuture(future,
- internalRestoreSnapshot(snapshotName, finalTableName));
+ }
+ if (tableName == null) {
+ future.completeExceptionally(new RestoreSnapshotException(
+ "Unable to find the table name for snapshot=" + snapshotName));
+ return;
+ }
+ final TableName finalTableName = tableName;
+ addListener(tableExists(finalTableName), (exists, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (!exists) {
+ // if table does not exist, then just clone snapshot into new table.
+ completeConditionalOnFuture(future,
+ internalRestoreSnapshot(snapshotName, finalTableName));
+ } else {
+ addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
+ if (err4 != null) {
+ future.completeExceptionally(err4);
+ } else if (!disabled) {
+ future.completeExceptionally(new TableNotDisabledException(finalTableName));
} else {
- isTableDisabled(finalTableName).whenComplete(
- (disabled, err4) -> {
- if (err4 != null) {
- future.completeExceptionally(err4);
- } else if (!disabled) {
- future.completeExceptionally(new TableNotDisabledException(finalTableName));
- } else {
- completeConditionalOnFuture(future,
- restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
- }
- });
+ completeConditionalOnFuture(future,
+ restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot));
}
- } );
+ });
+ }
});
+ });
return future;
}
@@ -1862,49 +1849,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (takeFailSafeSnapshot) {
CompletableFuture<Void> future = new CompletableFuture<>();
// Step.1 Take a snapshot of the current state
- String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get(
- HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
- HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
- final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat
- .replace("{snapshot.name}", snapshotName)
+ String failSafeSnapshotSnapshotNameFormat =
+ this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+ HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+ final String failSafeSnapshotSnapshotName =
+ failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
.replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
.replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
+ addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
// Step.2 Restore snapshot
- internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> {
- if (err2 != null) {
- // Step.3.a Something went wrong during the restore and try to rollback.
- internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete(
- (void3, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
- } else {
- String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
- + failSafeSnapshotSnapshotName + " succeeded.";
- future.completeExceptionally(new RestoreSnapshotException(msg));
- }
- });
- } else {
- // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
- LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
- deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
- (ret3, err3) -> {
- if (err3 != null) {
- LOG.error(
- "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3);
- future.completeExceptionally(err3);
- } else {
- future.complete(ret3);
- }
- });
+ addListener(internalRestoreSnapshot(snapshotName, tableName), (void2, err2) -> {
+ if (err2 != null) {
+ // Step.3.a Something went wrong during the restore and try to rollback.
+ addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName),
+ (void3, err3) -> {
+ if (err3 != null) {
+ future.completeExceptionally(err3);
+ } else {
+ String msg =
+ "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" +
+ failSafeSnapshotSnapshotName + " succeeded.";
+ future.completeExceptionally(new RestoreSnapshotException(msg));
+ }
+ });
+ } else {
+ // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
+ LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
+ addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
+ if (err3 != null) {
+ LOG.error(
+ "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
+ err3);
+ future.completeExceptionally(err3);
+ } else {
+ future.complete(ret3);
+ }
+ });
+ }
+ });
}
- } );
- }
- } );
+ });
return future;
} else {
return internalRestoreSnapshot(snapshotName, tableName);
@@ -1913,7 +1901,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
CompletableFuture<T> parentFuture) {
- parentFuture.whenComplete((res, err) -> {
+ addListener(parentFuture, (res, err) -> {
if (err != null) {
dependentFuture.completeExceptionally(err);
} else {
@@ -1925,7 +1913,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
- tableExists(tableName).whenComplete((exists, err) -> {
+ addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (exists) {
@@ -1995,31 +1983,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(
Pattern tableNamePattern, Pattern snapshotNamePattern) {
CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
- listTableNames(tableNamePattern, false).whenComplete(
- (tableNames, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
+ addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ if (tableNames == null || tableNames.size() <= 0) {
+ future.complete(Collections.emptyList());
+ return;
+ }
+ addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
return;
}
- if (tableNames == null || tableNames.size() <= 0) {
+ if (snapshotDescList == null || snapshotDescList.isEmpty()) {
future.complete(Collections.emptyList());
return;
}
- getCompletedSnapshots(snapshotNamePattern).whenComplete(
- (snapshotDescList, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (snapshotDescList == null || snapshotDescList.isEmpty()) {
- future.complete(Collections.emptyList());
- return;
- }
- future.complete(snapshotDescList.stream()
- .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
- .collect(Collectors.toList()));
- });
+ future.complete(snapshotDescList.stream()
+ .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
+ .collect(Collectors.toList()));
});
+ });
return future;
}
@@ -2066,7 +2052,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
}
CompletableFuture<Void> future = new CompletableFuture<>();
- listSnapshotsFuture.whenComplete(((snapshotDescriptions, err) -> {
+ addListener(listSnapshotsFuture, ((snapshotDescriptions, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
@@ -2075,12 +2061,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
future.complete(null);
return;
}
- List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
- snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
- .add(internalDeleteSnapshot(snapDesc)));
- CompletableFuture.allOf(
- deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
- .thenAccept(v -> future.complete(v));
+ addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
+ .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
+ if (e != null) {
+ future.completeExceptionally(e);
+ } else {
+ future.complete(v);
+ }
+ });
}));
return future;
}
@@ -2102,50 +2090,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
Map<String, String> props) {
CompletableFuture<Void> future = new CompletableFuture<>();
ProcedureDescription procDesc =
- ProtobufUtil.buildProcedureDescription(signature, instance, props);
- this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
- controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
- (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
- .call().whenComplete((expectedTimeout, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- return;
- }
- if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime = ConnectionUtils
- .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
- TimeUnit.MICROSECONDS);
- }
- });
- } else {
- future.completeExceptionally(new IOException("Procedure '" + signature + " : "
- + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
- }
+ ProtobufUtil.buildProcedureDescription(signature, instance, props);
+ addListener(this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
+ controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
+ (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
+ .call(), (expectedTimeout, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ TimerTask pollingTask = new TimerTask() {
+ int tries = 0;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ long endTime = startTime + expectedTimeout;
+ long maxPauseTime = expectedTimeout / maxAttempts;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (EnvironmentEdgeManager.currentTime() < endTime) {
+ addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ return;
+ }
+ if (done) {
+ future.complete(null);
+ } else {
+ // retry again after pauseTime.
+ long pauseTime =
+ ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+ pauseTime = Math.min(pauseTime, maxPauseTime);
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
+ TimeUnit.MICROSECONDS);
+ }
+ });
+ } else {
+ future.completeExceptionally(new IOException("Procedure '" + signature + " : " +
+ instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
}
- };
- // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
- });
+ }
+ };
+ // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
+ AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+ });
return future;
}
@@ -2264,15 +2252,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
- future.whenComplete((location, err) -> {
+ addListener(future, (location, err) -> {
if (err != null) {
returnedFuture.completeExceptionally(err);
return;
}
if (!location.isPresent() || location.get().getRegion() == null) {
- returnedFuture.completeExceptionally(new UnknownRegionException(
- "Invalid region name or encoded region name: "
- + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+ returnedFuture.completeExceptionally(
+ new UnknownRegionException("Invalid region name or encoded region name: " +
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
} else {
returnedFuture.complete(location.get());
}
@@ -2296,14 +2284,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
if (Bytes.equals(regionNameOrEncodedRegionName,
- RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
- || Bytes.equals(regionNameOrEncodedRegionName,
+ RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()) ||
+ Bytes.equals(regionNameOrEncodedRegionName,
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
}
CompletableFuture<RegionInfo> future = new CompletableFuture<>();
- getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> {
+ addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
@@ -2345,7 +2333,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
+ private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
abstract void onFinished();
@@ -2361,7 +2349,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
+ private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
protected final TableName tableName;
TableProcedureBiConsumer(TableName tableName) {
@@ -2386,7 +2374,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
+ private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
protected final String namespaceName;
NamespaceProcedureBiConsumer(String namespaceName) {
@@ -2410,7 +2398,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
CreateTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2422,7 +2410,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(tableName);
@@ -2452,7 +2440,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
TruncateTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2464,7 +2452,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
EnableTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2476,7 +2464,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
DisableTableProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2488,7 +2476,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
AddColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2500,7 +2488,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2512,7 +2500,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2524,7 +2512,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
CreateNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2536,7 +2524,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
DeleteNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2548,7 +2536,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
+ private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
ModifyNamespaceProcedureBiConsumer(String namespaceName) {
super(namespaceName);
@@ -2560,7 +2548,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
MergeTableRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2572,7 +2560,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
+ private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
SplitTableRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
@@ -2584,7 +2572,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
+ private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
private final String peerId;
private final Supplier<String> getOperation;
@@ -2610,7 +2598,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();
- procFuture.whenComplete((procId, error) -> {
+ addListener(procFuture, (procId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -2621,30 +2609,33 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
- this.<GetProcedureResultResponse> newMasterCaller().action((controller, stub) -> this
- .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
- controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
- (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
- .call().whenComplete((response, error) -> {
- if (error != null) {
- LOG.warn("failed to get the procedure result procId={}", procId,
- ConnectionUtils.translateException(error));
- retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
- ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
- return;
- }
- if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
- retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
- ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
- return;
- }
- if (response.hasException()) {
- IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
- future.completeExceptionally(ioe);
- } else {
- future.complete(null);
- }
- });
+ addListener(
+ this.<GetProcedureResultResponse> newMasterCaller()
+ .action((controller, stub) -> this
+ .<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
+ controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
+ (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
+ .call(),
+ (response, error) -> {
+ if (error != null) {
+ LOG.warn("failed to get the procedure result procId={}", procId,
+ ConnectionUtils.translateException(error));
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
+ retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
+ ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
+ return;
+ }
+ if (response.hasException()) {
+ IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
+ future.completeExceptionally(ioe);
+ } else {
+ future.complete(null);
+ }
+ });
}
private <T> CompletableFuture<T> failedFuture(Throwable error) {
@@ -2726,24 +2717,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> updateConfiguration() {
CompletableFuture<Void> future = new CompletableFuture<Void>();
- getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS))
- .whenComplete((status, err) -> {
+ addListener(
+ getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER, Option.BACKUP_MASTERS)),
+ (status, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
List<CompletableFuture<Void>> futures = new ArrayList<>();
status.getLiveServerMetrics().keySet()
- .forEach(server -> futures.add(updateConfiguration(server)));
+ .forEach(server -> futures.add(updateConfiguration(server)));
futures.add(updateConfiguration(status.getMasterName()));
status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
- .whenComplete((result, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- future.complete(result);
- }
- });
+ addListener(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+ (result, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(result);
+ }
+ });
}
});
return future;
@@ -2826,88 +2819,87 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
switch (compactType) {
case MOB:
- connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
+ addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
- this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
- (controller, stub) -> this
- .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
+ addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
+ .action((controller, stub) -> this
+ .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
controller, stub,
RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
- (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
- ).call().whenComplete((resp2, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else {
- if (resp2.hasCompactionState()) {
- future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+ (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
+ .call(), (resp2, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
} else {
- future.complete(CompactionState.NONE);
+ if (resp2.hasCompactionState()) {
+ future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+ } else {
+ future.complete(CompactionState.NONE);
+ }
}
- }
- });
+ });
});
break;
case NORMAL:
- getTableHRegionLocations(tableName).whenComplete(
- (locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- List<CompactionState> regionStates = new ArrayList<>();
- List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
- locations.stream().filter(loc -> loc.getServerName() != null)
- .filter(loc -> loc.getRegion() != null)
- .filter(loc -> !loc.getRegion().isOffline())
- .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
- futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
- // If any region compaction state is MAJOR_AND_MINOR
- // the table compaction state is MAJOR_AND_MINOR, too.
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
- future.complete(regionState);
- } else {
- regionStates.add(regionState);
- }
- }));
- });
- CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
- .whenComplete((ret, err3) -> {
- // If future not completed, check all regions's compaction state
- if (!future.isCompletedExceptionally() && !future.isDone()) {
- CompactionState state = CompactionState.NONE;
- for (CompactionState regionState : regionStates) {
- switch (regionState) {
- case MAJOR:
- if (state == CompactionState.MINOR) {
- future.complete(CompactionState.MAJOR_AND_MINOR);
- } else {
- state = CompactionState.MAJOR;
- }
- break;
- case MINOR:
- if (state == CompactionState.MAJOR) {
- future.complete(CompactionState.MAJOR_AND_MINOR);
- } else {
- state = CompactionState.MINOR;
- }
- break;
- case NONE:
- default:
+ addListener(getTableHRegionLocations(tableName), (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompactionState> regionStates = new ArrayList<>();
+ List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+ locations.stream().filter(loc -> loc.getServerName() != null)
+ .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
+ .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
+ futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
+ // If any region compaction state is MAJOR_AND_MINOR
+ // the table compaction state is MAJOR_AND_MINOR, too.
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+ future.complete(regionState);
+ } else {
+ regionStates.add(regionState);
+ }
+ }));
+ });
+ addListener(
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
+ (ret, err3) -> {
+ // If future not completed, check all regions's compaction state
+ if (!future.isCompletedExceptionally() && !future.isDone()) {
+ CompactionState state = CompactionState.NONE;
+ for (CompactionState regionState : regionStates) {
+ switch (regionState) {
+ case MAJOR:
+ if (state == CompactionState.MINOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MAJOR;
}
- if (!future.isDone()) {
- future.complete(state);
+ break;
+ case MINOR:
+ if (state == CompactionState.MAJOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MINOR;
}
- }
+ break;
+ case NONE:
+ default:
}
- });
- });
+ if (!future.isDone()) {
+ future.complete(state);
+ }
+ }
+ }
+ });
+ });
break;
default:
throw new IllegalArgumentException("Unknown compactType: " + compactType);
@@ -2919,37 +2911,38 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
CompletableFuture<CompactionState> future = new CompletableFuture<>();
- getRegionLocation(regionName).whenComplete(
- (location, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- ServerName serverName = location.getServerName();
- if (serverName == null) {
- future.completeExceptionally(new NoServerForRegionException(Bytes
- .toStringBinary(regionName)));
- return;
- }
+ addListener(getRegionLocation(regionName), (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future
+ .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+ return;
+ }
+ addListener(
this.<GetRegionInfoResponse> newAdminCaller()
- .action(
- (controller, stub) -> this
- .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
- controller, stub, RequestConverter.buildGetRegionInfoRequest(location
- .getRegion().getRegionName(), true), (s, c, req, done) -> s
-
<TRUNCATED>