You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/03 07:46:56 UTC
[hbase] branch branch-2.2 updated: HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 7cf4ea6 HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
7cf4ea6 is described below
commit 7cf4ea6a9d23183e309a2eeadefc52ef1081d3fb
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Feb 2 21:24:18 2019 +0800
HBASE-21829 Use FutureUtils.addListener instead of calling whenComplete directly
---
.../hadoop/hbase/AsyncMetaTableAccessor.java | 90 ++++++++++------------
.../client/AsyncAdminRequestRetryingCaller.java | 4 +-
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 3 +-
.../hbase/client/AsyncBufferedMutatorImpl.java | 5 +-
.../hadoop/hbase/client/AsyncConnectionImpl.java | 2 +-
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 11 +--
.../AsyncMasterRequestRpcRetryingCaller.java | 8 +-
.../hbase/client/AsyncMetaRegionLocator.java | 3 +-
.../AsyncServerRequestRpcRetryingCaller.java | 4 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 2 +-
.../apache/hadoop/hbase/client/AsyncTableImpl.java | 11 +--
.../hadoop/hbase/client/ConnectionFactory.java | 12 +--
.../client/MasterCoprocessorRpcChannelImpl.java | 32 ++++----
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 21 ++---
.../hadoop/hbase/client/RawAsyncTableImpl.java | 2 +-
.../client/RegionCoprocessorRpcChannelImpl.java | 32 ++++----
.../RegionServerCoprocessorRpcChannelImpl.java | 32 ++++----
.../hadoop/hbase/client/ZKAsyncRegistry.java | 21 ++---
.../org/apache/hadoop/hbase/util/FutureUtils.java | 57 +++++++++++++-
.../client/coprocessor/AsyncAggregationClient.java | 11 ++-
.../hbase/client/example/AsyncClientExample.java | 62 +++++++--------
.../hbase/client/example/HttpProxyExample.java | 48 ++++++------
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 4 +-
.../regionserver/wal/AsyncProtobufLogWriter.java | 6 +-
24 files changed, 271 insertions(+), 212 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index 5d38179..4a886d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -81,7 +83,7 @@ public class AsyncMetaTableAccessor {
long time = EnvironmentEdgeManager.currentTime();
try {
get.setTimeRange(0, time);
- metaTable.get(get).whenComplete((result, error) -> {
+ addListener(metaTable.get(get), (result, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -109,16 +111,14 @@ public class AsyncMetaTableAccessor {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
try {
RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
- metaTable.get(
- new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
- .addFamily(HConstants.CATALOG_FAMILY)).whenComplete(
- (r, err) -> {
+ addListener(metaTable.get(new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
+ .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- future.complete(getRegionLocations(r).map(
- locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
+ future.complete(getRegionLocations(r)
+ .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
});
} catch (IOException parseEx) {
LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
@@ -136,34 +136,29 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
AsyncTable<?> metaTable, byte[] encodedRegionName) {
CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
- metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
- .whenComplete(
- (results, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- return;
- }
- String encodedRegionNameStr = Bytes.toString(encodedRegionName);
- results
- .stream()
- .filter(result -> !result.isEmpty())
- .filter(result -> MetaTableAccessor.getRegionInfo(result) != null)
- .forEach(
- result -> {
- getRegionLocations(result).ifPresent(
- locations -> {
- for (HRegionLocation location : locations.getRegionLocations()) {
- if (location != null
- && encodedRegionNameStr.equals(location.getRegion()
- .getEncodedName())) {
- future.complete(Optional.of(location));
- return;
- }
- }
- });
- });
- future.complete(Optional.empty());
+ addListener(
+ metaTable
+ .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
+ (results, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ String encodedRegionNameStr = Bytes.toString(encodedRegionName);
+ results.stream().filter(result -> !result.isEmpty())
+ .filter(result -> MetaTableAccessor.getRegionInfo(result) != null).forEach(result -> {
+ getRegionLocations(result).ifPresent(locations -> {
+ for (HRegionLocation location : locations.getRegionLocations()) {
+ if (location != null &&
+ encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
+ future.complete(Optional.of(location));
+ return;
+ }
+ }
+ });
});
+ future.complete(Optional.empty());
+ });
return future;
}
@@ -190,19 +185,18 @@ public class AsyncMetaTableAccessor {
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
- getTableRegionsAndLocations(metaTable, tableName, true).whenComplete(
- (locations, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (locations == null || locations.isEmpty()) {
- future.complete(Collections.emptyList());
- } else {
- List<HRegionLocation> regionLocations = locations.stream()
- .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
- .collect(Collectors.toList());
- future.complete(regionLocations);
- }
- });
+ addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ } else if (locations == null || locations.isEmpty()) {
+ future.complete(Collections.emptyList());
+ } else {
+ List<HRegionLocation> regionLocations =
+ locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
+ .collect(Collectors.toList());
+ future.complete(regionLocations);
+ }
+ });
return future;
}
@@ -254,7 +248,7 @@ public class AsyncMetaTableAccessor {
}
};
- scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> {
+ addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index cf31d79..02e22c0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
@@ -61,7 +63,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
return;
}
resetCallTimeout();
- callable.call(controller, adminStub).whenComplete((result, error) -> {
+ addListener(callable.call(controller, adminStub), (result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 33e6366..4051e1d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import java.io.IOException;
import java.util.ArrayList;
@@ -409,7 +410,7 @@ class AsyncBatchRpcRetryingCaller<T> {
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
if (error != null) {
- error = translateException(error);
+ error = unwrapCompletionException(translateException(error));
if (error instanceof DoNotRetryIOException) {
failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
return;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 318c6c9..61d49af 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -25,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -96,7 +97,7 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
for (CompletableFuture<?> future : table.batch(toSend)) {
CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
- future.whenComplete((r, e) -> {
+ addListener(future, (r, e) -> {
if (e != null) {
toCompleteFuture.completeExceptionally(e);
} else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 1828650..4a32546 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -188,7 +188,7 @@ class AsyncConnectionImpl implements AsyncConnection {
}
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
- registry.getMasterAddress().whenComplete((sn, error) -> {
+ addListener(registry.getMasterAddress(), (sn, error) -> {
if (sn == null) {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 8894ad1..f0f2037 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -67,15 +68,7 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
- CompletableFuture<T> asyncFuture = new CompletableFuture<>();
- future.whenCompleteAsync((r, e) -> {
- if (e != null) {
- asyncFuture.completeExceptionally(e);
- } else {
- asyncFuture.complete(r);
- }
- }, pool);
- return asyncFuture;
+ return FutureUtils.wrapFuture(future, pool);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index a52e799..7ed44e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
@@ -43,20 +45,20 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ startLogErrorsCnt);
this.callable = callable;
}
@Override
protected void doCall() {
- conn.getMasterStub().whenComplete((stub, error) -> {
+ addListener(conn.getMasterStub(), (stub, error) -> {
if (error != null) {
onError(error, () -> "Get async master stub failed", err -> {
});
return;
}
resetCallTimeout();
- callable.call(controller, stub).whenComplete((result, error2) -> {
+ addListener(callable.call(controller, stub), (result, error2) -> {
if (error2 != null) {
onError(error2, () -> "Call to master failed", err -> {
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index 9fef15d..ce3a2dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,7 +72,7 @@ class AsyncMetaRegionLocator {
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
LOG.debug("Start fetching meta region location from registry.");
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
- registry.getMetaRegionLocation().whenComplete((locs, error) -> {
+ addListener(registry.getMetaRegionLocation(), (locs, error) -> {
if (error != null) {
LOG.debug("Failed to fetch meta region location from registry", error);
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 54b055a..f114eff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
@@ -62,7 +64,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
return;
}
resetCallTimeout();
- callable.call(controller, stub).whenComplete((result, error) -> {
+ addListener(callable.call(controller, stub), (result, error) -> {
if (error != null) {
onError(error, () -> "Call to admin stub failed", err -> {
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index a552e40..9490d0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -79,7 +79,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
return;
}
resetCallTimeout();
- callable.call(controller, loc, stub).whenComplete((result, error) -> {
+ addListener(callable.call(controller, loc, stub), (result, error) -> {
if (error != null) {
onError(error,
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 9747d06..426b184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -87,15 +88,7 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
- CompletableFuture<T> asyncFuture = new CompletableFuture<>();
- future.whenCompleteAsync((r, e) -> {
- if (e != null) {
- asyncFuture.completeExceptionally(e);
- } else {
- asyncFuture.complete(r);
- }
- }, pool);
- return asyncFuture;
+ return FutureUtils.wrapFuture(future, pool);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e24af74..e3e87f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -18,19 +18,20 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
@@ -282,7 +283,7 @@ public class ConnectionFactory {
final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
- registry.getClusterId().whenComplete((clusterId, error) -> {
+ addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -295,9 +296,8 @@ public class ConnectionFactory {
AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(
- user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>)() ->
- ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user))
- );
+ user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
+ .newInstance(clazz, conf, registry, clusterId, user)));
} catch (Exception e) {
future.completeExceptionally(e);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
index 9176c87..9e68a16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCoprocessorRpcChannelImpl.java
@@ -17,23 +17,24 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
/**
* The implementation of a master based coprocessor rpc channel.
*/
@@ -75,12 +76,13 @@ class MasterCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
- callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
- .whenComplete(((r, e) -> {
- if (e != null) {
- ((ClientCoprocessorRpcController) controller).setFailed(e);
- }
- done.run(r);
- }));
+ addListener(
+ callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
+ ((r, e) -> {
+ if (e != null) {
+ ((ClientCoprocessorRpcController) controller).setFailed(e);
+ }
+ done.run(r);
+ }));
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 7c394c1..50817de 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
@@ -412,12 +413,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
ProcedureBiConsumer consumer) {
- CompletableFuture<Long> procFuture = this
- .<Long> newMasterCaller()
- .action(
- (controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
- respConverter)).call();
- return waitProcedureResult(procFuture).whenComplete(consumer);
+ CompletableFuture<Long> procFuture =
+ this.<Long> newMasterCaller().action((controller, stub) -> this
+ .<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)).call();
+ CompletableFuture<Void> future = waitProcedureResult(procFuture);
+ addListener(future, consumer);
+ return future;
}
@FunctionalInterface
@@ -2879,7 +2880,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
// If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) {
- future.completeExceptionally(err2);
+ future.completeExceptionally(unwrapCompletionException(err2));
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState);
} else {
@@ -3026,7 +3027,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
serverNames.stream().forEach(serverName -> {
futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
if (err2 != null) {
- future.completeExceptionally(err2);
+ future.completeExceptionally(unwrapCompletionException(err2));
} else {
serverStates.put(serverName, serverState);
}
@@ -3545,7 +3546,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
futures
.add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
if (err2 != null) {
- future.completeExceptionally(err2);
+ future.completeExceptionally(unwrapCompletionException(err2));
} else {
aggregator.append(stats);
}
@@ -3554,7 +3555,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
(ret, err3) -> {
if (err3 != null) {
- future.completeExceptionally(err3);
+ future.completeExceptionally(unwrapCompletionException(err3));
} else {
future.complete(aggregator.sum());
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 3a94566..be94ca4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -583,7 +583,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e));
}
- coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
+ addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
if (e != null) {
callback.onRegionError(region, e);
} else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
index 4417c7e..94e7d9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -33,12 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientServ
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
/**
* The implementation of a region based coprocessor rpc channel.
*/
@@ -102,16 +102,16 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
- conn.callerFactory.<Message> single().table(tableName).row(row)
+ addListener(
+ conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
- .whenComplete((r, e) -> {
- if (e != null) {
- ((ClientCoprocessorRpcController) controller).setFailed(e);
- }
- done.run(r);
- });
+ .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
+ (r, e) -> {
+ if (e != null) {
+ ((ClientCoprocessorRpcController) controller).setFailed(e);
+ }
+ done.run(r);
+ });
}
-
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
index 372dd4a..38512d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCoprocessorRpcChannelImpl.java
@@ -17,23 +17,24 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
/**
* The implementation of a region server based coprocessor rpc channel.
*/
@@ -75,12 +76,13 @@ public class RegionServerCoprocessorRpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
- callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call()
- .whenComplete(((r, e) -> {
- if (e != null) {
- ((ClientCoprocessorRpcController) controller).setFailed(e);
- }
- done.run(r);
- }));
+ addListener(
+ callerBuilder.action((c, s) -> rpcCall(method, request, responsePrototype, c, s)).call(),
+ ((r, e) -> {
+ if (e != null) {
+ ((ClientCoprocessorRpcController) controller).setFailed(e);
+ }
+ done.run(r);
+ }));
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index c7ae32c..c02643f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGION
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
@@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@@ -68,7 +70,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
- zk.get(path).whenComplete((data, error) -> {
+ addListener(zk.get(path), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -139,7 +141,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) {
- getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+ addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
@@ -154,13 +156,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return;
}
- locs[DEFAULT_REPLICA_ID] =
- new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
- stateAndServerName.getSecond());
+ locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
+ getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
- getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+ addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) {
return;
}
@@ -174,12 +175,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state " +
- stateAndServerName.getFirst());
+ stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] =
- new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
- stateAndServerName.getSecond());
+ new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
+ stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 02ce655..861dacb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,7 +59,12 @@ public final class FutureUtils {
BiConsumer<? super T, ? super Throwable> action) {
future.whenComplete((resp, error) -> {
try {
- action.accept(resp, error);
+ // See this post on Stack Overflow (shortened since the URL is too long),
+ // https://s.apache.org/completionexception
+ // For a chain of CompletableFuture, only the first child CompletableFuture can get the
+ // original exception, others will get a CompletionException, which wraps the original
+ // exception. So here we unwrap it before passing it to the callback action.
+ action.accept(resp, unwrapCompletionException(error));
} catch (Throwable t) {
LOG.error("Unexpected error caught when processing CompletableFuture", t);
}
@@ -65,6 +72,54 @@ public final class FutureUtils {
}
/**
+ * Almost the same as the {@link #addListener(CompletableFuture, BiConsumer)} method above; the
+ * only difference is that we will call
+ * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
+ * @see #addListener(CompletableFuture, BiConsumer)
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public static <T> void addListener(CompletableFuture<T> future,
+ BiConsumer<? super T, ? super Throwable> action, Executor executor) {
+ future.whenCompleteAsync((resp, error) -> {
+ try {
+ action.accept(resp, unwrapCompletionException(error));
+ } catch (Throwable t) {
+ LOG.error("Unexpected error caught when processing CompletableFuture", t);
+ }
+ }, executor);
+ }
+
+ /**
+ * Return a {@link CompletableFuture} which is the same as the given {@code future}, but executes
+ * all the callbacks in the given {@code executor}.
+ */
+ public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
+ Executor executor) {
+ CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
+ addListener(future, (r, e) -> {
+ if (e != null) {
+ wrappedFuture.completeExceptionally(e);
+ } else {
+ wrappedFuture.complete(r);
+ }
+ }, executor);
+ return wrappedFuture;
+ }
+
+ /**
+ * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
+ */
+ public static Throwable unwrapCompletionException(Throwable error) {
+ if (error instanceof CompletionException) {
+ Throwable cause = error.getCause();
+ if (cause != null) {
+ return cause;
+ }
+ }
+ return error;
+ }
+
+ /**
* A helper class for getting the result of a Future, and convert the error to an
* {@link IOException}.
*/
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index 3b3e8d9..b3003c4 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.Message;
-
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
@@ -455,10 +454,10 @@ public final class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
- CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
- ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
+ ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
- sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
+ addListener(sumByRegion(table, ci, scan), (sumByRegion, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (sumByRegion.isEmpty()) {
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
index bcc9c0a..b8b3213 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client.example;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -78,7 +80,7 @@ public class AsyncClientExample extends Configured implements Tool {
for (;;) {
if (future.compareAndSet(null, new CompletableFuture<>())) {
CompletableFuture<AsyncConnection> toComplete = future.get();
- ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> {
+ addListener(ConnectionFactory.createAsyncConnection(getConf()),(conn, error) -> {
if (error != null) {
toComplete.completeExceptionally(error);
// we need to reset the future holder so we will get a chance to recreate an async
@@ -98,15 +100,15 @@ public class AsyncClientExample extends Configured implements Tool {
}
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NONNULL_PARAM_VIOLATION",
- justification="it is valid to pass NULL to CompletableFuture#completedFuture")
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
+ justification = "it is valid to pass NULL to CompletableFuture#completedFuture")
private CompletableFuture<Void> closeConn() {
CompletableFuture<AsyncConnection> f = future.get();
if (f == null) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- f.whenComplete((conn, error) -> {
+ addListener(f, (conn, error) -> {
if (error == null) {
IOUtils.closeQuietly(conn);
}
@@ -136,44 +138,44 @@ public class AsyncClientExample extends Configured implements Tool {
CountDownLatch latch = new CountDownLatch(numOps);
IntStream.range(0, numOps).forEach(i -> {
CompletableFuture<AsyncConnection> future = getConn();
- future.whenComplete((conn, error) -> {
+ addListener(future, (conn, error) -> {
if (error != null) {
LOG.warn("failed to get async connection for " + i, error);
latch.countDown();
return;
}
AsyncTable<?> table = conn.getTable(tableName, threadPool);
- table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)))
- .whenComplete((putResp, putErr) -> {
- if (putErr != null) {
- LOG.warn("put failed for " + i, putErr);
+ addListener(table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))),
+ (putResp, putErr) -> {
+ if (putErr != null) {
+ LOG.warn("put failed for " + i, putErr);
+ latch.countDown();
+ return;
+ }
+ LOG.info("put for " + i + " succeeded, try getting");
+ addListener(table.get(new Get(getKey(i))), (result, getErr) -> {
+ if (getErr != null) {
+ LOG.warn("get failed for " + i);
latch.countDown();
return;
}
- LOG.info("put for " + i + " succeeded, try getting");
- table.get(new Get(getKey(i))).whenComplete((result, getErr) -> {
- if (getErr != null) {
- LOG.warn("get failed for " + i);
- latch.countDown();
- return;
- }
- if (result.isEmpty()) {
- LOG.warn("get failed for " + i + ", server returns empty result");
- } else if (!result.containsColumn(FAMILY, QUAL)) {
- LOG.warn("get failed for " + i + ", the result does not contain " +
- Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
+ if (result.isEmpty()) {
+ LOG.warn("get failed for " + i + ", server returns empty result");
+ } else if (!result.containsColumn(FAMILY, QUAL)) {
+ LOG.warn("get failed for " + i + ", the result does not contain " +
+ Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL));
+ } else {
+ int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
+ if (v != i) {
+ LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
+ ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
} else {
- int v = Bytes.toInt(result.getValue(FAMILY, QUAL));
- if (v != i) {
- LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) +
- ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i);
- } else {
- LOG.info("get for " + i + " succeeded");
- }
+ LOG.info("get for " + i + " succeeded");
}
- latch.countDown();
- });
+ }
+ latch.countDown();
});
+ });
});
});
latch.await();
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
index f9caf2b..668bf7a 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client.example;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
@@ -159,36 +161,38 @@ public class HttpProxyExample {
private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req);
- conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
- .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)))
- .whenComplete((r, e) -> {
- if (e != null) {
- exceptionCaught(ctx, e);
+ addListener(
+ conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
+ .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))),
+ (r, e) -> {
+ if (e != null) {
+ exceptionCaught(ctx, e);
+ } else {
+ byte[] value =
+ r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
+ if (value != null) {
+ write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
} else {
- byte[] value =
- r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier));
- if (value != null) {
- write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value)));
- } else {
- write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
- }
+ write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty());
}
- });
+ }
+ });
}
private void put(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req);
byte[] value = new byte[req.content().readableBytes()];
req.content().readBytes(value);
- conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
- .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value))
- .whenComplete((r, e) -> {
- if (e != null) {
- exceptionCaught(ctx, e);
- } else {
- write(ctx, HttpResponseStatus.OK, Optional.empty());
- }
- });
+ addListener(
+ conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
+ .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)),
+ (r, e) -> {
+ if (e != null) {
+ exceptionCaught(ctx, e);
+ } else {
+ write(ctx, HttpResponseStatus.OK, Optional.empty());
+ }
+ });
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 7c19390..b2ebdb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
@@ -348,7 +350,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
- writer.sync().whenCompleteAsync((result, error) -> {
+ addListener(writer.sync(), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 6368fb7..37c6f00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
@@ -194,7 +196,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
// should not happen
throw new AssertionError(e);
}
- output.flush(false).whenComplete((len, error) -> {
+ addListener(output.flush(false), (len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
@@ -215,7 +217,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
output.writeInt(trailer.getSerializedSize());
output.write(magic);
- output.flush(false).whenComplete((len, error) -> {
+ addListener(output.flush(false), (len, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {