You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/07/08 03:35:26 UTC
[53/57] [abbrv] hbase git commit: HBASE-18319 Implement
getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp
methods
HBASE-18319 Implement getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp methods
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0a5fa0c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0a5fa0c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0a5fa0c
Branch: refs/heads/HBASE-18147
Commit: b0a5fa0c2a119168c4272e5efba16a3ef9e9c329
Parents: 4fe7385
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Jul 5 18:33:57 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Fri Jul 7 16:21:45 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/AsyncAdmin.java | 95 ++++++++
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 44 ++++
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 219 ++++++++++++++++++-
.../hbase/shaded/protobuf/ProtobufUtil.java | 11 +-
.../hbase/shaded/protobuf/RequestConverter.java | 16 +-
.../hbase/client/TestAsyncClusterAdminApi.java | 132 +++++++++++
.../hbase/client/TestAsyncRegionAdminApi.java | 8 +-
.../hbase/client/TestAsyncTableAdminApi.java | 81 ++++++-
8 files changed, 591 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index ff35d46..8ade209 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
@@ -24,8 +25,10 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
+import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -332,6 +335,11 @@ public interface AsyncAdmin {
CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
/**
+ * Get the regions of a given table.
+ */
+ CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName);
+
+ /**
* Flush a table.
* @param tableName table to flush
*/
@@ -796,4 +804,91 @@ public interface AsyncAdmin {
* @return procedure list wrapped by {@link CompletableFuture}
*/
CompletableFuture<List<ProcedureInfo>> listProcedures();
+
+ /**
+ * @return cluster status wrapped by {@link CompletableFuture}
+ */
+ CompletableFuture<ClusterStatus> getClusterStatus();
+
+ /**
+ * @return current master server name wrapped by {@link CompletableFuture}
+ */
+ default CompletableFuture<ServerName> getMaster() {
+ return getClusterStatus().thenApply(ClusterStatus::getMaster);
+ }
+
+ /**
+ * @return current backup master list wrapped by {@link CompletableFuture}
+ */
+ default CompletableFuture<Collection<ServerName>> getBackupMasters() {
+ return getClusterStatus().thenApply(ClusterStatus::getBackupMasters);
+ }
+
+ /**
+ * @return current live region servers list wrapped by {@link CompletableFuture}
+ */
+ default CompletableFuture<Collection<ServerName>> getRegionServers() {
+ return getClusterStatus().thenApply(ClusterStatus::getServers);
+ }
+
+ /**
+ * Get a list of {@link RegionLoad} of all regions hosted on a region seerver.
+ * @param serverName
+ * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
+ */
+ default CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName) {
+ return getRegionLoads(serverName, Optional.empty());
+ }
+
+ /**
+ * Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table.
+ * @param serverName
+ * @param tableName
+ * @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
+ */
+ CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
+ Optional<TableName> tableName);
+
+ /**
+ * Check whether master is in maintenance mode
+ * @return true if master is in maintenance mode, false otherwise. The return value will be
+ * wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<Boolean> isMasterInMaintenanceMode();
+
+ /**
+ * Get the current compaction state of a table. It could be in a major compaction, a minor
+ * compaction, both, or none.
+ * @param tableName table to examine
+ * @return the current compaction state wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<CompactionState> getCompactionState(TableName tableName);
+
+ /**
+ * Get the current compaction state of region. It could be in a major compaction, a minor
+ * compaction, both, or none.
+ * @param regionName region to examine
+ * @return the current compaction state wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName);
+
+ /**
+ * Get the timestamp of the last major compaction for the passed table.
+ * <p>
+ * The timestamp of the oldest HFile resulting from a major compaction of that table, or not
+ * present if no such HFile could be found.
+ * @param tableName table to examine
+ * @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName);
+
+ /**
+ * Get the timestamp of the last major compaction for the passed region.
+ * <p>
+ * The timestamp of the oldest HFile resulting from a major compaction of that region, or not
+ * present if no such HFile could be found.
+ * @param regionName region to examine
+ * @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(byte[] regionName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 36fd60d..2998133 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -27,8 +27,10 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -225,6 +227,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
+ return wrap(rawAdmin.getTableRegions(tableName));
+ }
+
+ @Override
public CompletableFuture<Void> flush(TableName tableName) {
return wrap(rawAdmin.flush(tableName));
}
@@ -445,4 +452,41 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<List<ProcedureInfo>> listProcedures() {
return wrap(rawAdmin.listProcedures());
}
+
+ @Override
+ public CompletableFuture<ClusterStatus> getClusterStatus() {
+ return wrap(rawAdmin.getClusterStatus());
+ }
+
+ @Override
+ public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
+ Optional<TableName> tableName) {
+ return wrap(rawAdmin.getRegionLoads(serverName, tableName));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
+ return wrap(rawAdmin.isMasterInMaintenanceMode());
+ }
+
+ @Override
+ public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
+ return wrap(rawAdmin.getCompactionState(tableName));
+ }
+
+ @Override
+ public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
+ return wrap(rawAdmin.getCompactionStateForRegion(regionName));
+ }
+
+ @Override
+ public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
+ return wrap(rawAdmin.getLastMajorCompactionTimestamp(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
+ byte[] regionName) {
+ return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName));
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 179fd7d..b119754 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -46,12 +46,14 @@ import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -89,10 +91,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@@ -115,6 +122,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@@ -133,6 +142,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
@@ -141,6 +152,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
@@ -178,7 +192,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -728,14 +742,26 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
+ public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
return this.<List<HRegionInfo>> newAdminCaller()
.action((controller, stub) -> this
.<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
(s, c, req, done) -> s.getOnlineRegion(c, req, done),
resp -> ProtobufUtil.getRegionInfos(resp)))
- .serverName(sn).call();
+ .serverName(serverName).call();
+ }
+
+ @Override
+ public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
+ if (tableName.equals(META_TABLE_NAME)) {
+ return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
+ .thenApply(loc -> Arrays.asList(loc.getRegionInfo()));
+ } else {
+ return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
+ .thenApply(
+ locs -> locs.stream().map(loc -> loc.getRegionInfo()).collect(Collectors.toList()));
+ }
}
@Override
@@ -2275,4 +2301,189 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
return false;
}
-}
+
+ @Override
+ public CompletableFuture<ClusterStatus> getClusterStatus() {
+ return this
+ .<ClusterStatus> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetClusterStatusRequest, GetClusterStatusResponse, ClusterStatus> call(controller,
+ stub, RequestConverter.buildGetClusterStatusRequest(),
+ (s, c, req, done) -> s.getClusterStatus(c, req, done),
+ resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call();
+ }
+
+ @Override
+ public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
+ Optional<TableName> tableName) {
+ return this
+ .<List<RegionLoad>> newAdminCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionLoad>> adminCall(
+ controller, stub, RequestConverter.buildGetRegionLoadRequest(tableName), (s, c,
+ req, done) -> s.getRegionLoad(controller, req, done),
+ ProtobufUtil::getRegionLoadInfo)).serverName(serverName).call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
+ return this
+ .<Boolean> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
+ stub, IsInMaintenanceModeRequest.newBuilder().build(),
+ (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
+ resp -> resp.getInMaintenanceMode())).call();
+ }
+
+ @Override
+ public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
+ CompletableFuture<CompactionState> future = new CompletableFuture<>();
+ getTableHRegionLocations(tableName).whenComplete(
+ (locations, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ List<CompactionState> regionStates = new ArrayList<>();
+ List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
+ locations.stream().filter(loc -> loc.getServerName() != null)
+ .filter(loc -> loc.getRegionInfo() != null)
+ .filter(loc -> !loc.getRegionInfo().isOffline())
+ .map(loc -> loc.getRegionInfo().getRegionName()).forEach(region -> {
+ futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
+ // If any region compaction state is MAJOR_AND_MINOR
+ // the table compaction state is MAJOR_AND_MINOR, too.
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
+
+ future.complete(regionState);
+ } else {
+ regionStates.add(regionState);
+ }
+ }));
+ });
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
+ .whenComplete((ret, err3) -> {
+ // If future not completed, check all regions's compaction state
+ if (!future.isCompletedExceptionally() && !future.isDone()) {
+ CompactionState state = CompactionState.NONE;
+ for (CompactionState regionState : regionStates) {
+ switch (regionState) {
+ case MAJOR:
+ if (state == CompactionState.MINOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MAJOR;
+ }
+ break;
+ case MINOR:
+ if (state == CompactionState.MAJOR) {
+ future.complete(CompactionState.MAJOR_AND_MINOR);
+ } else {
+ state = CompactionState.MINOR;
+ }
+ break;
+ case NONE:
+ default:
+ }
+ if (!future.isDone()) {
+ future.complete(state);
+ }
+ }
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
+ CompletableFuture<CompactionState> future = new CompletableFuture<>();
+ getRegionLocation(regionName).whenComplete(
+ (location, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ ServerName serverName = location.getServerName();
+ if (serverName == null) {
+ future.completeExceptionally(new NoServerForRegionException(Bytes
+ .toStringBinary(regionName)));
+ return;
+ }
+ this.<GetRegionInfoResponse> newAdminCaller()
+ .action(
+ (controller, stub) -> this
+ .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
+ controller, stub, RequestConverter.buildGetRegionInfoRequest(location
+ .getRegionInfo().getRegionName(), true), (s, c, req, done) -> s
+ .getRegionInfo(controller, req, done), resp -> resp))
+ .serverName(serverName).call().whenComplete((resp2, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ if (resp2.hasCompactionState()) {
+ future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
+ } else {
+ future.complete(CompactionState.NONE);
+ }
+ }
+ });
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
+ MajorCompactionTimestampRequest request =
+ MajorCompactionTimestampRequest.newBuilder()
+ .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
+ return this
+ .<Optional<Long>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
+ controller, stub, request,
+ (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
+ ProtobufUtil::toOptionalTimestamp)).call();
+ }
+
+ @Override
+ public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
+ byte[] regionName) {
+ CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+ // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
+ getRegionInfo(regionName)
+ .whenComplete(
+ (region, err) -> {
+ if (err != null) {
+ future.completeExceptionally(err);
+ return;
+ }
+ MajorCompactionTimestampForRegionRequest.Builder builder =
+ MajorCompactionTimestampForRegionRequest.newBuilder();
+ builder.setRegion(RequestConverter.buildRegionSpecifier(
+ RegionSpecifierType.REGION_NAME, regionName));
+ this.<Optional<Long>> newMasterCaller()
+ .action(
+ (controller, stub) -> this
+ .<MajorCompactionTimestampForRegionRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
+ controller, stub, builder.build(), (s, c, req, done) -> s
+ .getLastMajorCompactionTimestampForRegion(c, req, done),
+ ProtobufUtil::toOptionalTimestamp)).call()
+ .whenComplete((timestamp, err2) -> {
+ if (err2 != null) {
+ future.completeExceptionally(err2);
+ } else {
+ future.complete(timestamp);
+ }
+ });
+ });
+ return future;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 2bb2994..eebe4bd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedure;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
@@ -1806,7 +1807,8 @@ public final class ProtobufUtil {
public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoad(
final RpcController controller, final AdminService.BlockingInterface admin,
final TableName tableName) throws IOException {
- GetRegionLoadRequest request = RequestConverter.buildGetRegionLoadRequest(tableName);
+ GetRegionLoadRequest request =
+ RequestConverter.buildGetRegionLoadRequest(Optional.ofNullable(tableName));
GetRegionLoadResponse response;
try {
response = admin.getRegionLoad(controller, request);
@@ -1816,7 +1818,7 @@ public final class ProtobufUtil {
return getRegionLoadInfo(response);
}
- static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
+ public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
GetRegionLoadResponse regionLoadResponse) {
List<org.apache.hadoop.hbase.RegionLoad> regionLoadList =
new ArrayList<>(regionLoadResponse.getRegionLoadsCount());
@@ -3066,6 +3068,11 @@ public final class ProtobufUtil {
return CompactionState.valueOf(state.toString());
}
+ public static Optional<Long> toOptionalTimestamp(MajorCompactionTimestampResponse resp) {
+ long timestamp = resp.getCompactionTimestamp();
+ return timestamp == 0 ? Optional.empty() : Optional.of(timestamp);
+ }
+
/**
* Creates {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type}
* from {@link SnapshotType}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index dff9116..a74d737 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -796,15 +796,23 @@ public final class RequestConverter {
/**
* Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
- *
* @param tableName the table for which regionLoad should be obtained from RS
* @return a protocol buffer GetRegionLoadRequest
+ * @deprecated use {@link #buildGetRegionLoadRequest(Optional)} instead.
*/
+ @Deprecated
public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) {
+ return buildGetRegionLoadRequest(Optional.ofNullable(tableName));
+ }
+
+ /**
+ * Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
+ * @param tableName the table for which regionLoad should be obtained from RS
+ * @return a protocol buffer GetRegionLoadRequest
+ */
+ public static GetRegionLoadRequest buildGetRegionLoadRequest(Optional<TableName> tableName) {
GetRegionLoadRequest.Builder builder = GetRegionLoadRequest.newBuilder();
- if (tableName != null) {
- builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
- }
+ tableName.ifPresent(table -> builder.setTableName(ProtobufUtil.toProtoTableName(table)));
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
new file mode 100644
index 0000000..e8f6380
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+@Category({ MiscTests.class, MediumTests.class })
+public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
+
+ @Test
+ public void testRegionLoad() throws Exception {
+ // Turn off the balancer
+ admin.setBalancerOn(false).join();
+ TableName[] tables =
+ new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
+ TableName.valueOf(tableName.getNameAsString() + "2"),
+ TableName.valueOf(tableName.getNameAsString() + "3") };
+ createAndLoadTable(tables);
+ // Check if regions match with the regionLoad from the server
+ Collection<ServerName> servers = admin.getRegionServers().get();
+ for (ServerName serverName : servers) {
+ List<HRegionInfo> regions = admin.getOnlineRegions(serverName).get();
+ checkRegionsAndRegionLoads(regions, admin.getRegionLoads(serverName).get());
+ }
+
+ // Check if regionLoad matches the table's regions and nothing is missed
+ for (TableName table : tables) {
+ List<HRegionInfo> tableRegions = admin.getTableRegions(table).get();
+ List<RegionLoad> regionLoads = Lists.newArrayList();
+ for (ServerName serverName : servers) {
+ regionLoads.addAll(admin.getRegionLoads(serverName, Optional.of(table)).get());
+ }
+ checkRegionsAndRegionLoads(tableRegions, regionLoads);
+ }
+
+ // Check RegionLoad matches the regionLoad from ClusterStatus
+ ClusterStatus clusterStatus = admin.getClusterStatus().get();
+ for (ServerName serverName : clusterStatus.getServers()) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+ compareRegionLoads(serverLoad.getRegionsLoad().values(), admin.getRegionLoads(serverName)
+ .get());
+ }
+ }
+
+ private void compareRegionLoads(Collection<RegionLoad> regionLoadCluster,
+ Collection<RegionLoad> regionLoads) {
+
+ assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
+ regionLoadCluster.size(), regionLoads.size());
+
+ for (RegionLoad loadCluster : regionLoadCluster) {
+ boolean matched = false;
+ for (RegionLoad load : regionLoads) {
+ if (Bytes.equals(loadCluster.getName(), load.getName())) {
+ matched = true;
+ continue;
+ }
+ }
+ assertTrue("The contents of region load from cluster and server should match", matched);
+ }
+ }
+
+ private void checkRegionsAndRegionLoads(Collection<HRegionInfo> regions,
+ Collection<RegionLoad> regionLoads) {
+
+ assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());
+
+ Map<byte[], RegionLoad> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ for (RegionLoad regionLoad : regionLoads) {
+ regionLoadMap.put(regionLoad.getName(), regionLoad);
+ }
+ for (HRegionInfo info : regions) {
+ assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
+ + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
+ }
+ }
+
+ private void createAndLoadTable(TableName[] tables) {
+ for (TableName table : tables) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
+ admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
+ RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table);
+ List<Put> puts = new ArrayList<>();
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
+ }
+ asyncTable.putAll(puts).join();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
index 7c8b236..7752d37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java
@@ -515,10 +515,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
long curt = System.currentTimeMillis();
long waitTime = 5000;
long endt = curt + waitTime;
- CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ CompactionState state = admin.getCompactionState(tableName).get();
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(10);
- state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ state = admin.getCompactionState(tableName).get();
curt = System.currentTimeMillis();
}
// Now, should have the right compaction state,
@@ -530,10 +530,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
}
} else {
// Wait until the compaction is done
- state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ state = admin.getCompactionState(tableName).get();
while (state != CompactionState.NONE && curt < endt) {
Thread.sleep(10);
- state = TEST_UTIL.getAdmin().getCompactionState(tableName);
+ state = admin.getCompactionState(tableName).get();
}
// Now, compaction should be done.
assertEquals(CompactionState.NONE, state);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0a5fa0c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
index f75c346..f2db244 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableAdminApi.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -774,4 +775,82 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
assertFalse("Table should be created with 1 row in META", tableAvailable);
}
-}
+
+ @Test
+ public void testCompactionTimestamps() throws Exception {
+ createTableWithDefaultConf(tableName);
+ RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
+ Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+ assertFalse(ts.isPresent());
+ Put p = new Put(Bytes.toBytes("row1"));
+ p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
+ table.put(p).join();
+ ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+ // no files written -> no data
+ assertFalse(ts.isPresent());
+
+ admin.flush(tableName).join();
+ ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+ // still 0, we flushed a file, but no major compaction happened
+ assertFalse(ts.isPresent());
+
+ byte[] regionName =
+ ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get()
+ .getRegionInfo().getRegionName();
+ Optional<Long> ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+ assertFalse(ts1.isPresent());
+ p = new Put(Bytes.toBytes("row2"));
+ p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
+ table.put(p).join();
+ admin.flush(tableName).join();
+ ts1 = admin.getLastMajorCompactionTimestamp(tableName).get();
+ // make sure the region API returns the same value, as the old file is still around
+ assertFalse(ts1.isPresent());
+
+ for (int i = 0; i < 3; i++) {
+ table.put(p).join();
+ admin.flush(tableName).join();
+ }
+ admin.majorCompact(tableName).join();
+ long curt = System.currentTimeMillis();
+ long waitTime = 10000;
+ long endt = curt + waitTime;
+ CompactionState state = admin.getCompactionState(tableName).get();
+ LOG.info("Current compaction state 1 is " + state);
+ while (state == CompactionState.NONE && curt < endt) {
+ Thread.sleep(100);
+ state = admin.getCompactionState(tableName).get();
+ curt = System.currentTimeMillis();
+ LOG.info("Current compaction state 2 is " + state);
+ }
+ // Now, should have the right compaction state, let's wait until the compaction is done
+ if (state == CompactionState.MAJOR) {
+ state = admin.getCompactionState(tableName).get();
+ LOG.info("Current compaction state 3 is " + state);
+ while (state != CompactionState.NONE && curt < endt) {
+ Thread.sleep(10);
+ state = admin.getCompactionState(tableName).get();
+ LOG.info("Current compaction state 4 is " + state);
+ }
+ }
+ // Sleep to wait region server report
+ Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
+
+ ts = admin.getLastMajorCompactionTimestamp(tableName).get();
+ // after a compaction our earliest timestamp will have progressed forward
+ assertTrue(ts.isPresent());
+ assertTrue(ts.get() > 0);
+ // region api still the same
+ ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+ assertTrue(ts1.isPresent());
+ assertEquals(ts.get(), ts1.get());
+ table.put(p).join();
+ admin.flush(tableName).join();
+ ts = admin.getLastMajorCompactionTimestamp(tableName).join();
+ assertTrue(ts.isPresent());
+ assertEquals(ts.get(), ts1.get());
+ ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
+ assertTrue(ts1.isPresent());
+ assertEquals(ts.get(), ts1.get());
+ }
+}
\ No newline at end of file