You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/09/09 06:16:14 UTC

hbase git commit: HBASE-16570 Compute region locality in parallel at startup (binlijin)

Repository: hbase
Updated Branches:
  refs/heads/master 46c756a4a -> e11aafae9


HBASE-16570 Compute region locality in parallel at startup (binlijin)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e11aafae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e11aafae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e11aafae

Branch: refs/heads/master
Commit: e11aafae957bc8d71cb622833011f29325049987
Parents: 46c756a
Author: chenheng <ch...@apache.org>
Authored: Fri Sep 9 10:54:48 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Fri Sep 9 10:54:48 2016 +0800

----------------------------------------------------------------------
 .../hbase/master/balancer/BaseLoadBalancer.java | 49 +++++++++++++++----
 .../master/balancer/RegionLocationFinder.java   | 18 +++++--
 .../master/balancer/TestBaseLoadBalancer.java   | 51 +++++++++++++++++---
 .../balancer/TestRegionLocationFinder.java      |  3 +-
 4 files changed, 100 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index dc5bace..2b13b21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -33,6 +33,7 @@ import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -59,6 +60,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * The base class for load balancers. It provides the the functions used to by
@@ -117,6 +119,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     HRegionInfo[] regions;
     Deque<RegionLoad>[] regionLoads;
     private RegionLocationFinder regionFinder;
+    ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures;
 
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
 
@@ -238,6 +241,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       regionIndexToTableIndex = new int[numRegions];
       regionIndexToPrimaryIndex = new int[numRegions];
       regionLoads = new Deque[numRegions];
+      regionLocationFutures = new ArrayList<ListenableFuture<HDFSBlocksDistribution>>(
+          numRegions);
+      if (regionFinder != null) {
+        for (int i = 0; i < numRegions; i++) {
+          regionLocationFutures.add(null);
+        }
+      }
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
       serverIndicesSortedByLocality = new Integer[numServers];
@@ -307,6 +317,33 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         regionIndex++;
       }
 
+      if (regionFinder != null) {
+        // Wait for each region's block distribution future (see asyncGetBlockDistribution).
+        for (int index = 0; index < regionLocationFutures.size(); index++) {
+          ListenableFuture<HDFSBlocksDistribution> future = regionLocationFutures.get(index);
+          HDFSBlocksDistribution blockDistbn = null;
+          try {
+            blockDistbn = future.get();
+          } catch (InterruptedException ite) {
+            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
+          } catch (ExecutionException ee) {
+            LOG.debug("IOException during HDFSBlocksDistribution computation. for region = "
+                + regions[index].getEncodedName(), ee);
+          }
+          if (blockDistbn == null) {
+            blockDistbn = new HDFSBlocksDistribution(); // no data: treat as no locality
+          }
+          List<ServerName> loc = regionFinder.getTopBlockLocations(blockDistbn);
+          regionLocations[index] = new int[loc.size()];
+          for (int i = 0; i < loc.size(); i++) {
+            ServerName sn = loc.get(i);
+            // -1 when the location is unknown or not one of this cluster's indexed servers
+            Integer serverIndex = sn == null ? null : serversToIndex.get(sn.getHostAndPort());
+            regionLocations[index][i] = serverIndex == null ? -1 : serverIndex;
+          }
+        }
+      }
+
       for (int i = 0; i < serversPerHostList.size(); i++) {
         serversPerHost[i] = new int[serversPerHostList.get(i).size()];
         for (int j = 0; j < serversPerHost[i].length; j++) {
@@ -454,15 +491,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
 
       if (regionFinder != null) {
-        //region location
-        List<ServerName> loc = regionFinder.getTopBlockLocations(region);
-        regionLocations[regionIndex] = new int[loc.size()];
-        for (int i=0; i < loc.size(); i++) {
-          regionLocations[regionIndex][i] =
-              loc.get(i) == null ? -1 :
-                (serversToIndex.get(loc.get(i).getHostAndPort()) == null ? -1
-                    : serversToIndex.get(loc.get(i).getHostAndPort()));
-        }
+        // region location
+        regionLocationFutures.set(regionIndex,
+            regionFinder.asyncGetBlockDistribution(region));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
index a6724ee..fbe57d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -21,10 +21,12 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 class RegionLocationFinder {
   private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);
   private static final long CACHE_TIME = 240 * 60 * 1000;
+  private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
   private Configuration conf;
   private volatile ClusterStatus status;
   private MasterServices services;
@@ -164,8 +167,8 @@ class RegionLocationFinder {
     return includesUserTables;
   }
 
-  protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
-    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
+  protected List<ServerName> getTopBlockLocations(
+      HDFSBlocksDistribution blocksDistribution) {
     List<String> topHosts = blocksDistribution.getTopHosts();
     return mapHostNameToServerName(topHosts);
   }
@@ -208,7 +211,7 @@ class RegionLocationFinder {
           + region.getEncodedName(), ioe);
     }
 
-    return new HDFSBlocksDistribution();
+    return EMPTY_BLOCK_DISTRIBUTION;
   }
 
   /**
@@ -295,4 +298,13 @@ class RegionLocationFinder {
       return blockDistbn;
     }
   }
+
+  public ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(HRegionInfo hri) {
+    try { // best effort: a failed reload yields a fresh, empty (no-locality) distribution
+      return loader.reload(hri, EMPTY_BLOCK_DISTRIBUTION);
+    } catch (Exception e) {
+      LOG.debug("Unable to reload block distribution for " + hri.getEncodedName(), e);
+      return Futures.immediateFuture(new HDFSBlocksDistribution());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index d8c0a3d..37165d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -56,6 +57,8 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 @Category({MasterTests.class, MediumTests.class})
 public class TestBaseLoadBalancer extends BalancerTestBase {
@@ -448,17 +451,49 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
 
     // mock block locality for some regions
     RegionLocationFinder locationFinder = mock(RegionLocationFinder.class);
+    HDFSBlocksDistribution emptyBlockDistribution = new HDFSBlocksDistribution();
+    ListenableFuture<HDFSBlocksDistribution> defaultFuture = Futures
+        .immediateFuture(emptyBlockDistribution);
+    for (HRegionInfo regionInfo : regions) {
+      when(locationFinder.asyncGetBlockDistribution(regionInfo)).thenReturn(
+          defaultFuture);
+    }
     // block locality: region:0   => {server:0}
     //                 region:1   => {server:0, server:1}
     //                 region:42 => {server:4, server:9, server:5}
-    when(locationFinder.getTopBlockLocations(regions.get(0))).thenReturn(
-      Lists.newArrayList(servers.get(0)));
-    when(locationFinder.getTopBlockLocations(regions.get(1))).thenReturn(
-      Lists.newArrayList(servers.get(0), servers.get(1)));
-    when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn(
-      Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
-    when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
-      Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
+    HDFSBlocksDistribution region0BlockDistribution = new HDFSBlocksDistribution();
+    ListenableFuture<HDFSBlocksDistribution> future0 = Futures
+        .immediateFuture(region0BlockDistribution);
+    when(locationFinder.asyncGetBlockDistribution(regions.get(0))).thenReturn(
+        future0);
+    when(locationFinder.getTopBlockLocations(region0BlockDistribution))
+        .thenReturn(Lists.newArrayList(servers.get(0)));
+
+    HDFSBlocksDistribution region1BlockDistribution = new HDFSBlocksDistribution();
+    ListenableFuture<HDFSBlocksDistribution> future1 = Futures
+        .immediateFuture(region1BlockDistribution);
+    when(locationFinder.asyncGetBlockDistribution(regions.get(1))).thenReturn(
+        future1);
+    when(locationFinder.getTopBlockLocations(region1BlockDistribution))
+        .thenReturn(Lists.newArrayList(servers.get(0), servers.get(1)));
+
+    HDFSBlocksDistribution region42BlockDistribution = new HDFSBlocksDistribution();
+    ListenableFuture<HDFSBlocksDistribution> future42 = Futures
+        .immediateFuture(region42BlockDistribution);
+    when(locationFinder.asyncGetBlockDistribution(regions.get(42))).thenReturn(
+        future42);
+    when(locationFinder.getTopBlockLocations(region42BlockDistribution))
+        .thenReturn(
+            Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5)));
+
+    HDFSBlocksDistribution region43BlockDistribution = new HDFSBlocksDistribution();
+    ListenableFuture<HDFSBlocksDistribution> future43 = Futures
+        .immediateFuture(region43BlockDistribution);
+    when(locationFinder.asyncGetBlockDistribution(regions.get(43))).thenReturn(
+        future43);
+    // "foo" is not a server in clusterStatus, so this location should map to index -1
+    when(locationFinder.getTopBlockLocations(region43BlockDistribution))
+        .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0)));
 
     BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e11aafae/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
index daa8942..039cac1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionLocationFinder.java
@@ -121,7 +121,8 @@ public class TestRegionLocationFinder {
     for (int i = 0; i < ServerNum; i++) {
       HRegionServer server = cluster.getRegionServer(i);
       for (Region region : server.getOnlineRegions(tableName)) {
-        List<ServerName> servers = finder.getTopBlockLocations(region.getRegionInfo());
+        List<ServerName> servers = finder.getTopBlockLocations(finder
+            .getBlockDistribution(region.getRegionInfo()));
         // test table may have empty region
         if (region.getHDFSBlocksDistribution().getUniqueBlocksTotalWeight() == 0) {
           continue;