You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/09/09 06:16:14 UTC
hbase git commit: HBASE-16570 Compute region locality in parallel at startup (binlijin)
Repository: hbase
Updated Branches:
refs/heads/master 46c756a4a -> e11aafae9
HBASE-16570 Compute region locality in parallel at startup (binlijin)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e11aafae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e11aafae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e11aafae
Branch: refs/heads/master
Commit: e11aafae957bc8d71cb622833011f29325049987
Parents: 46c756a
Author: chenheng <ch...@apache.org>
Authored: Fri Sep 9 10:54:48 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Fri Sep 9 10:54:48 2016 +0800
----------------------------------------------------------------------
.../hbase/master/balancer/BaseLoadBalancer.java | 49 +++++++++++++++----
.../master/balancer/RegionLocationFinder.java | 18 +++++--
.../master/balancer/TestBaseLoadBalancer.java | 51 +++++++++++++++++---
.../balancer/TestRegionLocationFinder.java | 3 +-
4 files changed, 100 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index dc5bace..2b13b21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -33,6 +33,7 @@ import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
@@ -59,6 +60,7 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* The base class for load balancers. It provides the the functions used to by
@@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads;
private RegionLocationFinder regionFinder;
+ ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@@ -238,6 +241,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToTableIndex = new int[numRegions];
regionIndexToPrimaryIndex = new int[numRegions];
regionLoads = new Deque[numRegions];
+ regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
+ numRegions);
+ if (regionFinder != null) {
+ for (int i = 0; i < numRegions; i++) {
+ regionLocationFutures.add(null);
+ }
+ }
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
serverIndicesSortedByLocality = new Integer[numServers];
@@ -307,6 +317,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndex++;
}
+ if (regionFinder != null) {
+ for (int index = 0; index < regionLocationFutures.size(); index++) {
+ ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures
+ .get(index);
+ HDFSBlocksDistribution blockDistbn = null;
+ try {
+ blockDistbn = future.get();
+ } catch (InterruptedException ite) {
+ } catch (ExecutionException ee) {
+ LOG.debug(
+ "IOException during HDFSBlocksDistribution computation. for region = "
+ + regions[index].getEncodedName(), ee);
+ } finally {
+ if (blockDistbn == null) {
+ blockDistbn = new HDFSBlocksDistribution();
+ }
+ }
+ List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
+ regionLocations[index] = new int[loc.size()];
+ for (int i = 0; i < loc.size(); i++) {
+ regionLocations[index][i] = loc.get(i) == null ? -1
+ : (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
+ : serversToIndex.get(loc.get(i).getHostAndPort()));
+ }
+ }
+ }
+
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
@@ -454,15 +491,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
if (regionFinder != null) {
- //region location
- List<ServerName> loc = regionFinder.getTopBlockLocations(region);
- regionLocations[regionIndex] = new int[loc.size()];
- for (int i=0; i < loc.size(); i++) {
- regionLocations[regionIndex][i] =
- loc.get(i) == null ? -1 :
- (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
- : serversToIndex.get(loc.get(i).getHostAndPort()));
- }
+ // region location
+ regionLocationFutures.set(regionIndex,
+ regionFinder.asyncGetBlockDistribution(region));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index a6724ee..fbe57d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
class RegionLocationFinder {
private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
+ private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
private Configuration conf;
private volatile ClusterStatus status;
private MasterServices services;
@@ -164,8 +167,8 @@ class RegionLocationFinder {
return includesUserTables;
}
- protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
- HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
+ protected List<ServerName> getTopBlockLocations(
+ HDFSBlocksDistribution blocksDistribution) {
List<String> topHosts = blocksDistribution.getTopHosts();
return mapHostNameToServerName(topHosts);
}
@@ -208,7 +211,7 @@ class RegionLocationFinder {
+ region.getEncodedName(), ioe);
}
- return new HDFSBlocksDistribution();
+ return EMPTY_BLOCK_DISTRIBUTION;
}
/**
@@ -295,4 +298,13 @@ class RegionLocationFinder {
return blockDistbn;
}
}
+
+ public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
+ HRegionInfo hri) {
+ try {
+ return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
+ } catch (Exception e) {
+ return Futures.immediateFuture(EMPTY_BLOCK_DISTRIBUTION);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index d8c0a3d..37165d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -56,6 +57,8 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
@Category({MasterTests.class, MediumTests.class})
public class TestBaseLoadBalancer extends BalancerTestBase {
@@ -448,17 +451,49 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// mock block locality for some regions
RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
+ HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
+ ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
+ .immediateFuture(emptyBlockDistribution);
+ for (HRegionInfo regionInfo : regions) {
+ when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
+ defaultFuture);
+ }
// block locality: region:0 => {server:0}
// region:1 => {server:0, server:1}
// region:42 => {server:4, server:9, server:5}
- when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
- Lists.newArrayList(servers.get(0)));
- when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
- Lists.newArrayList(servers.get(0), servers.get(1)));
- when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
- Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
- when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
- Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
+ HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution();
+ ListenableFuture<HDFSBlocksDistribution> future0 = Futures
+ .immediateFuture(region0BlockDistribution);
+ when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn(
+ future0);
+ when(locationFinder.getTopBlockLocations(region0BlockDistribution))
+ .thenReturn(Lists.newArrayList(servers.get(0)));
+
+ HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
+ ListenableFuture<HDFSBlocksDistribution> future1 = Futures
+ .immediateFuture(region1BlockDistribution);
+ when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
+ future1);
+ when(locationFinder.getTopBlockLocations(region1BlockDistribution))
+ .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
+
+ HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
+ ListenableFuture<HDFSBlocksDistribution> future42 = Futures
+ .immediateFuture(region42BlockDistribution);
+ when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
+ future42);
+ when(locationFinder.getTopBlockLocations(region42BlockDistribution))
+ .thenReturn(
+ Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
+
+ HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
+ ListenableFuture<HDFSBlocksDistribution> future43 = Futures
+ .immediateFuture(region43BlockDistribution);
+ when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
+ future43);
+ // this server does not exists in clusterStatus
+ when(locationFinder.getTopBlockLocations(region43BlockDistribution))
+ .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
index daa8942..039cac1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
@@ -121,7 +121,8 @@ public class TestRegionLocationFinder {
for (int i = 0; i < ServerNum; i++) {
HRegionServer server = cluster.getRegionServer(i);
for (Region region : server.getOnlineRegions(tableName)) {
- List<ServerName> servers = finder.getTopBlockLocations(region.getRegionInfo());
+ List<ServerName> servers = finder.getTopBlockLocations(finder
+ .getBlockDistribution(region.getRegionInfo()));
// test table may have empty region
if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
continue;