You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/09/17 13:22:59 UTC
[hbase] branch branch-2 updated: HBASE-24528 : BalancerDecision
queue implementation in HMaster with Admin API (#2411)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 4316dc7 HBASE-24528 : BalancerDecision queue implementation in HMaster with Admin API (#2411)
4316dc7 is described below
commit 4316dc738c6099bec37a77f65a63e3e1e68dcdb6
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Thu Sep 17 18:51:42 2020 +0530
HBASE-24528 : BalancerDecision queue implementation in HMaster with Admin API (#2411)
* Admin API getLogEntries() for ring buffer use-cases: so far, provides balancerDecision and slowLogResponse
* Refactor RPC call for similar use-cases
* Single RPC API getLogEntries() for both Master.proto and Admin.proto
Closes #2261
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../java/org/apache/hadoop/hbase/client/Admin.java | 44 +++++-
.../org/apache/hadoop/hbase/client/AsyncAdmin.java | 45 +++++-
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 12 +-
.../hadoop/hbase/client/BalancerDecision.java | 152 +++++++++++++++++++++
.../hbase/client/ConnectionImplementation.java | 7 +
.../org/apache/hadoop/hbase/client/HBaseAdmin.java | 71 ++++++----
.../org/apache/hadoop/hbase/client/LogEntry.java | 36 ++---
.../apache/hadoop/hbase/client/LogQueryFilter.java | 9 +-
.../hadoop/hbase/client/OnlineLogRecord.java | 35 ++---
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 83 +++++------
.../org/apache/hadoop/hbase/client/ServerType.java | 32 ++---
.../hbase/client/ShortCircuitMasterConnection.java | 8 +-
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 76 ++++++++++-
.../hbase/shaded/protobuf/RequestConverter.java | 85 +++++++-----
hbase-common/src/main/resources/hbase-default.xml | 11 +-
.../MetricsStochasticBalancerSourceImpl.java | 4 +-
.../src/main/protobuf/Admin.proto | 18 +++
.../src/main/protobuf/HBase.proto | 12 +-
.../src/main/protobuf/Master.proto | 26 ++++
.../src/main/protobuf/RecentLogs.proto | 40 ++----
.../hadoop/hbase/master/MasterRpcServices.java | 55 ++++++++
.../hbase/master/balancer/BaseLoadBalancer.java | 11 ++
.../master/balancer/StochasticLoadBalancer.java | 87 +++++++++---
...uePayload.java => BalancerDecisionDetails.java} | 32 +++--
.../hadoop/hbase/namequeues/NamedQueuePayload.java | 35 ++++-
.../hbase/namequeues/RingBufferEnvelope.java | 2 +-
.../hadoop/hbase/namequeues/RpcLogDetails.java | 4 +-
.../impl/BalancerDecisionQueueService.java | 150 ++++++++++++++++++++
.../hbase/namequeues/impl/SlowLogQueueService.java | 29 ++--
.../namequeues/request/NamedQueueGetRequest.java | 17 ++-
.../namequeues/response/NamedQueueGetResponse.java | 15 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 28 ++--
.../hadoop/hbase/regionserver/RSRpcServices.java | 35 ++++-
.../org/apache/hadoop/hbase/client/TestAdmin2.java | 9 +-
.../client/TestAsyncTableGetMultiThreaded.java | 16 ++-
.../hadoop/hbase/master/MockRegionServer.java | 7 +
.../master/balancer/TestBalancerDecision.java | 105 ++++++++++++++
.../balancer/TestStochasticLoadBalancer.java | 2 +
.../hbase/namequeues/TestNamedQueueRecorder.java | 2 +-
.../hbase/namequeues/TestSlowLogAccessor.java | 2 +-
hbase-shell/src/main/ruby/hbase/admin.rb | 88 ++++++------
hbase-shell/src/main/ruby/shell.rb | 1 +
.../ruby/shell/commands/get_balancer_decisions.rb | 49 +++++++
.../ruby/shell/commands/get_largelog_responses.rb | 5 +-
.../ruby/shell/commands/get_slowlog_responses.rb | 5 +-
hbase-shell/src/test/ruby/hbase/admin_test.rb | 4 +-
.../hadoop/hbase/thrift2/client/ThriftAdmin.java | 8 ++
src/main/asciidoc/_chapters/hbase-default.adoc | 14 ++
48 files changed, 1280 insertions(+), 343 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index d76ca51..e59c6cb9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -23,12 +23,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -3202,9 +3204,30 @@ public interface Admin extends Abortable, Closeable {
* @param logQueryFilter filter to be used if provided (determines slow / large RPC logs)
* @return online slowlog response list
* @throws IOException if a remote or network exception occurs
+ * @deprecated since 2.4.0 and will be removed in 4.0.0.
+ * Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead.
*/
- List<OnlineLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
- final LogQueryFilter logQueryFilter) throws IOException;
+ @Deprecated
+ default List<OnlineLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
+ final LogQueryFilter logQueryFilter) throws IOException {
+ String logType;
+ if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) {
+ logType = "LARGE_LOG";
+ } else {
+ logType = "SLOW_LOG";
+ }
+ Map<String, Object> filterParams = new HashMap<>();
+ filterParams.put("regionName", logQueryFilter.getRegionName());
+ filterParams.put("clientAddress", logQueryFilter.getClientAddress());
+ filterParams.put("tableName", logQueryFilter.getTableName());
+ filterParams.put("userName", logQueryFilter.getUserName());
+ filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString());
+ List<LogEntry> logEntries =
+ getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(),
+ filterParams);
+ return logEntries.stream().map(logEntry -> (OnlineLogRecord) logEntry)
+ .collect(Collectors.toList());
+ }
/**
* Clears online slow/large RPC logs from the provided list of
@@ -3218,4 +3241,21 @@ public interface Admin extends Abortable, Closeable {
List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames)
throws IOException;
+
+ /**
+ * Retrieve recent online records from HMaster / RegionServers.
+ * Examples include slow/large RPC logs, balancer decisions by master.
+ *
+ * @param serverNames servers to retrieve records from, useful in case of records maintained
+ * by RegionServer as we can select specific server. In case of servertype=MASTER, logs will
+ * only come from the currently active master.
+ * @param logType string representing type of log records
+ * @param serverType enum for server type: HMaster or RegionServer
+ * @param limit put a limit to list of records that server should send in response
+ * @param filterParams additional filter params
+ * @return Log entries representing online records from servers
+ * @throws IOException if a remote or network exception occurs
+ */
+ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
+ ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index b37785d..27d1553 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -21,6 +21,7 @@ import com.google.protobuf.RpcChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -28,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@@ -1515,11 +1517,33 @@ public interface AsyncAdmin {
* RegionServers
*
* @param serverNames Server names to get slowlog responses from
- * @param slowLogQueryFilter filter to be used if provided
+ * @param logQueryFilter filter to be used if provided
* @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture}
+ * @deprecated since 2.4.0 and will be removed in 4.0.0.
+ * Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead.
*/
- CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(final Set<ServerName> serverNames,
- final LogQueryFilter slowLogQueryFilter);
+ @Deprecated
+ default CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
+ final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
+ String logType;
+ if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) {
+ logType = "LARGE_LOG";
+ } else {
+ logType = "SLOW_LOG";
+ }
+ Map<String, Object> filterParams = new HashMap<>();
+ filterParams.put("regionName", logQueryFilter.getRegionName());
+ filterParams.put("clientAddress", logQueryFilter.getClientAddress());
+ filterParams.put("tableName", logQueryFilter.getTableName());
+ filterParams.put("userName", logQueryFilter.getUserName());
+ filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString());
+ CompletableFuture<List<LogEntry>> logEntries =
+ getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(),
+ filterParams);
+ return logEntries.thenApply(
+ logEntryList -> logEntryList.stream().map(logEntry -> (OnlineLogRecord) logEntry)
+ .collect(Collectors.toList()));
+ }
/**
* Clears online slow RPC logs from the provided list of
@@ -1531,4 +1555,19 @@ public interface AsyncAdmin {
*/
CompletableFuture<List<Boolean>> clearSlowLogResponses(final Set<ServerName> serverNames);
+ /**
+ * Retrieve recent online records from HMaster / RegionServers.
+ * Examples include slow/large RPC logs, balancer decisions by master.
+ *
+ * @param serverNames servers to retrieve records from, useful in case of records maintained
+ * by RegionServer as we can select specific server. In case of servertype=MASTER, logs will
+ * only come from the currently active master.
+ * @param logType string representing type of log records
+ * @param serverType enum for server type: HMaster or RegionServer
+ * @param limit put a limit to list of records that server should send in response
+ * @param filterParams additional filter params
+ * @return Log entries representing online records from servers
+ */
+ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType,
+ ServerType serverType, int limit, Map<String, Object> filterParams);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 29bf6ea..7c51ed0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -848,14 +848,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
- public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
- final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
- return wrap(rawAdmin.getSlowLogResponses(serverNames, logQueryFilter));
- }
-
- @Override
public CompletableFuture<List<Boolean>> clearSlowLogResponses(Set<ServerName> serverNames) {
return wrap(rawAdmin.clearSlowLogResponses(serverNames));
}
+ @Override
+ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
+ String logType, ServerType serverType, int limit,
+ Map<String, Object> filterParams) {
+ return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java
new file mode 100644
index 0000000..e2bf2e2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.List;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
+
+/**
+ * History of balancer decisions taken for region movements.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+final public class BalancerDecision extends LogEntry {
+
+ private final String initialFunctionCosts;
+ private final String finalFunctionCosts;
+ private final double initTotalCost;
+ private final double computedTotalCost;
+ private final long computedSteps;
+ private final List<String> regionPlans;
+
+ // used to convert object to pretty printed format
+ // used by toJsonPrettyPrint()
+ private static final Gson GSON = GsonUtil.createGson()
+ .setPrettyPrinting()
+ .registerTypeAdapter(BalancerDecision.class, (JsonSerializer<BalancerDecision>)
+ (balancerDecision, type, jsonSerializationContext) -> {
+ Gson gson = new Gson();
+ return gson.toJsonTree(balancerDecision);
+ }).create();
+
+ private BalancerDecision(String initialFunctionCosts, String finalFunctionCosts,
+ double initTotalCost, double computedTotalCost, List<String> regionPlans,
+ long computedSteps) {
+ this.initialFunctionCosts = initialFunctionCosts;
+ this.finalFunctionCosts = finalFunctionCosts;
+ this.initTotalCost = initTotalCost;
+ this.computedTotalCost = computedTotalCost;
+ this.regionPlans = regionPlans;
+ this.computedSteps = computedSteps;
+ }
+
+ public String getInitialFunctionCosts() {
+ return initialFunctionCosts;
+ }
+
+ public String getFinalFunctionCosts() {
+ return finalFunctionCosts;
+ }
+
+ public double getInitTotalCost() {
+ return initTotalCost;
+ }
+
+ public double getComputedTotalCost() {
+ return computedTotalCost;
+ }
+
+ public List<String> getRegionPlans() {
+ return regionPlans;
+ }
+
+ public long getComputedSteps() {
+ return computedSteps;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("initialFunctionCosts", initialFunctionCosts)
+ .append("finalFunctionCosts", finalFunctionCosts)
+ .append("initTotalCost", initTotalCost)
+ .append("computedTotalCost", computedTotalCost)
+ .append("computedSteps", computedSteps)
+ .append("regionPlans", regionPlans)
+ .toString();
+ }
+
+ @Override
+ public String toJsonPrettyPrint() {
+ return GSON.toJson(this);
+ }
+
+ public static class Builder {
+ private String initialFunctionCosts;
+ private String finalFunctionCosts;
+ private double initTotalCost;
+ private double computedTotalCost;
+ private long computedSteps;
+ private List<String> regionPlans;
+
+ public Builder setInitialFunctionCosts(String initialFunctionCosts) {
+ this.initialFunctionCosts = initialFunctionCosts;
+ return this;
+ }
+
+ public Builder setFinalFunctionCosts(String finalFunctionCosts) {
+ this.finalFunctionCosts = finalFunctionCosts;
+ return this;
+ }
+
+ public Builder setInitTotalCost(double initTotalCost) {
+ this.initTotalCost = initTotalCost;
+ return this;
+ }
+
+ public Builder setComputedTotalCost(double computedTotalCost) {
+ this.computedTotalCost = computedTotalCost;
+ return this;
+ }
+
+ public Builder setRegionPlans(List<String> regionPlans) {
+ this.regionPlans = regionPlans;
+ return this;
+ }
+
+ public Builder setComputedSteps(long computedSteps) {
+ this.computedSteps = computedSteps;
+ return this;
+ }
+
+ public BalancerDecision build() {
+ return new BalancerDecision(initialFunctionCosts, finalFunctionCosts,
+ initTotalCost, computedTotalCost, regionPlans, computedSteps);
+ }
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index b3b7b7d..be85011 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Has
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
@@ -1825,6 +1826,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
HasUserPermissionsRequest request) throws ServiceException {
return stub.hasUserPermissions(controller, request);
}
+
+ @Override
+ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
+ HBaseProtos.LogRequest request) throws ServiceException {
+ return stub.getLogEntries(controller, request);
+ }
};
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 03ada17..b652143 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -4370,15 +4370,15 @@ public class HBaseAdmin implements Admin {
}
- @Override
- public List<OnlineLogRecord> getSlowLogResponses(@Nullable final Set<ServerName> serverNames,
- final LogQueryFilter logQueryFilter) throws IOException {
+ private List<LogEntry> getSlowLogResponses(
+ final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
+ final String logType) {
if (CollectionUtils.isEmpty(serverNames)) {
return Collections.emptyList();
}
return serverNames.stream().map(serverName -> {
try {
- return getSlowLogResponseFromServer(serverName, logQueryFilter);
+ return getSlowLogResponseFromServer(serverName, filterParams, limit, logType);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -4386,29 +4386,17 @@ public class HBaseAdmin implements Admin {
).flatMap(List::stream).collect(Collectors.toList());
}
- private List<OnlineLogRecord> getSlowLogResponseFromServer(final ServerName serverName,
- final LogQueryFilter logQueryFilter) throws IOException {
- return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), logQueryFilter);
- }
-
- private List<OnlineLogRecord> getSlowLogResponsesFromServer(AdminService.BlockingInterface admin,
- LogQueryFilter logQueryFilter) throws IOException {
- return executeCallable(new RpcRetryingCallable<List<OnlineLogRecord>>() {
+ private List<LogEntry> getSlowLogResponseFromServer(ServerName serverName,
+ Map<String, Object> filterParams, int limit, String logType) throws IOException {
+ AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+ return executeCallable(new RpcRetryingCallable<List<LogEntry>>() {
@Override
- protected List<OnlineLogRecord> rpcCall(int callTimeout) throws Exception {
+ protected List<LogEntry> rpcCall(int callTimeout) throws Exception {
HBaseRpcController controller = rpcControllerFactory.newController();
- if (logQueryFilter.getType() == null
- || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
- AdminProtos.SlowLogResponses slowLogResponses =
- admin.getSlowLogResponses(controller,
- RequestConverter.buildSlowLogResponseRequest(logQueryFilter));
- return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
- } else {
- AdminProtos.SlowLogResponses slowLogResponses =
- admin.getLargeLogResponses(controller,
- RequestConverter.buildSlowLogResponseRequest(logQueryFilter));
- return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
- }
+ HBaseProtos.LogRequest logRequest =
+ RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType);
+ HBaseProtos.LogEntry logEntry = admin.getLogEntries(controller, logRequest);
+ return ProtobufUtil.toSlowLogPayloads(logEntry);
}
});
}
@@ -4428,6 +4416,39 @@ public class HBaseAdmin implements Admin {
}).collect(Collectors.toList());
}
+ @Override
+ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
+ ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException {
+ if (logType == null || serverType == null) {
+ throw new IllegalArgumentException("logType and/or serverType cannot be empty");
+ }
+ if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
+ if (ServerType.MASTER.equals(serverType)) {
+ throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
+ }
+ return getSlowLogResponses(filterParams, serverNames, limit, logType);
+ } else if (logType.equals("BALANCER_DECISION")) {
+ if (ServerType.REGION_SERVER.equals(serverType)) {
+ throw new IllegalArgumentException(
+ "Balancer Decision logs are not maintained by HRegionServer");
+ }
+ return getBalancerDecisions(limit);
+ }
+ return Collections.emptyList();
+ }
+
+ private List<LogEntry> getBalancerDecisions(final int limit) throws IOException {
+ return executeCallable(new MasterCallable<List<LogEntry>>(getConnection(),
+ getRpcControllerFactory()) {
+ @Override
+ protected List<LogEntry> rpcCall() throws Exception {
+ HBaseProtos.LogEntry logEntry =
+ master.getLogEntries(getRpcController(), ProtobufUtil.toBalancerDecisionRequest(limit));
+ return ProtobufUtil.toBalancerDecisionResponse(logEntry);
+ }
+ });
+ }
+
private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
return executeCallable(new RpcRetryingCallable<Boolean>() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java
similarity index 55%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java
index 7aa87fa..41f79cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java
@@ -17,33 +17,23 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.namequeues;
+package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
/**
- * Base payload to be prepared by client to send various namedQueue events for in-memory
- * ring buffer storage in either HMaster or RegionServer.
- * e.g slowLog responses
+ * Abstract response class representing online log responses from ring-buffer use-cases,
+ * e.g. slow/large RPC logs, balancer decision logs
*/
-@InterfaceAudience.Private
-public class NamedQueuePayload {
-
- public enum NamedQueueEvent {
- SLOW_LOG
- }
-
- private final NamedQueueEvent namedQueueEvent;
-
- public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
- if (namedQueueEvent == null) {
- throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
- }
- this.namedQueueEvent = namedQueueEvent;
- }
-
- public NamedQueueEvent getNamedQueueEvent() {
- return namedQueueEvent;
- }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class LogEntry {
+
+ /**
+ * Provide a pretty-printed JSON string representation of the response sent by the server
+ * @return Pretty printed Json representation
+ */
+ public abstract String toJsonPrettyPrint();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java
index 728aba4..506fc4f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java
@@ -23,12 +23,16 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
/**
* Slow/Large Log Query Filter with all filter and limit parameters
- * Used by Admin API: getSlowLogResponses
+ * Used by the deprecated Admin API getSlowLogResponses; superseded by getLogEntries
+ * @deprecated as of 2.4.0. Will be removed in 4.0.0.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@Deprecated
public class LogQueryFilter {
private String regionName;
@@ -153,4 +157,5 @@ public class LogQueryFilter {
.append("filterByOperator", filterByOperator)
.toString();
}
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java
index 8af0013..115e55f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
@@ -33,8 +34,9 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
* Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and
* get_large_responses
*/
-@InterfaceAudience.Private
-final public class OnlineLogRecord {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+final public class OnlineLogRecord extends LogEntry {
// used to convert object to pretty printed format
// used by toJsonPrettyPrint()
@@ -56,22 +58,22 @@ final public class OnlineLogRecord {
return jsonObj;
}).create();
- private long startTime;
- private int processingTime;
- private int queueTime;
- private long responseSize;
- private String clientAddress;
- private String serverClass;
- private String methodName;
- private String callDetails;
- private String param;
+ private final long startTime;
+ private final int processingTime;
+ private final int queueTime;
+ private final long responseSize;
+ private final String clientAddress;
+ private final String serverClass;
+ private final String methodName;
+ private final String callDetails;
+ private final String param;
// we don't want to serialize region name, it is just for the filter purpose
// hence avoiding deserialization
- private transient String regionName;
- private String userName;
- private int multiGetsCount;
- private int multiMutationsCount;
- private int multiServiceCalls;
+ private final transient String regionName;
+ private final String userName;
+ private final int multiGetsCount;
+ private final int multiMutationsCount;
+ private final int multiServiceCalls;
public long getStartTime() {
return startTime;
@@ -293,6 +295,7 @@ final public class OnlineLogRecord {
.toHashCode();
}
+ @Override
public String toJsonPrettyPrint() {
return GSON.toJson(this);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 392447a..7f93914 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -3906,49 +3906,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
- @Override
- public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
- @Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
+ private CompletableFuture<List<LogEntry>> getSlowLogResponses(
+ final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
+ final String logType) {
if (CollectionUtils.isEmpty(serverNames)) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
- if (logQueryFilter.getType() == null
- || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
- return CompletableFuture.supplyAsync(() -> serverNames.stream()
- .map((ServerName serverName) ->
- getSlowLogResponseFromServer(serverName, logQueryFilter))
- .map(CompletableFuture::join)
- .flatMap(List::stream)
- .collect(Collectors.toList()));
- } else {
- return CompletableFuture.supplyAsync(() -> serverNames.stream()
- .map((ServerName serverName) ->
- getLargeLogResponseFromServer(serverName, logQueryFilter))
- .map(CompletableFuture::join)
- .flatMap(List::stream)
- .collect(Collectors.toList()));
- }
+ return CompletableFuture.supplyAsync(() -> serverNames.stream()
+ .map((ServerName serverName) ->
+ getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
+ .map(CompletableFuture::join)
+ .flatMap(List::stream)
+ .collect(Collectors.toList()));
}
- private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
- final ServerName serverName, final LogQueryFilter logQueryFilter) {
- return this.<List<OnlineLogRecord>>newAdminCaller()
- .action((controller, stub) -> this
- .adminCall(
- controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
- AdminService.Interface::getLargeLogResponses,
- ProtobufUtil::toSlowLogPayloads))
- .serverName(serverName).call();
- }
-
- private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
- final ServerName serverName, final LogQueryFilter logQueryFilter) {
- return this.<List<OnlineLogRecord>>newAdminCaller()
- .action((controller, stub) -> this
- .adminCall(
- controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
- AdminService.Interface::getSlowLogResponses,
- ProtobufUtil::toSlowLogPayloads))
+ private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
+ Map<String, Object> filterParams, int limit, String logType) {
+ return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
+ .adminCall(controller, stub,
+ RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
+ AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
.serverName(serverName).call();
}
@@ -3985,4 +3962,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
);
}
+ private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
+ return this.<List<LogEntry>>newMasterCaller()
+ .action((controller, stub) ->
+ this.call(controller, stub,
+ ProtobufUtil.toBalancerDecisionRequest(limit),
+ MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
+ String logType, ServerType serverType, int limit,
+ Map<String, Object> filterParams) {
+ if (logType == null || serverType == null) {
+ throw new IllegalArgumentException("logType and/or serverType cannot be empty");
+ }
+ if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
+ if (ServerType.MASTER.equals(serverType)) {
+ throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
+ }
+ return getSlowLogResponses(filterParams, serverNames, limit, logType);
+ } else if (logType.equals("BALANCER_DECISION")) {
+ if (ServerType.REGION_SERVER.equals(serverType)) {
+ throw new IllegalArgumentException(
+ "Balancer Decision logs are not maintained by HRegionServer");
+ }
+ return getBalancerDecisions(limit);
+ }
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java
similarity index 55%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java
index 7aa87fa..1d1bf6e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java
@@ -17,33 +17,17 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.namequeues;
+package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Base payload to be prepared by client to send various namedQueue events for in-memory
- * ring buffer storage in either HMaster or RegionServer.
- * e.g slowLog responses
+ * Select server type, i.e. the destination for RPC requests associated with a ring buffer.
+ * E.g. slow/large log records are maintained by HRegionServer, whereas balancer decisions
+ * are maintained by HMaster.
*/
-@InterfaceAudience.Private
-public class NamedQueuePayload {
-
- public enum NamedQueueEvent {
- SLOW_LOG
- }
-
- private final NamedQueueEvent namedQueueEvent;
-
- public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
- if (namedQueueEvent == null) {
- throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
- }
- this.namedQueueEvent = namedQueueEvent;
- }
-
- public NamedQueueEvent getNamedQueueEvent() {
- return namedQueueEvent;
- }
-
+@InterfaceAudience.Public
+public enum ServerType {
+ MASTER,
+ REGION_SERVER
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index 8d9c9b7..b205504 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Rev
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@@ -473,6 +473,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
}
@Override
+ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
+ HBaseProtos.LogRequest request) throws ServiceException {
+ return stub.getLogEntries(controller, request);
+ }
+
+ @Override
public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
RpcController controller, MajorCompactionTimestampForRegionRequest request)
throws ServiceException {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index b9231bc..d9aa0ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.security.AccessController;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
@@ -133,6 +136,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -191,6 +195,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@@ -3435,7 +3440,7 @@ public final class ProtobufUtil {
* @param slowLogPayload SlowLog Payload protobuf instance
* @return SlowLog Payload for client usecase
*/
- private static OnlineLogRecord getSlowLogRecord(
+ private static LogEntry getSlowLogRecord(
final TooSlowLog.SlowLogPayload slowLogPayload) {
OnlineLogRecord onlineLogRecord = new OnlineLogRecord.OnlineLogRecordBuilder()
.setCallDetails(slowLogPayload.getCallDetails())
@@ -3459,14 +3464,26 @@ public final class ProtobufUtil {
/**
* Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
*
- * @param slowLogResponses slowlog response protobuf instance
+ * @param logEntry slowlog response protobuf instance
* @return list of SlowLog payloads for client usecase
*/
- public static List<OnlineLogRecord> toSlowLogPayloads(
- final AdminProtos.SlowLogResponses slowLogResponses) {
- List<OnlineLogRecord> slowLogRecords = slowLogResponses.getSlowLogPayloadsList()
- .stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
- return slowLogRecords;
+ public static List<LogEntry> toSlowLogPayloads(
+ final HBaseProtos.LogEntry logEntry) {
+ try {
+ final String logClassName = logEntry.getLogClassName();
+ Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
+ Method method = logClass.getMethod("parseFrom", ByteString.class);
+ if (logClassName.contains("SlowLogResponses")) {
+ AdminProtos.SlowLogResponses slowLogResponses = (AdminProtos.SlowLogResponses) method
+ .invoke(null, logEntry.getLogMessage());
+ return slowLogResponses.getSlowLogPayloadsList().stream()
+ .map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Error while retrieving response from server");
+ }
+ throw new RuntimeException("Invalid response from server");
}
/**
@@ -3551,4 +3568,49 @@ public final class ProtobufUtil {
throw new DoNotRetryIOException(e.getMessage());
}
}
+
+ public static List<LogEntry> toBalancerDecisionResponse(
+ HBaseProtos.LogEntry logEntry) {
+ try {
+ final String logClassName = logEntry.getLogClassName();
+ Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
+ Method method = logClass.getMethod("parseFrom", ByteString.class);
+ if (logClassName.contains("BalancerDecisionsResponse")) {
+ MasterProtos.BalancerDecisionsResponse response =
+ (MasterProtos.BalancerDecisionsResponse) method
+ .invoke(null, logEntry.getLogMessage());
+ return getBalancerDecisionEntries(response);
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Error while retrieving response from server");
+ }
+ throw new RuntimeException("Invalid response from server");
+ }
+
+ public static List<LogEntry> getBalancerDecisionEntries(
+ MasterProtos.BalancerDecisionsResponse response) {
+ List<RecentLogs.BalancerDecision> balancerDecisions = response.getBalancerDecisionList();
+ if (CollectionUtils.isEmpty(balancerDecisions)) {
+ return Collections.emptyList();
+ }
+ return balancerDecisions.stream().map(balancerDecision -> new BalancerDecision.Builder()
+ .setInitTotalCost(balancerDecision.getInitTotalCost())
+ .setInitialFunctionCosts(balancerDecision.getInitialFunctionCosts())
+ .setComputedTotalCost(balancerDecision.getComputedTotalCost())
+ .setFinalFunctionCosts(balancerDecision.getFinalFunctionCosts())
+ .setComputedSteps(balancerDecision.getComputedSteps())
+ .setRegionPlans(balancerDecision.getRegionPlansList()).build())
+ .collect(Collectors.toList());
+ }
+
+ public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
+ MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
+ MasterProtos.BalancerDecisionsRequest.newBuilder().setLimit(limit).build();
+ return HBaseProtos.LogRequest.newBuilder()
+ .setLogClassName(balancerDecisionsRequest.getClass().getName())
+ .setLogMessage(balancerDecisionsRequest.toByteString())
+ .build();
+ }
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d64968a..825e914 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.LogQueryFilter;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
@@ -71,6 +70,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
@@ -2068,45 +2069,67 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer {@link SlowLogResponseRequest}
+ * Build RPC request payload for getLogEntries
*
- * @param logQueryFilter filter to use if provided
- * @return a protocol buffer SlowLogResponseRequest
+ * @param filterParams map of filter params
+ * @param limit maximum number of records that the server returns
+ * @param logType type of the log records
+ * @return request payload {@link HBaseProtos.LogRequest}
*/
- public static SlowLogResponseRequest buildSlowLogResponseRequest(
- final LogQueryFilter logQueryFilter) {
+ public static HBaseProtos.LogRequest buildSlowLogResponseRequest(
+ final Map<String, Object> filterParams, final int limit, final String logType) {
SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder();
- if (logQueryFilter == null) {
- return builder.build();
- }
- final String clientAddress = logQueryFilter.getClientAddress();
- if (StringUtils.isNotEmpty(clientAddress)) {
- builder.setClientAddress(clientAddress);
- }
- final String regionName = logQueryFilter.getRegionName();
- if (StringUtils.isNotEmpty(regionName)) {
- builder.setRegionName(regionName);
- }
- final String tableName = logQueryFilter.getTableName();
- if (StringUtils.isNotEmpty(tableName)) {
- builder.setTableName(tableName);
+ builder.setLimit(limit);
+ if (logType.equals("SLOW_LOG")) {
+ builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
+ } else if (logType.equals("LARGE_LOG")) {
+ builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
- final String userName = logQueryFilter.getUserName();
- if (StringUtils.isNotEmpty(userName)) {
- builder.setUserName(userName);
+ boolean filterByAnd = false;
+ if (MapUtils.isNotEmpty(filterParams)) {
+ if (filterParams.containsKey("clientAddress")) {
+ final String clientAddress = (String) filterParams.get("clientAddress");
+ if (StringUtils.isNotEmpty(clientAddress)) {
+ builder.setClientAddress(clientAddress);
+ }
+ }
+ if (filterParams.containsKey("regionName")) {
+ final String regionName = (String) filterParams.get("regionName");
+ if (StringUtils.isNotEmpty(regionName)) {
+ builder.setRegionName(regionName);
+ }
+ }
+ if (filterParams.containsKey("tableName")) {
+ final String tableName = (String) filterParams.get("tableName");
+ if (StringUtils.isNotEmpty(tableName)) {
+ builder.setTableName(tableName);
+ }
+ }
+ if (filterParams.containsKey("userName")) {
+ final String userName = (String) filterParams.get("userName");
+ if (StringUtils.isNotEmpty(userName)) {
+ builder.setUserName(userName);
+ }
+ }
+ if (filterParams.containsKey("filterByOperator")) {
+ final String filterByOperator = (String) filterParams.get("filterByOperator");
+ if (StringUtils.isNotEmpty(filterByOperator)) {
+ if (filterByOperator.toUpperCase().equals("AND")) {
+ filterByAnd = true;
+ }
+ }
+ }
}
- LogQueryFilter.FilterByOperator filterByOperator = logQueryFilter.getFilterByOperator();
- if (LogQueryFilter.FilterByOperator.AND.equals(filterByOperator)) {
+ if (filterByAnd) {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.AND);
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
- if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
- builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
- } else {
- builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
- }
- return builder.setLimit(logQueryFilter.getLimit()).build();
+ SlowLogResponseRequest slowLogResponseRequest = builder.build();
+ return HBaseProtos.LogRequest.newBuilder()
+ .setLogClassName(slowLogResponseRequest.getClass().getName())
+ .setLogMessage(slowLogResponseRequest.toByteString())
+ .build();
}
/**
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 5b77dfa..0f734bc 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1998,7 +1998,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
- <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
+ <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
@@ -2008,4 +2008,13 @@ possible configurations would overwhelm and obscure the important.
"org.apache.hadoop.hbase.namequeues.impl"
</description>
</property>
+ <property>
+ <name>hbase.master.balancer.decision.buffer.enabled</name>
+ <value>false</value>
+ <description>
+ Indicates whether the active HMaster runs a ring buffer that stores
+ a limited number of balancer decisions in FIFO order. The size of
+ the ring buffer is set by config: hbase.master.balancer.decision.queue.size
+ </description>
+ </property>
</configuration>
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java
index 3b4ba36..de1dd81 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java
@@ -37,7 +37,7 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
private int metricsSize = 1000;
private int mruCap = calcMruCap(metricsSize);
- private Map<String, Map<String, Double>> stochasticCosts =
+ private final Map<String, Map<String, Double>> stochasticCosts =
new LinkedHashMap<String, Map<String, Double>>(mruCap, MRU_LOAD_FACTOR, true) {
private static final long serialVersionUID = 8204713453436906599L;
@@ -71,7 +71,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
if (tableName == null || costFunctionName == null || cost == null) {
return;
}
-
if (functionDesc != null) {
costFunctionDescs.put(costFunctionName, functionDesc);
}
@@ -81,7 +80,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
if (costs == null) {
costs = new ConcurrentHashMap<>();
}
-
costs.put(costFunctionName, cost);
stochasticCosts.put(tableName, costs);
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index b5bf2ea..768c535 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -282,6 +282,13 @@ message ExecuteProceduresRequest {
message ExecuteProceduresResponse {
}
+/**
+ * Slow/Large log (LogRequest) use-case specific RPC request. This request payload will be
+ * converted to bytes and sent to the generic RPC API: GetLogEntries
+ * LogRequest message has two params:
+ * 1. log_class_name: SlowLogResponseRequest (for Slow/Large log use-case)
+ * 2. log_message: SlowLogResponseRequest converted to bytes (for Slow/Large log use-case)
+ */
message SlowLogResponseRequest {
enum FilterByOperator {
AND = 0;
@@ -302,6 +309,13 @@ message SlowLogResponseRequest {
optional LogType log_type = 7;
}
+/**
+ * Slow/Large log (LogEntry) use-case specific RPC response. This response payload will be
+ * converted to bytes by servers and sent as response to the generic RPC API: GetLogEntries
+ * LogEntry message has two params:
+ * 1. log_class_name: SlowLogResponses (for Slow/Large log use-case)
+ * 2. log_message: SlowLogResponses converted to bytes (for Slow/Large log use-case)
+ */
message SlowLogResponses {
repeated SlowLogPayload slow_log_payloads = 1;
}
@@ -387,4 +401,8 @@ service AdminService {
rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);
+
+ rpc GetLogEntries(LogRequest)
+ returns(LogEntry);
+
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index 133b1c2..4ea1587 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -258,4 +258,14 @@ message RegionLocation {
required RegionInfo region_info = 1;
optional ServerName server_name = 2;
required int64 seq_num = 3;
-}
\ No newline at end of file
+}
+
+message LogRequest {
+ required string log_class_name = 1;
+ required bytes log_message = 2;
+}
+
+message LogEntry {
+ required string log_class_name = 1;
+ required bytes log_message = 2;
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 4f9d75e..66b175c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -37,6 +37,7 @@ import "Quota.proto";
import "Replication.proto";
import "Snapshot.proto";
import "AccessControl.proto";
+import "RecentLogs.proto";
/* Column-level protobufs */
@@ -692,6 +693,28 @@ message SwitchExceedThrottleQuotaResponse {
required bool previous_exceed_throttle_quota_enabled = 1;
}
+/**
+ * BalancerDecision (LogRequest) use-case specific RPC request. This request payload will be
+ * converted to bytes and sent to the generic RPC API: GetLogEntries
+ * LogRequest message has two params:
+ * 1. log_class_name: BalancerDecisionsRequest (for BalancerDecision use-case)
+ * 2. log_message: BalancerDecisionsRequest converted to bytes (for BalancerDecision use-case)
+ */
+message BalancerDecisionsRequest {
+ optional uint32 limit = 1;
+}
+
+/**
+ * BalancerDecision (LogEntry) use-case specific RPC response. This response payload will be
+ * converted to bytes by servers and sent as response to the generic RPC API: GetLogEntries
+ * LogEntry message has two params:
+ * 1. log_class_name: BalancerDecisionsResponse (for BalancerDecision use-case)
+ * 2. log_message: BalancerDecisionsResponse converted to bytes (for BalancerDecision use-case)
+ */
+message BalancerDecisionsResponse {
+ repeated BalancerDecision balancer_decision = 1;
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1079,6 +1102,9 @@ service MasterService {
/** returns a list of namespace names */
rpc ListNamespaces(ListNamespacesRequest)
returns(ListNamespacesResponse);
+
+ rpc GetLogEntries(LogRequest)
+ returns(LogEntry);
}
// HBCK Service definitions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto
similarity index 52%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
copy to hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto
index 7aa87fa..ea50b81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,33 +16,24 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.namequeues;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Base payload to be prepared by client to send various namedQueue events for in-memory
- * ring buffer storage in either HMaster or RegionServer.
- * e.g slowLog responses
- */
-@InterfaceAudience.Private
-public class NamedQueuePayload {
+syntax = "proto2";
- public enum NamedQueueEvent {
- SLOW_LOG
- }
+// This file contains protocol buffers that are used for Online BalancerDecision history
+// To be used as Ring Buffer payload
+package hbase.pb;
- private final NamedQueueEvent namedQueueEvent;
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "RecentLogs";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
- public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
- if (namedQueueEvent == null) {
- throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
- }
- this.namedQueueEvent = namedQueueEvent;
- }
+message BalancerDecision {
- public NamedQueueEvent getNamedQueueEvent() {
- return namedQueueEvent;
- }
+ required string initial_function_costs = 1;
+ required string final_function_costs = 2;
+ required double init_total_cost = 3;
+ required double computed_total_cost = 4;
+ required uint64 computed_steps = 5;
+ repeated string region_plans = 6;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 6d13480..2e286c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -21,10 +21,13 @@ import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -81,6 +84,10 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -123,6 +130,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
@@ -324,6 +333,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
@@ -2981,4 +2991,49 @@ public class MasterRpcServices extends RSRpcServices implements
location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
return response.build();
}
+ @Override
+ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
+ HBaseProtos.LogRequest request) throws ServiceException {
+ try {
+ final String logClassName = request.getLogClassName();
+ Class<?> logClass = Class.forName(logClassName)
+ .asSubclass(Message.class);
+ Method method = logClass.getMethod("parseFrom", ByteString.class);
+ if (logClassName.contains("BalancerDecisionsRequest")) {
+ MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
+ (MasterProtos.BalancerDecisionsRequest) method
+ .invoke(null, request.getLogMessage());
+ MasterProtos.BalancerDecisionsResponse balancerDecisionsResponse =
+ getBalancerDecisions(balancerDecisionsRequest);
+ return HBaseProtos.LogEntry.newBuilder()
+ .setLogClassName(balancerDecisionsResponse.getClass().getName())
+ .setLogMessage(balancerDecisionsResponse.toByteString())
+ .build();
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+ | InvocationTargetException e) {
+ LOG.error("Error while retrieving log entries.", e);
+ throw new ServiceException(e);
+ }
+ throw new ServiceException("Invalid request params");
+ }
+
+ private MasterProtos.BalancerDecisionsResponse getBalancerDecisions(
+ MasterProtos.BalancerDecisionsRequest request) {
+ final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
+ if (namedQueueRecorder == null) {
+ return MasterProtos.BalancerDecisionsResponse.newBuilder()
+ .addAllBalancerDecision(Collections.emptyList()).build();
+ }
+ final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
+ namedQueueGetRequest.setBalancerDecisionsRequest(request);
+ NamedQueueGetResponse namedQueueGetResponse =
+ namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ List<RecentLogs.BalancerDecision> balancerDecisions =
+ namedQueueGetResponse.getBalancerDecisions();
+ return MasterProtos.BalancerDecisionsResponse.newBuilder()
+ .addAllBalancerDecision(balancerDecisions).build();
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 7d05c41..b6ec918 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
@@ -73,6 +74,11 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
+
+ public static final String BALANCER_DECISION_BUFFER_ENABLED =
+ "hbase.master.balancer.decision.buffer.enabled";
+ public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
+
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
@@ -85,6 +91,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected boolean useRegionFinder;
protected boolean isByTable = false;
+ /**
+ * Used to add balancer decision history to the ring buffer.
+ */
+ protected NamedQueueRecorder namedQueueRecorder;
+
private static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 8550877..4f51507 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
@@ -46,6 +47,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRe
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
+import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -221,6 +224,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curFunctionCosts = new Double[costFunctions.size()];
tempFunctionCosts = new Double[costFunctions.size()];
+ boolean isBalancerDecisionRecording = getConf()
+ .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
+ if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
+ this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
+ }
+
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
@@ -233,26 +243,21 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return;
}
- costFunctions.addAll(Arrays.stream(functionsNames)
- .map(c -> {
- Class<? extends CostFunction> klass = null;
- try {
- klass = (Class<? extends CostFunction>) Class.forName(c);
- } catch (ClassNotFoundException e) {
- LOG.warn("Cannot load class " + c + "': " + e.getMessage());
- }
- if (null == klass) {
- return null;
- }
-
- CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
- LOG.info("Successfully loaded custom CostFunction '" +
- reflected.getClass().getSimpleName() + "'");
-
- return reflected;
- })
- .filter(Objects::nonNull)
- .collect(Collectors.toList()));
+ costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
+ Class<? extends CostFunction> klass = null;
+ try {
+ klass = (Class<? extends CostFunction>) Class.forName(c);
+ } catch (ClassNotFoundException e) {
+ LOG.warn("Cannot load class " + c + "': " + e.getMessage());
+ }
+ if (null == klass) {
+ return null;
+ }
+ CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
+ LOG.info(
+ "Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
+ return reflected;
+ }).filter(Objects::nonNull).collect(Collectors.toList()));
}
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
@@ -411,7 +416,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curOverallCost = currentCost;
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
double initCost = currentCost;
- double newCost = currentCost;
+ double newCost;
long computedMaxSteps;
if (runMaxSteps) {
@@ -432,6 +437,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
+ functionCost() + " computedMaxSteps: " + computedMaxSteps);
+ final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
long step;
@@ -480,6 +486,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"{} regions; Going from a computed cost of {}" +
" to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
step, plans.size(), initCost, currentCost);
+ sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
return plans;
}
LOG.info("Could not find a better load balance plan. Tried {} different configurations in " +
@@ -488,6 +495,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
+ private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
+ double initCost, String initFunctionTotalCosts, long step) {
+ if (this.namedQueueRecorder != null) {
+ List<String> regionPlans = new ArrayList<>();
+ for (RegionPlan plan : plans) {
+ regionPlans.add(
+ "table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
+ + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
+ }
+ BalancerDecision balancerDecision =
+ new BalancerDecision.Builder()
+ .setInitTotalCost(initCost)
+ .setInitialFunctionCosts(initFunctionTotalCosts)
+ .setComputedTotalCost(currentCost)
+ .setFinalFunctionCosts(totalCostsPerFunc())
+ .setComputedSteps(step)
+ .setRegionPlans(regionPlans).build();
+ namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
+ }
+ }
+
/**
* update costs to JMX
*/
@@ -532,6 +560,23 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return builder.toString();
}
+ private String totalCostsPerFunc() {
+ StringBuilder builder = new StringBuilder();
+ for (CostFunction c : costFunctions) {
+ if (c.getMultiplier() * c.cost() > 0.0) {
+ builder.append(" ");
+ builder.append(c.getClass().getSimpleName());
+ builder.append(" : ");
+ builder.append(c.getMultiplier() * c.cost());
+ builder.append(";");
+ }
+ }
+ if (builder.length() > 0) {
+ builder.deleteCharAt(builder.length() - 1);
+ }
+ return builder.toString();
+ }
+
/**
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
* state.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java
similarity index 56%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java
index 7aa87fa..99a490d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java
@@ -19,31 +19,33 @@
package org.apache.hadoop.hbase.namequeues;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Base payload to be prepared by client to send various namedQueue events for in-memory
- * ring buffer storage in either HMaster or RegionServer.
- * e.g slowLog responses
+ * Balancer decision details that would be passed on to ring buffer for history
*/
@InterfaceAudience.Private
-public class NamedQueuePayload {
+public class BalancerDecisionDetails extends NamedQueuePayload {
- public enum NamedQueueEvent {
- SLOW_LOG
- }
+ public static final int BALANCER_DECISION_EVENT = 1;
- private final NamedQueueEvent namedQueueEvent;
+ private final BalancerDecision balancerDecision;
- public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
- if (namedQueueEvent == null) {
- throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
- }
- this.namedQueueEvent = namedQueueEvent;
+ public BalancerDecisionDetails(BalancerDecision balancerDecision) {
+ super(BALANCER_DECISION_EVENT);
+ this.balancerDecision = balancerDecision;
}
- public NamedQueueEvent getNamedQueueEvent() {
- return namedQueueEvent;
+ public BalancerDecision getBalancerDecision() {
+ return balancerDecision;
}
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("balancerDecision", balancerDecision)
+ .toString();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index 7aa87fa..eff2df9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -30,16 +30,39 @@ import org.apache.yetus.audience.InterfaceAudience;
public class NamedQueuePayload {
public enum NamedQueueEvent {
- SLOW_LOG
+ SLOW_LOG(0),
+ BALANCE_DECISION(1);
+
+ private final int value;
+
+ NamedQueueEvent(int i) {
+ this.value = i;
+ }
+
+ public static NamedQueueEvent getEventByOrdinal(int value){
+ switch (value) {
+ case 0: {
+ return SLOW_LOG;
+ }
+ case 1: {
+ return BALANCE_DECISION;
+ }
+ default: {
+ throw new IllegalArgumentException(
+ "NamedQueue event with ordinal " + value + " not defined");
+ }
+ }
+ }
+
+ public int getValue() {
+ return value;
+ }
}
private final NamedQueueEvent namedQueueEvent;
- public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
- if (namedQueueEvent == null) {
- throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
- }
- this.namedQueueEvent = namedQueueEvent;
+ public NamedQueuePayload(int eventOrdinal) {
+ this.namedQueueEvent = NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
public NamedQueueEvent getNamedQueueEvent() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
index f93baaa..9b422ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
@@ -43,7 +43,7 @@ final class RingBufferEnvelope {
}
/**
- * Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and
+ * Retrieve current namedQueue payload {@link NamedQueuePayload} available on Envelope and
* free up the Envelope
*
* @return Retrieve rpc log details
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
index 581d1a3..91ac91e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
@@ -30,6 +30,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class RpcLogDetails extends NamedQueuePayload {
+ public static final int SLOW_LOG_EVENT = 0;
+
private final RpcCall rpcCall;
private final Message param;
private final String clientAddress;
@@ -40,7 +42,7 @@ public class RpcLogDetails extends NamedQueuePayload {
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
- super(NamedQueueEvent.SLOW_LOG);
+ super(SLOW_LOG_EVENT);
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
new file mode 100644
index 0000000..e6fb982
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.BalancerDecision;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
+import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.NamedQueueService;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory Queue service provider for Balancer Decision events
+ */
+@InterfaceAudience.Private
+public class BalancerDecisionQueueService implements NamedQueueService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class);
+
+ private final boolean isBalancerDecisionRecording;
+
+ private static final String BALANCER_DECISION_QUEUE_SIZE =
+ "hbase.master.balancer.decision.queue.size";
+ private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250;
+
+ private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15;
+
+ private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue;
+
+ public BalancerDecisionQueueService(Configuration conf) {
+ isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
+ if (!isBalancerDecisionRecording) {
+ balancerDecisionQueue = null;
+ return;
+ }
+ final int queueSize =
+ conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE);
+ final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue =
+ EvictingQueue.create(queueSize);
+ balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue);
+ }
+
+ @Override
+ public NamedQueuePayload.NamedQueueEvent getEvent() {
+ return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION;
+ }
+
+ @Override
+ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+ if (!isBalancerDecisionRecording) {
+ return;
+ }
+ if (!(namedQueuePayload instanceof BalancerDecisionDetails)) {
+ LOG.warn(
+ "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails.");
+ return;
+ }
+ BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload;
+ BalancerDecision balancerDecisionRecords =
+ balancerDecisionDetails.getBalancerDecision();
+ List<String> regionPlans = balancerDecisionRecords.getRegionPlans();
+ List<List<String>> regionPlansList;
+ if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) {
+ regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER);
+ } else {
+ regionPlansList = Collections.singletonList(regionPlans);
+ }
+ for (List<String> regionPlansPerBalancer : regionPlansList) {
+ RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder()
+ .setInitTotalCost(balancerDecisionRecords.getInitTotalCost())
+ .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts())
+ .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost())
+ .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts())
+ .setComputedSteps(balancerDecisionRecords.getComputedSteps())
+ .addAllRegionPlans(regionPlansPerBalancer)
+ .build();
+ balancerDecisionQueue.add(balancerDecision);
+ }
+ }
+
+ @Override
+ public boolean clearNamedQueue() {
+ if (!isBalancerDecisionRecording) {
+ return false;
+ }
+ LOG.debug("Received request to clean up balancer decision queue.");
+ balancerDecisionQueue.clear();
+ return true;
+ }
+
+ @Override
+ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+ if (!isBalancerDecisionRecording) {
+ return null;
+ }
+ List<RecentLogs.BalancerDecision> balancerDecisions =
+ Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0]))
+ .collect(Collectors.toList());
+ // the queue is copied in insertion order; reverse it so the latest records come first
+ Collections.reverse(balancerDecisions);
+ int limit = balancerDecisions.size();
+ if (request.getBalancerDecisionsRequest().hasLimit()) {
+ limit = Math.max(0, Math.min(request.getBalancerDecisionsRequest().getLimit(), limit));
+ }
+ // truncate to the requested limit; it is clamped above so a negative limit yields no records
+ balancerDecisions = balancerDecisions.subList(0, limit);
+ final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
+ namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
+ namedQueueGetResponse.setBalancerDecisions(balancerDecisions);
+ return namedQueueGetResponse;
+ }
+
+ @Override
+ public void persistAll() {
+ // no-op for now
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
index f26ff51..c0a8e1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
@@ -19,6 +19,12 @@
package org.apache.hadoop.hbase.namequeues.impl;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -31,22 +37,19 @@ import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
/**
* In-memory Queue service provider for Slow/LargeLog events
@@ -201,7 +204,7 @@ public class SlowLogQueueService implements NamedQueueService {
slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
}
NamedQueueGetResponse response = new NamedQueueGetResponse();
- response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
response.setSlowLogPayloads(slowLogPayloads);
return response;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
index 6e88bf4..182cfd1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.namequeues.request;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -36,6 +37,7 @@ public class NamedQueueGetRequest {
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
+ private MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
@@ -46,12 +48,21 @@ public class NamedQueueGetRequest {
this.slowLogResponseRequest = slowLogResponseRequest;
}
+ public MasterProtos.BalancerDecisionsRequest getBalancerDecisionsRequest() {
+ return balancerDecisionsRequest;
+ }
+
+ public void setBalancerDecisionsRequest(
+ MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest) {
+ this.balancerDecisionsRequest = balancerDecisionsRequest;
+ }
+
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
- public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
- this.namedQueueEvent = namedQueueEvent;
+ public void setNamedQueueEvent(int eventOrdinal) {
+ this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
@Override
@@ -59,7 +70,7 @@ public class NamedQueueGetRequest {
return new ToStringBuilder(this)
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
+ .append("balancerDecisionsRequest", balancerDecisionsRequest)
.toString();
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
index ee4ed43..224402a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.namequeues.response;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.List;
@@ -32,6 +33,7 @@ import java.util.List;
public class NamedQueueGetResponse {
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
+ private List<RecentLogs.BalancerDecision> balancerDecisions;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
@@ -42,18 +44,27 @@ public class NamedQueueGetResponse {
this.slowLogPayloads = slowLogPayloads;
}
+ public List<RecentLogs.BalancerDecision> getBalancerDecisions() {
+ return balancerDecisions;
+ }
+
+ public void setBalancerDecisions(List<RecentLogs.BalancerDecision> balancerDecisions) {
+ this.balancerDecisions = balancerDecisions;
+ }
+
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
- public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
- this.namedQueueEvent = namedQueueEvent;
+ public void setNamedQueueEvent(int eventOrdinal) {
+ this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
+ .append("balancerDecisions", balancerDecisions)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 41d6b20..ad21f50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -596,14 +597,7 @@ public class HRegionServer extends Thread implements
this.abortRequested = new AtomicBoolean(false);
this.stopped = false;
- if (!(this instanceof HMaster)) {
- final boolean isOnlineLogProviderEnabled = conf.getBoolean(
- HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
- HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
- if (isOnlineLogProviderEnabled) {
- this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
- }
- }
+ initNamedQueueRecorder(conf);
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
String hostName =
@@ -679,6 +673,24 @@ public class HRegionServer extends Thread implements
}
}
+ private void initNamedQueueRecorder(Configuration conf) {
+ if (!(this instanceof HMaster)) {
+ final boolean isOnlineLogProviderEnabled = conf.getBoolean(
+ HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+ HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+ if (isOnlineLogProviderEnabled) {
+ this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
+ }
+ } else {
+ final boolean isBalancerDecisionRecording = conf
+ .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
+ BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
+ if (isBalancerDecisionRecording) {
+ this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
+ }
+ }
+ }
+
// HMaster should override this method to load the specific config for master
protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
String hostname = conf.get(RS_HOSTNAME_KEY);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ba818d2..2fc584a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -106,6 +107,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
@@ -241,6 +243,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -3787,7 +3790,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
List<SlowLogPayload> slowLogPayloads;
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
- namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
@@ -3826,6 +3829,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return clearSlowLogResponses;
}
+ @Override
+ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
+ HBaseProtos.LogRequest request) throws ServiceException {
+ try {
+ final String logClassName = request.getLogClassName();
+ Class<?> logClass = Class.forName(logClassName)
+ .asSubclass(Message.class);
+ Method method = logClass.getMethod("parseFrom", ByteString.class);
+ if (logClassName.contains("SlowLogResponseRequest")) {
+ SlowLogResponseRequest slowLogResponseRequest =
+ (SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
+ final NamedQueueRecorder namedQueueRecorder =
+ this.regionServer.getNamedQueueRecorder();
+ final List<SlowLogPayload> slowLogPayloads =
+ getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
+ SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
+ .addAllSlowLogPayloads(slowLogPayloads)
+ .build();
+ return HBaseProtos.LogEntry.newBuilder()
+ .setLogClassName(slowLogResponses.getClass().getName())
+ .setLogMessage(slowLogResponses.toByteString()).build();
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+ | InvocationTargetException e) {
+ LOG.error("Error while retrieving log entries.", e);
+ throw new ServiceException(e);
+ }
+ throw new ServiceException("Invalid request params");
+ }
+
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 2e8632f..90eb594 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -888,12 +888,13 @@ public class TestAdmin2 extends TestAdminBase {
}
Assert.assertEquals(countFailedClearSlowResponse, 0);
- LogQueryFilter logQueryFilter = new LogQueryFilter();
- List<OnlineLogRecord> onlineLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames),
- logQueryFilter);
-
+ List<LogEntry> onlineLogRecords = ADMIN.getLogEntries(new HashSet<>(serverNames),
+ "SLOW_LOG", ServerType.REGION_SERVER, 100, null);
// after cleanup of slowlog responses, total count of slowlog payloads should be 0
Assert.assertEquals(onlineLogRecords.size(), 0);
+ List<LogEntry> balancerDecisionRecords =
+ ADMIN.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 100, null);
+ Assert.assertEquals(balancerDecisionRecords.size(), 0);
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index eedfcf2..23ab43d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -74,13 +75,10 @@ public class TestAsyncTableGetMultiThreaded {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
- private static int COUNT = 1000;
+ private static final TableName TABLE_NAME = TableName.valueOf("async");
+ private static final byte[] FAMILY = Bytes.toBytes("cf");
+ private static final byte[] QUALIFIER = Bytes.toBytes("cq");
+ private static final int COUNT = 1000;
private static AsyncConnection CONN;
@@ -99,6 +97,7 @@ public class TestAsyncTableGetMultiThreaded {
TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction));
+ TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
TEST_UTIL.startMiniCluster(3);
SPLIT_KEYS = new byte[8][];
@@ -211,6 +210,9 @@ public class TestAsyncTableGetMultiThreaded {
LOG.info("====== Move meta done ======");
Thread.sleep(5000);
}
+ List<LogEntry> balancerDecisionRecords =
+ admin.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 2, null);
+ Assert.assertEquals(balancerDecisionRecords.size(), 2);
LOG.info("====== Read test finished, shutdown thread pool ======");
stop.set(true);
executor.shutdown();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index aaeae01..5c4eea8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -137,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
@@ -700,6 +701,12 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
}
@Override
+ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
+ HBaseProtos.LogRequest request) throws ServiceException {
+ return null;
+ }
+
+ @Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)
throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java
new file mode 100644
index 0000000..cfeeefe
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.LogEntry;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
+
+/**
+ * Test BalancerDecision ring buffer using namedQueue interface
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestBalancerDecision extends BalancerTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBalancerDecision.class);
+
+ @Test
+ public void testBalancerDecisions() {
+ conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
+ loadBalancer.setConf(conf);
+ float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
+ conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
+ try {
+ // Test with/without per table balancer.
+ boolean[] perTableBalancerConfigs = {true, false};
+ for (boolean isByTable : perTableBalancerConfigs) {
+ conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
+ loadBalancer.setConf(conf);
+ for (int[] mockCluster : clusterStateMocks) {
+ Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
+ Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+ (Map) mockClusterServersWithTables(servers);
+ List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+ boolean emptyPlans = plans == null || plans.isEmpty();
+ Assert.assertTrue(emptyPlans || needsBalanceIdleRegion(mockCluster));
+ }
+ }
+ final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
+ namedQueueGetRequest
+ .setBalancerDecisionsRequest(MasterProtos.BalancerDecisionsRequest.getDefaultInstance());
+ NamedQueueGetResponse namedQueueGetResponse =
+ loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ List<RecentLogs.BalancerDecision> balancerDecisions =
+ namedQueueGetResponse.getBalancerDecisions();
+ MasterProtos.BalancerDecisionsResponse response =
+ MasterProtos.BalancerDecisionsResponse.newBuilder()
+ .addAllBalancerDecision(balancerDecisions)
+ .build();
+ List<LogEntry> balancerDecisionRecords =
+ ProtobufUtil.getBalancerDecisionEntries(response);
+ Assert.assertTrue(balancerDecisionRecords.size() > 160);
+ } finally {
+ // reset config
+ conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
+ conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
+ loadBalancer.setConf(conf);
+ }
+ }
+
+ private static boolean needsBalanceIdleRegion(int[] cluster) {
+ return (Arrays.stream(cluster).anyMatch(x -> x > 1)) && (Arrays.stream(cluster)
+ .anyMatch(x -> x < 1));
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index f9e6a9e..83444eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -151,6 +151,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
assertTrue(loadBalancer.loads.get(regionNameAsString) != null);
assertTrue(loadBalancer.loads.get(regionNameAsString).size() == 15);
+ assertNull(loadBalancer.namedQueueRecorder);
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(regionNameAsString);
int i = 0;
@@ -203,6 +204,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
double expected = 1 - expectedLocalities[test];
assertEquals(expected, cost, 0.001);
}
+ assertNull(loadBalancer.namedQueueRecorder);
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index 542efc3..161bcc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -234,7 +234,7 @@ public class TestNamedQueueRecorder {
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
- namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
index 4ebf2a5..f7bc6fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
@@ -101,7 +101,7 @@ public class TestSlowLogAccessor {
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
- namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 6c29e54..512f96a 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1496,7 +1496,7 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Retrieve SlowLog Responses from RegionServers
- def get_slowlog_responses(server_names, args)
+ def get_slowlog_responses(server_names, args, is_large_log = false)
unless server_names.is_a?(Array) || server_names.is_a?(String)
raise(ArgumentError,
"#{server_names.class} of #{server_names.inspect} is not of Array/String type")
@@ -1508,38 +1508,44 @@ module Hbase
server_names = getServerNames(server_names_list, false)
end
filter_params = get_filter_params(args)
- filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::SLOW_LOG)
- slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
- filter_params)
+ if args.key? 'LIMIT'
+ limit = args['LIMIT']
+ else
+ limit = 10
+ end
+ if is_large_log
+ log_type = 'LARGE_LOG'
+ else
+ log_type = 'SLOW_LOG'
+ end
+ log_dest = org.apache.hadoop.hbase.client.ServerType::REGION_SERVER
+ server_names_set = java.util.HashSet.new(server_names)
+ slow_log_responses = @admin.getLogEntries(server_names_set, log_type, log_dest, limit,
+ filter_params)
slow_log_responses_arr = []
- for slow_log_response in slow_log_responses
+ slow_log_responses.each { |slow_log_response|
slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
- end
- puts 'Retrieved SlowLog Responses from RegionServers'
- puts slow_log_responses_arr
+ }
+ slow_log_responses_arr
end
def get_filter_params(args)
- filter_params = org.apache.hadoop.hbase.client.LogQueryFilter.new
+ filter_params = java.util.HashMap.new
if args.key? 'REGION_NAME'
region_name = args['REGION_NAME']
- filter_params.setRegionName(region_name)
+ filter_params.put('regionName', region_name)
end
if args.key? 'TABLE_NAME'
table_name = args['TABLE_NAME']
- filter_params.setTableName(table_name)
+ filter_params.put('tableName', table_name)
end
if args.key? 'CLIENT_IP'
- client_ip = args['CLIENT_IP']
- filter_params.setClientAddress(client_ip)
+ client_address = args['CLIENT_IP']
+ filter_params.put('clientAddress', client_address)
end
if args.key? 'USER'
user = args['USER']
- filter_params.setUserName(user)
- end
- if args.key? 'LIMIT'
- limit = args['LIMIT']
- filter_params.setLimit(limit)
+ filter_params.put('userName', user)
end
if args.key? 'FILTER_BY_OP'
filter_by_op = args['FILTER_BY_OP']
@@ -1547,39 +1553,13 @@ module Hbase
raise(ArgumentError, "FILTER_BY_OP should be either OR / AND")
end
if filter_by_op == 'AND'
- filter_params.setFilterByOperator(
- org.apache.hadoop.hbase.client.LogQueryFilter::FilterByOperator::AND)
+ filter_params.put('filterByOperator', 'AND')
end
end
filter_params
end
#----------------------------------------------------------------------------------------------
- # Retrieve LargeLog Responses from RegionServers
- def get_largelog_responses(server_names, args)
- unless server_names.is_a?(Array) || server_names.is_a?(String)
- raise(ArgumentError,
- "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
- end
- if server_names == '*'
- server_names = getServerNames([], true)
- else
- server_names_list = to_server_names(server_names)
- server_names = getServerNames(server_names_list, false)
- end
- filter_params = get_filter_params(args)
- filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::LARGE_LOG)
- large_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
- filter_params)
- large_log_responses_arr = []
- for large_log_response in large_log_responses
- large_log_responses_arr << large_log_response.toJsonPrettyPrint
- end
- puts 'Retrieved LargeLog Responses from RegionServers'
- puts large_log_responses_arr
- end
-
- #----------------------------------------------------------------------------------------------
# Clears SlowLog Responses from RegionServers
def clear_slowlog_responses(server_names)
unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
@@ -1668,6 +1648,24 @@ module Hbase
end
#----------------------------------------------------------------------------------------------
+ # Retrieve latest balancer decisions made by LoadBalancers
+ def get_balancer_decisions(args)
+ if args.key? 'LIMIT'
+ limit = args['LIMIT']
+ else
+ limit = 250
+ end
+ log_type = 'BALANCER_DECISION'
+ log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER
+ balancer_decisions_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil)
+ balancer_decisions_resp_arr = []
+ balancer_decisions_responses.each { |balancer_dec_resp|
+ balancer_decisions_resp_arr << balancer_dec_resp.toJsonPrettyPrint
+ }
+ balancer_decisions_resp_arr
+ end
+
+ #----------------------------------------------------------------------------------------------
# Stop the active Master
def stop_master
@admin.stopMaster
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index b638bbe..97d2abe 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -338,6 +338,7 @@ Shell.load_command_group(
compact
compaction_switch
flush
+ get_balancer_decisions
get_slowlog_responses
get_largelog_responses
major_compact
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb b/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb
new file mode 100644
index 0000000..801166e
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb
@@ -0,0 +1,49 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Retrieve latest balancer decisions maintained in memory by HMaster
+
+module Shell
+ module Commands
+ # Retrieve latest balancer decisions
+ class GetBalancerDecisions < Command
+ def help
+ <<-EOF
+Retrieve latest balancer decisions made by LoadBalancers.
+
+Examples:
+
+ hbase> get_balancer_decisions => Retrieve recent balancer decisions with
+ region plans
+ hbase> get_balancer_decisions LIMIT => 10 => Retrieve 10 most recent balancer decisions
+ with region plans
+
+ EOF
+ end
+
+ def command(args = {})
+ unless args.is_a? Hash
+ raise 'Filter parameters are not Hash'
+ end
+
+ balancer_decisions_resp_arr = admin.get_balancer_decisions(args)
+ puts 'Retrieved BalancerDecision Responses'
+ puts balancer_decisions_resp_arr
+ end
+ end
+ end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb
index f7effe7..8ed55ab 100644
--- a/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb
@@ -90,8 +90,9 @@ echo "get_largelog_responses '*'" | hbase shell > xyz.out 2>&1
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
-
- admin.get_largelog_responses(server_names, args)
+ large_log_responses_arr = admin.get_slowlog_responses(server_names, args, true)
+ puts 'Retrieved LargeLog Responses from RegionServers'
+ puts large_log_responses_arr
end
end
end
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
index 23a3ec2..2dc108b 100644
--- a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
@@ -90,8 +90,9 @@ echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
-
- admin.get_slowlog_responses(server_names, args)
+ slow_log_responses_arr = admin.get_slowlog_responses(server_names, args)
+ puts 'Retrieved SlowLog Responses from RegionServers'
+ puts slow_log_responses_arr
end
end
end
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 3f89383..d32e32e 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -386,8 +386,8 @@ module Hbase
#-------------------------------------------------------------------------------
define_test 'get slowlog responses should work' do
- output = capture_stdout { command(:get_slowlog_responses, '*', {}) }
- assert(output.include?('Retrieved SlowLog Responses from RegionServers'))
+ output = command(:get_slowlog_responses, '*', {})
+ assert(output.nil?)
end
#-------------------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 9359d43..d4bc569 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -44,10 +44,12 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.LogQueryFilter;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.ServerType;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -1438,4 +1440,10 @@ public class ThriftAdmin implements Admin {
return splitRegionAsync(regionName, null);
}
+ @Override
+ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
+ ServerType serverType, int limit, Map<String, Object> filterParams)
+ throws IOException {
+ throw new NotImplementedException("getLogEntries not supported in ThriftAdmin");
+ }
}
diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc
index 5768add..1472381 100644
--- a/src/main/asciidoc/_chapters/hbase-default.adoc
+++ b/src/main/asciidoc/_chapters/hbase-default.adoc
@@ -2308,3 +2308,17 @@ The percent of region server RPC threads failed to abort RS.
.Default
`false`
+[[hbase.master.balancer.decision.buffer.enabled]]
+*`hbase.master.balancer.decision.buffer.enabled`*::
++
+.Description
+
+ Indicates whether the active HMaster runs a ring buffer that stores
+ balancer decisions in FIFO order with a limited number of entries. The
+ size of the ring buffer is set by the config:
+ hbase.master.balancer.decision.queue.size
+
++
+.Default
+`false`
+