You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/04/19 01:31:50 UTC
git commit: PHOENIX-936 Custom load balancer to colocate user table
regions and index table regions (Rajeshbabu)
Repository: incubator-phoenix
Updated Branches:
refs/heads/local-index 73e1c0864 -> 611045292
PHOENIX-936 Custom load balancer to colocate user table regions and index table regions (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/61104529
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/61104529
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/61104529
Branch: refs/heads/local-index
Commit: 61104529252d2913dadda6d87f141e335c0a66ff
Parents: 73e1c08
Author: James Taylor <ja...@apache.org>
Authored: Fri Apr 18 16:31:29 2014 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Fri Apr 18 16:31:29 2014 -0700
----------------------------------------------------------------------
.../index/balancer/TestIndexLoadBalancer.java | 531 ++++++++++++++
.../hbase/index/balancer/IndexLoadBalancer.java | 688 +++++++++++++++++++
2 files changed, 1219 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/61104529/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
new file mode 100644
index 0000000..fb0afa9
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.balancer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestSplitTransactionOnCluster.MockedRegionObserver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.util.ConfigUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestIndexLoadBalancer {
+
+ private static final Log LOG = LogFactory.getLog(TestIndexLoadBalancer.class);
+
+ private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static HBaseAdmin admin = null;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int NUM_RS = 4;
+ Configuration conf = UTIL.getConfiguration();
+ conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
+ conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MockedMasterObserver.class.getName());
+ conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
+ LoadBalancer.class);
+ IndexTestingUtils.setupConfig(conf);
+ // disable version checking, so we can test against whatever version of HBase happens to be
+ // installed (right now, its generally going to be SNAPSHOT versions).
+ conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+ // set replication required parameter
+ ConfigUtil.setReplicationConfigIfAbsent(conf);
+ UTIL.startMiniCluster(NUM_RS);
+ admin = UTIL.getHBaseAdmin();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (admin != null) {
+ admin.disableTables(".*");
+ admin.deleteTables(".*");
+ admin.close();
+ }
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout = 180000)
+ public void testRoundRobinAssignmentDuringIndexTableCreation() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation");
+ TableName indexTableName =
+ TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ }
+
+ @Test(timeout = 180000)
+ public void testColocationAfterSplit() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ // Table names to make use of the
+ TableName tableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_1");
+ TableName indexTableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_2");
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addCoprocessor(MockedRegionObserver.class.getName());
+ htd.addFamily(new HColumnDescriptor("cf"));
+ char c = 'A';
+ byte[][] split = new byte[20][];
+ for (int i = 0; i < 20; i++) {
+ byte[] b = { (byte) c };
+ split[i] = b;
+ c++;
+ }
+ admin.createTable(htd, split);
+ HTableDescriptor iHtd = new HTableDescriptor(indexTableName);
+ iHtd.addFamily(new HColumnDescriptor("cf"));
+ iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes());
+ admin.createTable(iHtd, split);
+
+ // test put with the indexed column
+
+ insertData(tableName);
+ insertData(indexTableName);
+
+ admin.split(tableName.getNameAsString(), "c");
+ List<HRegionInfo> regionsOfUserTable =
+ master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+
+ while (regionsOfUserTable.size() != 22) {
+ Thread.sleep(100);
+ regionsOfUserTable =
+ master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+ }
+
+ List<HRegionInfo> regionsOfIndexTable =
+ master.getAssignmentManager().getRegionStates().getRegionsOfTable(indexTableName);
+
+ while (regionsOfIndexTable.size() != 22) {
+ Thread.sleep(100);
+ regionsOfIndexTable =
+ master.getAssignmentManager().getRegionStates().getRegionsOfTable(
+ indexTableName);
+ }
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ }
+
+ @Test(timeout = 180000)
+ public void testColocationAfterRegionsMerge() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+ // Table names to make use of the
+ TableName tableName = TableName.valueOf("testColocationAfterRegionsMerge");
+ TableName indexTableName = TableName.valueOf("testColocationAfterRegionsMerge_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ ServerName server = cluster.getRegionServer(0).getServerName();
+ List<HRegionInfo> regionsOfUserTable = regionStates.getRegionsOfTable(tableName);
+ Pair<HRegionInfo, HRegionInfo> regionsToMerge = new Pair<HRegionInfo, HRegionInfo>();
+ byte[] startKey1 = { (byte) 'C' };
+ byte[] startKey2 = { (byte) 'D' };
+ for (HRegionInfo region : regionsOfUserTable) {
+ if (Bytes.compareTo(startKey1, region.getStartKey()) == 0) {
+ regionsToMerge.setFirst(region);
+ } else if (Bytes.compareTo(startKey2, region.getStartKey()) == 0) {
+ regionsToMerge.setSecond(region);
+ }
+ }
+ admin.move(regionsToMerge.getFirst().getEncodedNameAsBytes(), Bytes.toBytes(server
+ .toString()));
+ admin.move(regionsToMerge.getSecond().getEncodedNameAsBytes(), Bytes.toBytes(server
+ .toString()));
+
+ List<HRegionInfo> regionsOfIndexTable = regionStates.getRegionsOfTable(indexTableName);
+ Pair<HRegionInfo, HRegionInfo> indexRegionsToMerge = new Pair<HRegionInfo, HRegionInfo>();
+ for (HRegionInfo region : regionsOfIndexTable) {
+ if (Bytes.compareTo(startKey1, region.getStartKey()) == 0) {
+ indexRegionsToMerge.setFirst(region);
+ } else if (Bytes.compareTo(startKey2, region.getStartKey()) == 0) {
+ indexRegionsToMerge.setSecond(region);
+ }
+ }
+ admin.move(indexRegionsToMerge.getFirst().getEncodedNameAsBytes(), Bytes.toBytes(server
+ .toString()));
+ admin.move(indexRegionsToMerge.getSecond().getEncodedNameAsBytes(), Bytes.toBytes(server
+ .toString()));
+ while (!regionStates.getRegionServerOfRegion(regionsToMerge.getFirst()).equals(server)
+ || !regionStates.getRegionServerOfRegion(regionsToMerge.getSecond()).equals(server)
+ || !regionStates.getRegionServerOfRegion(indexRegionsToMerge.getFirst()).equals(
+ server)
+ || !regionStates.getRegionServerOfRegion(indexRegionsToMerge.getSecond()).equals(
+ server)) {
+ Threads.sleep(1000);
+ }
+ admin.mergeRegions(regionsToMerge.getFirst().getEncodedNameAsBytes(), regionsToMerge
+ .getSecond().getEncodedNameAsBytes(), true);
+ admin.mergeRegions(indexRegionsToMerge.getFirst().getEncodedNameAsBytes(),
+ indexRegionsToMerge.getSecond().getEncodedNameAsBytes(), true);
+
+ while (regionsOfUserTable.size() != 20 || regionsOfIndexTable.size() != 20) {
+ Thread.sleep(100);
+ regionsOfUserTable = regionStates.getRegionsOfTable(tableName);
+ regionsOfIndexTable = regionStates.getRegionsOfTable(indexTableName);
+ }
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ }
+
+ private void insertData(TableName tableName) throws IOException, InterruptedException {
+ HTable table = new HTable(admin.getConfiguration(), tableName);
+ Put p = new Put("a".getBytes());
+ p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+ p.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+ table.put(p);
+
+ Put p1 = new Put("b".getBytes());
+ p1.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+ p1.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+ table.put(p1);
+
+ Put p2 = new Put("c".getBytes());
+ p2.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+ p2.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+ table.put(p2);
+
+ Put p3 = new Put("c1".getBytes());
+ p3.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+ p3.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+ table.put(p3);
+
+ Put p4 = new Put("d".getBytes());
+ p4.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+ p4.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+ table.put(p4);
+ admin.flush(tableName.getNameAsString());
+ }
+
+ @Test(timeout = 180000)
+ public void testRandomAssignmentDuringIndexTableEnable() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+ TableName tableName = TableName.valueOf("testRandomAssignmentDuringIndexTableEnable");
+ TableName indexTableName =
+ TableName.valueOf("testRandomAssignmentDuringIndexTableEnable_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ admin.disableTable(tableName);
+ admin.disableTable(indexTableName);
+ admin.enableTable(tableName);
+ admin.enableTable(indexTableName);
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+ }
+
+ @Test(timeout = 180000)
+ public void testBalanceCluster() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+ master.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+ master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", false);
+
+ TableName tableName = TableName.valueOf("testBalanceCluster");
+ TableName indexTableName = TableName.valueOf("testBalanceCluster_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceCluster1"));
+ htd1.addFamily(new HColumnDescriptor("fam1"));
+ char c = 'A';
+ byte[][] split1 = new byte[12][];
+ for (int i = 0; i < 12; i++) {
+ byte[] b = { (byte) c };
+ split1[i] = b;
+ c++;
+ }
+ admin.setBalancerRunning(false, false);
+ admin.createTable(htd1, split1);
+ admin.disableTable(tableName);
+ admin.enableTable(tableName);
+ admin.setBalancerRunning(true, false);
+ admin.balancer();
+ boolean isRegionsColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionsColocated);
+ }
+
+ @Test(timeout = 180000)
+ public void testBalanceByTable() throws Exception {
+ ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", true);
+ TableName tableName = TableName.valueOf("testBalanceByTable");
+ TableName indexTableName = TableName.valueOf("testBalanceByTable_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceByTable1"));
+ htd1.addFamily(new HColumnDescriptor("fam1"));
+ char c = 'A';
+ byte[][] split1 = new byte[12][];
+ for (int i = 0; i < 12; i++) {
+ byte[] b = { (byte) c };
+ split1[i] = b;
+ c++;
+ }
+ admin.disableTable(tableName);
+ admin.enableTable(tableName);
+ admin.setBalancerRunning(true, false);
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ admin.balancer();
+ Thread.sleep(10000);
+ ZKAssign.blockUntilNoRIT(zkw);
+ while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ Threads.sleep(1000);
+ }
+ isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ }
+
+ @Test(timeout = 180000)
+ public void testRoundRobinAssignmentAfterRegionServerDown() throws Exception {
+ ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ TableName tableName = TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown");
+ TableName indexTableName =
+ TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ HRegionServer regionServer = cluster.getRegionServer(1);
+ regionServer.abort("Aborting to test random assignment after region server down");
+ while (master.getServerManager().areDeadServersInProgress()) {
+ Thread.sleep(1000);
+ }
+ ZKAssign.blockUntilNoRIT(zkw);
+ while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ Threads.sleep(1000);
+ }
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+ }
+
+ @Test(timeout = 180000)
+ public void testRetainAssignmentDuringMasterStartUp() throws Exception {
+ ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ master.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
+ TableName tableName = TableName.valueOf("testRetainAssignmentDuringMasterStartUp");
+ TableName indexTableName =
+ TableName.valueOf("testRetainAssignmentDuringMasterStartUp_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ UTIL.shutdownMiniHBaseCluster();
+ UTIL.startMiniHBaseCluster(1, 4);
+ cluster = UTIL.getHBaseCluster();
+ master = cluster.getMaster();
+ if (admin != null) {
+ admin.close();
+ admin = new HBaseAdmin(master.getConfiguration());
+ }
+ ZKAssign.blockUntilNoRIT(zkw);
+ while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ Threads.sleep(1000);
+ }
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+ }
+
+ @Test(timeout = 300000)
+ public void testRoundRobinAssignmentDuringMasterStartUp() throws Exception {
+ MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+
+ TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp");
+ TableName indexTableName =
+ TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp_index");
+ createUserAndIndexTable(tableName, indexTableName);
+ UTIL.shutdownMiniHBaseCluster();
+ cluster.waitUntilShutDown();
+ UTIL.startMiniHBaseCluster(1, 4);
+ cluster = UTIL.getHBaseCluster();
+ if (admin != null) {
+ admin.close();
+ admin = new HBaseAdmin(cluster.getMaster().getConfiguration());
+ }
+ master = cluster.getMaster();
+ while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+ Threads.sleep(1000);
+ }
+ boolean isRegionColocated =
+ checkForColocation(master, tableName.getNameAsString(), indexTableName
+ .getNameAsString());
+ assertTrue("User regions and index regions should colocate.", isRegionColocated);
+ }
+
+ private void createUserAndIndexTable(TableName tableName, TableName indexTableName)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("cf"));
+ char c = 'A';
+ byte[][] split = new byte[20][];
+ for (int i = 0; i < 20; i++) {
+ byte[] b = { (byte) c };
+ split[i] = b;
+ c++;
+ }
+ admin.createTable(htd, split);
+ HTableDescriptor iHtd = new HTableDescriptor(indexTableName);
+ iHtd.addFamily(new HColumnDescriptor("cf"));
+ iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes());
+ admin.createTable(iHtd, split);
+ }
+
+ private List<Pair<byte[], ServerName>> getStartKeysAndLocations(HMaster master, String tableName)
+ throws IOException, InterruptedException {
+
+ List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations =
+ MetaReader.getTableRegionsAndLocations(master.getCatalogTracker(), TableName
+ .valueOf(tableName));
+ List<Pair<byte[], ServerName>> startKeyAndLocationPairs =
+ new ArrayList<Pair<byte[], ServerName>>(tableRegionsAndLocations.size());
+ Pair<byte[], ServerName> startKeyAndLocation = null;
+ for (Pair<HRegionInfo, ServerName> regionAndLocation : tableRegionsAndLocations) {
+ startKeyAndLocation =
+ new Pair<byte[], ServerName>(regionAndLocation.getFirst().getStartKey(),
+ regionAndLocation.getSecond());
+ startKeyAndLocationPairs.add(startKeyAndLocation);
+ }
+ return startKeyAndLocationPairs;
+
+ }
+
+ public boolean checkForColocation(HMaster master, String tableName, String indexTableName)
+ throws IOException, InterruptedException {
+ List<Pair<byte[], ServerName>> uTableStartKeysAndLocations =
+ getStartKeysAndLocations(master, tableName);
+ List<Pair<byte[], ServerName>> iTableStartKeysAndLocations =
+ getStartKeysAndLocations(master, indexTableName);
+
+ boolean regionsColocated = true;
+ if (uTableStartKeysAndLocations.size() != iTableStartKeysAndLocations.size()) {
+ regionsColocated = false;
+ } else {
+ for (int i = 0; i < uTableStartKeysAndLocations.size(); i++) {
+ Pair<byte[], ServerName> uStartKeyAndLocation = uTableStartKeysAndLocations.get(i);
+ Pair<byte[], ServerName> iStartKeyAndLocation = iTableStartKeysAndLocations.get(i);
+
+ if (Bytes.compareTo(uStartKeyAndLocation.getFirst(), iStartKeyAndLocation
+ .getFirst()) == 0) {
+ if (uStartKeyAndLocation.getSecond().equals(iStartKeyAndLocation.getSecond())) {
+ continue;
+ }
+ }
+ regionsColocated = false;
+ }
+ }
+ return regionsColocated;
+ }
+
+ public static class MockedMasterObserver extends BaseMasterObserver {
+ IndexLoadBalancer balancer = null;
+
+ @Override
+ public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
+ throws IOException {
+ LoadBalancer loadBalancer =
+ ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer();
+ if (loadBalancer instanceof IndexLoadBalancer) {
+ balancer = (IndexLoadBalancer) loadBalancer;
+ }
+ super.preMasterInitialization(ctx);
+ }
+
+ @Override
+ public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+ TableName userTableName = null;
+ if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
+ userTableName =
+ TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
+ balancer.addTablesToColocate(userTableName, desc.getTableName());
+ }
+ if (userTableName != null) balancer.populateRegionLocations(userTableName);
+ super.preCreateTableHandler(ctx, desc, regions);
+ }
+
+ @Override
+ public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ TableName tableName) throws IOException {
+ if (balancer.isTableColocated(tableName)) {
+ balancer.removeTablesFromColocation(tableName);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/61104529/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
new file mode 100644
index 0000000..fc1963b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
@@ -0,0 +1,688 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.balancer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * <p>This class is an extension of the load balancer class.
+ * It allows to co-locate the regions of the user table and the regions of corresponding
+ * index table if any.</p>
+ *
+ * </>roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions.
+ * randomAssignment, balancerCluster -> either index table or actual table region(s) will follow
+ * each other based on which ever comes first.</p>
+ *
+ * <p>In case of master failover there is a chance that the znodes of the index
+ * table and actual table are left behind. Then in that scenario we may get randomAssignment for
+ * either the actual table region first or the index table region first.</p>
+ *
+ * <p>In case of balancing by table any table can balance first.</p>
+ *
+ */
+
+public class IndexLoadBalancer implements LoadBalancer {
+
+ private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class);
+
+ public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE");
+
+ public static final String INDEX_BALANCER_DELEGATOR = "hbase.index.balancer.delegator.class";
+
+ private LoadBalancer delegator;
+
+ private MasterServices master;
+
+ private Configuration conf;
+
+ private ClusterStatus clusterStatus;
+
+ private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTimeMillis());
+
+ Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, TableName>();
+
+ Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, TableName>();
+
+ /**
+ * Maintains colocation information of user regions and corresponding index regions.
+ */
+ private Map<TableName, Map<ImmutableBytesWritable, ServerName>> colocationInfo =
+ new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, ServerName>>();
+
+ private Set<TableName> balancedTables = new HashSet<TableName>();
+
+ private boolean stopped = false;
+
+ @Override
+ public void initialize() throws HBaseIOException {
+ Class<? extends LoadBalancer> delegatorKlass =
+ conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class,
+ LoadBalancer.class);
+ this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf);
+ this.delegator.setClusterStatus(clusterStatus);
+ this.delegator.setMasterServices(this.master);
+ this.delegator.initialize();
+ try {
+ populateTablesToColocate(this.master.getTableDescriptors().getAll());
+ } catch (IOException e) {
+ throw new HBaseIOException(e);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public void setClusterStatus(ClusterStatus st) {
+ this.clusterStatus = st;
+ }
+
+ public Map<TableName, Map<ImmutableBytesWritable, ServerName>> getColocationInfo() {
+ return colocationInfo;
+ }
+
+ @Override
+ public void setMasterServices(MasterServices masterServices) {
+ this.master = masterServices;
+ }
+
+ @Override
+ public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
+ throws HBaseIOException {
+ synchronized (this.colocationInfo) {
+ boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false);
+ List<RegionPlan> regionPlans = null;
+
+ TableName tableName = null;
+ if (balanceByTable) {
+ Map<ImmutableBytesWritable, ServerName> tableKeys = null;
+ for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
+ .entrySet()) {
+ ServerName sn = serverVsRegionList.getKey();
+ List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
+ if (regionInfos.isEmpty()) {
+ continue;
+ }
+ if (!isTableColocated(regionInfos.get(0).getTable())) {
+ return this.delegator.balanceCluster(clusterState);
+ }
+ // Just get the table name from any one of the values in the regioninfo list
+ if (tableName == null) {
+ tableName = regionInfos.get(0).getTable();
+ tableKeys = this.colocationInfo.get(tableName);
+ }
+ // Check and modify the colocation info map based on values of cluster state
+ // because we
+ // will
+ // call balancer only when the cluster is in stable and reliable state.
+ if (tableKeys != null) {
+ for (HRegionInfo hri : regionInfos) {
+ updateServer(tableKeys, sn, hri);
+ }
+ }
+ }
+ // If user table is already balanced find the index table plans from the user table
+ // plans
+ // or vice verca.
+ TableName mappedTableName = getMappedTableToColocate(tableName);
+ if (balancedTables.contains(mappedTableName)) {
+ balancedTables.remove(mappedTableName);
+ regionPlans = new ArrayList<RegionPlan>();
+ return prepareRegionPlansForClusterState(clusterState, regionPlans);
+ } else {
+ balancedTables.add(tableName);
+ regionPlans = this.delegator.balanceCluster(clusterState);
+ if (regionPlans == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(tableName + " regions already balanced.");
+ }
+ return null;
+ } else {
+ updateRegionPlans(regionPlans);
+ return regionPlans;
+ }
+ }
+
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Seperating user tables and index tables regions of "
+ + "each region server in the cluster.");
+ }
+ Map<ServerName, List<HRegionInfo>> userClusterState =
+ new HashMap<ServerName, List<HRegionInfo>>();
+ Map<ServerName, List<HRegionInfo>> indexClusterState =
+ new HashMap<ServerName, List<HRegionInfo>>();
+ for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
+ .entrySet()) {
+ ServerName sn = serverVsRegionList.getKey();
+ List<HRegionInfo> regionsInfos = serverVsRegionList.getValue();
+ List<HRegionInfo> idxRegionsToBeMoved = new ArrayList<HRegionInfo>();
+ List<HRegionInfo> userRegionsToBeMoved = new ArrayList<HRegionInfo>();
+ for (HRegionInfo hri : regionsInfos) {
+ if (hri.isMetaRegion()) {
+ continue;
+ }
+ tableName = hri.getTable();
+ // Check and modify the colocation info map based on values of cluster state
+ // because we
+ // will
+ // call balancer only when the cluster is in stable and reliable state.
+ if (isTableColocated(tableName)) {
+ // table name may change every time thats why always need to get table
+ // entries.
+ Map<ImmutableBytesWritable, ServerName> tableKeys =
+ this.colocationInfo.get(tableName);
+ if (tableKeys != null) {
+ updateServer(tableKeys, sn, hri);
+ }
+ }
+ if (indexTableVsUserTable.containsKey(tableName)) {
+ idxRegionsToBeMoved.add(hri);
+ continue;
+ }
+ userRegionsToBeMoved.add(hri);
+ }
+ // there may be dummy entries here if assignments by table is set
+ userClusterState.put(sn, userRegionsToBeMoved);
+ indexClusterState.put(sn, idxRegionsToBeMoved);
+ }
+
+ regionPlans = this.delegator.balanceCluster(userClusterState);
+ if (regionPlans == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User region plan is null.");
+ }
+ regionPlans = new ArrayList<RegionPlan>();
+ } else {
+ updateRegionPlans(regionPlans);
+ }
+ return prepareRegionPlansForClusterState(indexClusterState, regionPlans);
+ }
+ }
+ }
+
+ private void updateServer(Map<ImmutableBytesWritable, ServerName> tableKeys, ServerName sn,
+ HRegionInfo hri) {
+ ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey());
+ ServerName existingServer = tableKeys.get(startKey);
+ if (!sn.equals(existingServer)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("There is a mismatch in the existing server name for the region " + hri
+ + ". Replacing the server " + existingServer + " with " + sn + ".");
+ }
+ tableKeys.put(startKey, sn);
+ }
+ }
+
+ /**
+ * Prepare region plans for cluster state
+ * @param clusterState if balancing is table wise then cluster state contains only indexed or
+ * index table regions, otherwise it contains all index tables regions.
+ * @param regionPlans
+ * @return
+ */
+ private List<RegionPlan> prepareRegionPlansForClusterState(
+ Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> regionPlans) {
+ if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>();
+ ImmutableBytesWritable startKey = new ImmutableBytesWritable();
+ for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState.entrySet()) {
+ List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
+ ServerName server = serverVsRegionList.getKey();
+ for (HRegionInfo regionInfo : regionInfos) {
+ if (!isTableColocated(regionInfo.getTable())) continue;
+ TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable());
+ startKey.set(regionInfo.getStartKey());
+ ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey);
+ if (sn.equals(server)) {
+ continue;
+ } else {
+ RegionPlan rp = new RegionPlan(regionInfo, server, sn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Selected server " + rp.getDestination()
+ + " as destination for region "
+ + regionInfo.getRegionNameAsString() + " from colocation info.");
+ }
+ regionOnline(regionInfo, rp.getDestination());
+ regionPlans.add(rp);
+ }
+ }
+ }
+ return regionPlans;
+ }
+
+ private void updateRegionPlans(List<RegionPlan> regionPlans) {
+ for (RegionPlan regionPlan : regionPlans) {
+ HRegionInfo hri = regionPlan.getRegionInfo();
+ if (!isTableColocated(hri.getTable())) continue;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.');
+ }
+ regionOnline(hri, regionPlan.getDestination());
+ }
+ }
+
+ @Override
+ public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
+ List<ServerName> servers) throws HBaseIOException {
+ List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>();
+ List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
+ for (HRegionInfo hri : regions) {
+ seperateUserAndIndexRegion(hri, userRegions, indexRegions);
+ }
+ Map<ServerName, List<HRegionInfo>> bulkPlan = null;
+ if (!userRegions.isEmpty()) {
+ bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers);
+ // This should not happen.
+ if (null == bulkPlan) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No region plans selected for user regions in roundRobinAssignment.");
+ }
+ return null;
+ }
+ savePlan(bulkPlan);
+ }
+ bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
+ return bulkPlan;
+ }
+
+ private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> userRegions,
+ List<HRegionInfo> indexRegions) {
+ if (indexTableVsUserTable.containsKey(hri.getTable())) {
+ indexRegions.add(hri);
+ return;
+ }
+ userRegions.add(hri);
+ }
+
+ private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan(
+ List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> bulkPlan,
+ List<ServerName> servers) throws HBaseIOException {
+ if (null != indexRegions && !indexRegions.isEmpty()) {
+ if (null == bulkPlan) {
+ bulkPlan = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
+ }
+ for (HRegionInfo hri : indexRegions) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Preparing region plan for index region "
+ + hri.getRegionNameAsString() + '.');
+ }
+ ServerName destServer = getDestServerForIdxRegion(hri);
+ List<HRegionInfo> destServerRegions = null;
+ if (destServer == null) destServer = this.randomAssignment(hri, servers);
+ if (destServer != null) {
+ destServerRegions = bulkPlan.get(destServer);
+ if (null == destServerRegions) {
+ destServerRegions = new ArrayList<HRegionInfo>();
+ bulkPlan.put(destServer, destServerRegions);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server " + destServer + " selected for region "
+ + hri.getRegionNameAsString() + '.');
+ }
+ destServerRegions.add(hri);
+ regionOnline(hri, destServer);
+ }
+ }
+ }
+ return bulkPlan;
+ }
+
+ private ServerName getDestServerForIdxRegion(HRegionInfo hri) {
+ // Every time we calculate the table name because in case of master restart the index
+ // regions
+ // may be coming for different index tables.
+ TableName actualTable = getMappedTableToColocate(hri.getTable());
+ ImmutableBytesWritable startkey = new ImmutableBytesWritable(hri.getStartKey());
+ synchronized (this.colocationInfo) {
+
+ Map<ImmutableBytesWritable, ServerName> tableKeys = colocationInfo.get(actualTable);
+ if (null == tableKeys) {
+ // Can this case come
+ return null;
+ }
+ if (tableKeys.containsKey(startkey)) {
+ // put index region location if corresponding user region found in regionLocation
+ // map.
+ ServerName sn = tableKeys.get(startkey);
+ regionOnline(hri, sn);
+ return sn;
+ }
+ }
+ return null;
+ }
+
+ private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) {
+ synchronized (this.colocationInfo) {
+ for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Saving user regions' plans for server " + e.getKey() + '.');
+ }
+ for (HRegionInfo hri : e.getValue()) {
+ if (!isTableColocated(hri.getTable())) continue;
+ regionOnline(hri, e.getKey());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Saved user regions' plans for server " + e.getKey() + '.');
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map<ServerName, List<HRegionInfo>> retainAssignment(
+ Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
+ Map<HRegionInfo, ServerName> userRegionsMap =
+ new ConcurrentHashMap<HRegionInfo, ServerName>();
+ List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
+ for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) {
+ seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers);
+ }
+ Map<ServerName, List<HRegionInfo>> bulkPlan = null;
+ if (!userRegionsMap.isEmpty()) {
+ bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers);
+ if (bulkPlan == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Empty region plan for user regions.");
+ }
+ return null;
+ }
+ savePlan(bulkPlan);
+ }
+ bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
+ return bulkPlan;
+ }
+
+ private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e,
+ Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> indexRegions,
+ List<ServerName> servers) {
+ HRegionInfo hri = e.getKey();
+ if (indexTableVsUserTable.containsKey(hri.getTable())) {
+ indexRegions.add(hri);
+ return;
+ }
+ if (e.getValue() == null) {
+ userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size())));
+ } else {
+ userRegionsMap.put(hri, e.getValue());
+ }
+ }
+
+ @Override
+ public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
+ List<ServerName> servers) throws HBaseIOException {
+ return this.delegator.immediateAssignment(regions, servers);
+ }
+
+ @Override
+ public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
+ throws HBaseIOException {
+ if (!isTableColocated(regionInfo.getTable())) {
+ return this.delegator.randomAssignment(regionInfo, servers);
+ }
+ ServerName sn = getServerNameFromMap(regionInfo, servers);
+ if (sn == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.');
+ }
+ sn = getRandomServer(regionInfo, servers);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString()
+ + " is " + ((sn == null) ? "null" : sn.toString()) + '.');
+ }
+ return sn;
+ }
+
+ private ServerName getRandomServer(HRegionInfo regionInfo, List<ServerName> servers)
+ throws HBaseIOException {
+ ServerName sn = null;
+ sn = this.delegator.randomAssignment(regionInfo, servers);
+ if (sn == null) return null;
+ regionOnline(regionInfo, sn);
+ return sn;
+ }
+
+ private ServerName getServerNameFromMap(HRegionInfo regionInfo, List<ServerName> onlineServers) {
+ TableName tableName = regionInfo.getTable();
+ TableName mappedTable = getMappedTableToColocate(regionInfo.getTable());
+ ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey());
+ synchronized (this.colocationInfo) {
+ Map<ImmutableBytesWritable, ServerName> correspondingTableKeys =
+ this.colocationInfo.get(mappedTable);
+ Map<ImmutableBytesWritable, ServerName> actualTableKeys =
+ this.colocationInfo.get(tableName);
+
+ if (null != correspondingTableKeys) {
+ if (correspondingTableKeys.containsKey(startKey)) {
+ ServerName previousServer = null;
+ if (null != actualTableKeys) {
+ previousServer = actualTableKeys.get(startKey);
+ }
+ ServerName sn = correspondingTableKeys.get(startKey);
+ if (null != previousServer) {
+ // if servername of index region and user region are same in colocationInfo
+ // clean
+ // previous plans and return null
+ if (previousServer.equals(sn)) {
+ correspondingTableKeys.remove(startKey);
+ actualTableKeys.remove(startKey);
+ if (LOG.isDebugEnabled()) {
+ LOG
+ .debug("Both user region plan and corresponding index region plan "
+ + "in colocation info are same. Hence clearing the plans to select new plan"
+ + " for the region "
+ + regionInfo.getRegionNameAsString() + ".");
+ }
+ return null;
+ }
+ }
+ if (sn != null && onlineServers.contains(sn)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating the region plan of the region "
+ + regionInfo.getRegionNameAsString() + " with server " + sn);
+ }
+ regionOnline(regionInfo, sn);
+ return sn;
+ } else if (sn != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The location " + sn + " of region with start key"
+ + Bytes.toStringBinary(regionInfo.getStartKey())
+ + " is not in online. Selecting other region server.");
+ }
+ return null;
+ }
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No region plans in colocationInfo for table " + mappedTable);
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+ TableName tableName = regionInfo.getTable();
+ synchronized (this.colocationInfo) {
+ Map<ImmutableBytesWritable, ServerName> tabkeKeys = this.colocationInfo.get(tableName);
+ if (tabkeKeys == null) {
+ tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, ServerName>();
+ this.colocationInfo.put(tableName, tabkeKeys);
+ }
+ tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn);
+ }
+ }
+
+ public void clearTableRegionPlans(TableName tableName) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Clearing regions plans from colocationInfo for table " + tableName);
+ }
+ synchronized (this.colocationInfo) {
+ this.colocationInfo.remove(tableName);
+ }
+ }
+
+ @Override
+ public void regionOffline(HRegionInfo regionInfo) {
+ TableName tableName = regionInfo.getTable();
+ synchronized (this.colocationInfo) {
+ Map<ImmutableBytesWritable, ServerName> tableKeys = this.colocationInfo.get(tableName);
+ if (null == tableKeys) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No regions of table " + tableName + " in the colocationInfo.");
+ }
+ } else {
+ tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo");
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.info("Load Balancer stop requested: " + why);
+ stopped = true;
+ }
+
+ public void populateTablesToColocate(Map<String, HTableDescriptor> tableDescriptors) {
+ HTableDescriptor desc = null;
+ for (Entry<String, HTableDescriptor> entry : tableDescriptors.entrySet()) {
+ desc = entry.getValue();
+ if (desc.getValue(PARENT_TABLE_KEY) != null) {
+ addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc
+ .getTableName());
+ }
+ }
+ }
+
+ /**
+ * Add tables whose regions to co-locate.
+ * @param userTable
+ * @param indexTable
+ */
+ public void addTablesToColocate(TableName userTable, TableName indexTable) {
+ if (userTable.equals(indexTable)) {
+ throw new IllegalArgumentException("Tables to colocate should not be same.");
+ } else if (isTableColocated(userTable)) {
+ throw new IllegalArgumentException("User table already colocated with table "
+ + getMappedTableToColocate(userTable));
+ } else if (isTableColocated(indexTable)) {
+ throw new IllegalArgumentException("Index table is already colocated with table "
+ + getMappedTableToColocate(indexTable));
+ }
+ userTableVsIndexTable.put(userTable, indexTable);
+ indexTableVsUserTable.put(indexTable, userTable);
+ }
+
+ /**
+ * Removes the specified table and corresponding table from co-location.
+ * @param table
+ */
+ public void removeTablesFromColocation(TableName table) {
+ TableName other = userTableVsIndexTable.remove(table);
+ if (other != null) {
+ indexTableVsUserTable.remove(other);
+ } else {
+ other = indexTableVsUserTable.remove(table);
+ if (other != null) userTableVsIndexTable.remove(table);
+ }
+ }
+
+ /**
+ * Return mapped table to co-locate.
+ * @param tableName
+ * @return index table if the specified table is user table or vice versa.
+ */
+ public TableName getMappedTableToColocate(TableName tableName) {
+ TableName other = userTableVsIndexTable.get(tableName);
+ return other == null ? indexTableVsUserTable.get(tableName) : other;
+ }
+
+ public boolean isTableColocated(TableName table) {
+ return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table);
+ }
+
+ /**
+ * Populates table's region locations into co-location info from master.
+ * @param table
+ */
+ public void populateRegionLocations(TableName table) {
+ synchronized (this.colocationInfo) {
+ if (!isTableColocated(table)) {
+ throw new IllegalArgumentException("Specified table " + table
+ + " should be in one of the tables to co-locate.");
+ }
+ RegionStates regionStates = this.master.getAssignmentManager().getRegionStates();
+ List<HRegionInfo> onlineRegions = regionStates.getRegionsOfTable(table);
+ for (HRegionInfo hri : onlineRegions) {
+ regionOnline(hri, regionStates.getRegionServerOfRegion(hri));
+ }
+ Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+ for (RegionState regionState : regionsInTransition.values()) {
+ if (table.equals(regionState.getRegion().getTable())
+ && regionState.getServerName() != null) {
+ regionOnline(regionState.getRegion(), regionState.getServerName());
+ }
+ }
+ }
+ }
+}