You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/03 12:53:26 UTC
[hbase] 08/09: HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 457df3a967de58c481153d79a086e16c029259dd
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Jan 15 11:43:41 2019 +0800
HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection
Signed-off-by: Michael Stack <st...@apache.org>
---
.../hbase/master/RegionPlacementMaintainer.java | 225 +++++++++++----------
1 file changed, 113 insertions(+), 112 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
index faf5e4a..fda0a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master;
+import java.io.Closeable;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
@@ -39,29 +38,30 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.MunkresAssignment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
@@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
*/
@InterfaceAudience.Private
// TODO: Remove? Unused. Partially implemented only.
-public class RegionPlacementMaintainer {
+public class RegionPlacementMaintainer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
.getName());
//The cost of a placement that should never be assigned.
@@ -96,9 +96,9 @@ public class RegionPlacementMaintainer {
private final boolean enforceMinAssignmentMove;
private RackManager rackManager;
private Set<TableName> targetTableSet;
- private final Connection connection;
+ private AsyncClusterConnection connection;
- public RegionPlacementMaintainer(Configuration conf) {
+ public RegionPlacementMaintainer(Configuration conf) throws IOException {
this(conf, true, true);
}
@@ -109,11 +109,6 @@ public class RegionPlacementMaintainer {
this.enforceMinAssignmentMove = enforceMinAssignmentMove;
this.targetTableSet = new HashSet<>();
this.rackManager = new RackManager(conf);
- try {
- this.connection = ConnectionFactory.createConnection(this.conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
}
private static void printHelp(Options opt) {
@@ -124,6 +119,14 @@ public class RegionPlacementMaintainer {
" [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
}
+ private AsyncClusterConnection getConnection() throws IOException {
+ if (connection == null) {
+ connection =
+ ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
+ }
+ return connection;
+ }
+
public void setTargetTableName(String[] tableNames) {
if (tableNames != null) {
for (String table : tableNames)
@@ -133,10 +136,8 @@ public class RegionPlacementMaintainer {
/**
* @return the new RegionAssignmentSnapshot
- * @throws IOException
*/
- public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
- throws IOException {
+ public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
currentAssignmentShapshot.initialize();
@@ -145,9 +146,6 @@ public class RegionPlacementMaintainer {
/**
* Verify the region placement is consistent with the assignment plan
- * @param isDetailMode
- * @return reports
- * @throws IOException
*/
public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
throws IOException {
@@ -206,10 +204,9 @@ public class RegionPlacementMaintainer {
// Get the all the region servers
List<ServerName> servers = new ArrayList<>();
- try (Admin admin = this.connection.getAdmin()) {
- servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
+ servers.addAll(
+ FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
.getLiveServerMetrics().keySet());
- }
LOG.info("Start to generate assignment plan for " + numRegions +
" regions from table " + tableName + " with " +
@@ -492,6 +489,11 @@ public class RegionPlacementMaintainer {
return plan;
}
+ @Override
+ public void close() throws IOException {
+ Closeables.close(connection, true);
+ }
+
/**
* Some algorithms for solving the assignment problem may traverse workers or
* jobs in linear order which may result in skewing the assignments of the
@@ -690,19 +692,17 @@ public class RegionPlacementMaintainer {
}
if (singleServerPlan != null) {
// Update the current region server with its updated favored nodes
- BlockingInterface currentRegionServer =
- ((ClusterConnection)this.connection).getAdmin(entry.getKey());
+ AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
UpdateFavoredNodesRequest request =
- RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
-
+ RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
UpdateFavoredNodesResponse updateFavoredNodesResponse =
- currentRegionServer.updateFavoredNodes(null, request);
+ FutureUtils.get(rsAdmin.updateFavoredNodes(request));
LOG.info("Region server " +
- ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
- " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
- singleServerPlan.getAssignmentMap().size() +
- " regions with the assignment plan");
- succeededNum ++;
+ FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
+ .getServerInfo() +
+ " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
+ singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
+ succeededNum++;
}
} catch (Exception e) {
failedUpdateMap.put(entry.getKey(), e);
@@ -719,7 +719,7 @@ public class RegionPlacementMaintainer {
" region servers with its corresponding favored nodes");
for (Map.Entry<ServerName, Exception> entry :
failedUpdateMap.entrySet() ) {
- LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
+ LOG.error("Failed to update " + entry.getKey().getAddress() +
" because of " + entry.getValue().getMessage());
}
}
@@ -1019,93 +1019,94 @@ public class RegionPlacementMaintainer {
}
// Create the region placement obj
- RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
- enforceMinAssignmentMove);
+ try (RegionPlacementMaintainer rp =
+ new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
- if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
- verificationDetails = true;
- }
-
- if (cmd.hasOption("tables")) {
- String tableNameListStr = cmd.getOptionValue("tables");
- String[] tableNames = StringUtils.split(tableNameListStr, ",");
- rp.setTargetTableName(tableNames);
- }
+ if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
+ verificationDetails = true;
+ }
- if (cmd.hasOption("munkres")) {
- USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
- }
+ if (cmd.hasOption("tables")) {
+ String tableNameListStr = cmd.getOptionValue("tables");
+ String[] tableNames = StringUtils.split(tableNameListStr, ",");
+ rp.setTargetTableName(tableNames);
+ }
- // Read all the modes
- if (cmd.hasOption("v") || cmd.hasOption("verify")) {
- // Verify the region placement.
- rp.verifyRegionPlacement(verificationDetails);
- } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
- // Generate the assignment plan only without updating the hbase:meta and RS
- FavoredNodesPlan plan = rp.getNewAssignmentPlan();
- printAssignmentPlan(plan);
- } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
- // Generate the new assignment plan
- FavoredNodesPlan plan = rp.getNewAssignmentPlan();
- // Print the new assignment plan
- printAssignmentPlan(plan);
- // Write the new assignment plan to META
- rp.updateAssignmentPlanToMeta(plan);
- } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
- // Generate the new assignment plan
- FavoredNodesPlan plan = rp.getNewAssignmentPlan();
- // Print the new assignment plan
- printAssignmentPlan(plan);
- // Update the assignment to hbase:meta and Region Servers
- rp.updateAssignmentPlan(plan);
- } else if (cmd.hasOption("diff")) {
- FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
- Map<String, Map<String, Float>> locality = FSUtils
- .getRegionDegreeLocalityMappingFromFS(conf);
- Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
- rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
- System.out.println("Do you want to update the assignment plan? [y/n]");
- Scanner s = new Scanner(System.in);
- String input = s.nextLine().trim();
- if (input.equals("y")) {
- System.out.println("Updating assignment plan...");
- rp.updateAssignmentPlan(newPlan);
+ if (cmd.hasOption("munkres")) {
+ USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
}
- s.close();
- } else if (cmd.hasOption("ld")) {
- Map<String, Map<String, Float>> locality = FSUtils
- .getRegionDegreeLocalityMappingFromFS(conf);
- rp.printLocalityAndDispersionForCurrentPlan(locality);
- } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
- FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
- printAssignmentPlan(plan);
- } else if (cmd.hasOption("overwrite")) {
- if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
- throw new IllegalArgumentException("Please specify: " +
+
+ // Read all the modes
+ if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+ // Verify the region placement.
+ rp.verifyRegionPlacement(verificationDetails);
+ } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
+ // Generate the assignment plan only without updating the hbase:meta and RS
+ FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+ printAssignmentPlan(plan);
+ } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
+ // Generate the new assignment plan
+ FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+ // Print the new assignment plan
+ printAssignmentPlan(plan);
+ // Write the new assignment plan to META
+ rp.updateAssignmentPlanToMeta(plan);
+ } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
+ // Generate the new assignment plan
+ FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+ // Print the new assignment plan
+ printAssignmentPlan(plan);
+ // Update the assignment to hbase:meta and Region Servers
+ rp.updateAssignmentPlan(plan);
+ } else if (cmd.hasOption("diff")) {
+ FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
+ Map<String, Map<String, Float>> locality =
+ FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+ Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+ rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+ System.out.println("Do you want to update the assignment plan? [y/n]");
+ Scanner s = new Scanner(System.in);
+ String input = s.nextLine().trim();
+ if (input.equals("y")) {
+ System.out.println("Updating assignment plan...");
+ rp.updateAssignmentPlan(newPlan);
+ }
+ s.close();
+ } else if (cmd.hasOption("ld")) {
+ Map<String, Map<String, Float>> locality =
+ FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+ rp.printLocalityAndDispersionForCurrentPlan(locality);
+ } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
+ FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
+ printAssignmentPlan(plan);
+ } else if (cmd.hasOption("overwrite")) {
+ if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
+ throw new IllegalArgumentException("Please specify: " +
" -update -r regionName -f server1:port,server2:port,server3:port");
- }
+ }
- String regionName = cmd.getOptionValue("r");
- String favoredNodesStr = cmd.getOptionValue("f");
- LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
+ String regionName = cmd.getOptionValue("r");
+ String favoredNodesStr = cmd.getOptionValue("f");
+ LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
favoredNodesStr);
- List<ServerName> favoredNodes = null;
- RegionInfo regionInfo =
+ List<ServerName> favoredNodes = null;
+ RegionInfo regionInfo =
rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
- if (regionInfo == null) {
- LOG.error("Cannot find the region " + regionName + " from the META");
- } else {
- try {
- favoredNodes = getFavoredNodeList(favoredNodesStr);
- } catch (IllegalArgumentException e) {
- LOG.error("Cannot parse the invalid favored nodes because " + e);
+ if (regionInfo == null) {
+ LOG.error("Cannot find the region " + regionName + " from the META");
+ } else {
+ try {
+ favoredNodes = getFavoredNodeList(favoredNodesStr);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Cannot parse the invalid favored nodes because " + e);
+ }
+ FavoredNodesPlan newPlan = new FavoredNodesPlan();
+ newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
+ rp.updateAssignmentPlan(newPlan);
}
- FavoredNodesPlan newPlan = new FavoredNodesPlan();
- newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
- rp.updateAssignmentPlan(newPlan);
+ } else {
+ printHelp(opt);
}
- } else {
- printHelp(opt);
}
} catch (ParseException e) {
printHelp(opt);