You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/05/24 15:29:04 UTC
[2/5] phoenix git commit: PHOENIX-1734 Local index improvements(Rajeshbabu)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
index 146028e..3353f9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
@@ -17,677 +17,8 @@
*/
package org.apache.phoenix.hbase.index.balancer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * <p>This class is an extension of the load balancer class.
- * It allows to co-locate the regions of the user table and the regions of corresponding
- * index table if any.</p>
- *
- * </>roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions.
- * randomAssignment, balancerCluster -> either index table or actual table region(s) will follow
- * each other based on which ever comes first.</p>
- *
- * <p>In case of master failover there is a chance that the znodes of the index
- * table and actual table are left behind. Then in that scenario we may get randomAssignment for
- * either the actual table region first or the index table region first.</p>
- *
- * <p>In case of balancing by table any table can balance first.</p>
- *
- */
-
-public class IndexLoadBalancer implements LoadBalancer {
-
- private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class);
-
- public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE");
-
- public static final String INDEX_BALANCER_DELEGATOR = "hbase.index.balancer.delegator.class";
-
- private LoadBalancer delegator;
-
- private MasterServices master;
-
- private Configuration conf;
-
- private ClusterStatus clusterStatus;
-
- private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTimeMillis());
-
- Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, TableName>();
-
- Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, TableName>();
-
- /**
- * Maintains colocation information of user regions and corresponding index regions.
- */
- private Map<TableName, Map<ImmutableBytesWritable, ServerName>> colocationInfo =
- new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, ServerName>>();
-
- private Set<TableName> balancedTables = new HashSet<TableName>();
-
- private boolean stopped = false;
-
- @Override
- public void initialize() throws HBaseIOException {
- Class<? extends LoadBalancer> delegatorKlass =
- conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class,
- LoadBalancer.class);
- this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf);
- this.delegator.setClusterStatus(clusterStatus);
- this.delegator.setMasterServices(this.master);
- this.delegator.initialize();
- try {
- populateTablesToColocate(this.master.getTableDescriptors().getAll());
- } catch (IOException e) {
- throw new HBaseIOException(e);
- }
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration configuration) {
- this.conf = configuration;
- }
-
- @Override
- public void onConfigurationChange(Configuration conf) {
- setConf(conf);
- }
-
- @Override
- public void setClusterStatus(ClusterStatus st) {
- this.clusterStatus = st;
- }
-
- public Map<TableName, Map<ImmutableBytesWritable, ServerName>> getColocationInfo() {
- return colocationInfo;
- }
-
- @Override
- public void setMasterServices(MasterServices masterServices) {
- this.master = masterServices;
- }
-
- @Override
- public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
- throws HBaseIOException {
- synchronized (this.colocationInfo) {
- boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false);
- List<RegionPlan> regionPlans = null;
-
- TableName tableName = null;
- if (balanceByTable) {
- Map<ImmutableBytesWritable, ServerName> tableKeys = null;
- for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
- .entrySet()) {
- ServerName sn = serverVsRegionList.getKey();
- List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
- if (regionInfos.isEmpty()) {
- continue;
- }
- if (!isTableColocated(regionInfos.get(0).getTable())) {
- return this.delegator.balanceCluster(clusterState);
- }
- // Just get the table name from any one of the values in the regioninfo list
- if (tableName == null) {
- tableName = regionInfos.get(0).getTable();
- tableKeys = this.colocationInfo.get(tableName);
- }
- // Check and modify the colocation info map based on values of cluster state
- // because we
- // will
- // call balancer only when the cluster is in stable and reliable state.
- if (tableKeys != null) {
- for (HRegionInfo hri : regionInfos) {
- updateServer(tableKeys, sn, hri);
- }
- }
- }
- // If user table is already balanced find the index table plans from the user table
- // plans
- // or vice verca.
- TableName mappedTableName = getMappedTableToColocate(tableName);
- if (balancedTables.contains(mappedTableName)) {
- balancedTables.remove(mappedTableName);
- regionPlans = new ArrayList<RegionPlan>();
- return prepareRegionPlansForClusterState(clusterState, regionPlans);
- } else {
- balancedTables.add(tableName);
- regionPlans = this.delegator.balanceCluster(clusterState);
- if (regionPlans == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(tableName + " regions already balanced.");
- }
- return null;
- } else {
- updateRegionPlans(regionPlans);
- return regionPlans;
- }
- }
-
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Seperating user tables and index tables regions of "
- + "each region server in the cluster.");
- }
- Map<ServerName, List<HRegionInfo>> userClusterState =
- new HashMap<ServerName, List<HRegionInfo>>();
- Map<ServerName, List<HRegionInfo>> indexClusterState =
- new HashMap<ServerName, List<HRegionInfo>>();
- for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
- .entrySet()) {
- ServerName sn = serverVsRegionList.getKey();
- List<HRegionInfo> regionsInfos = serverVsRegionList.getValue();
- List<HRegionInfo> idxRegionsToBeMoved = new ArrayList<HRegionInfo>();
- List<HRegionInfo> userRegionsToBeMoved = new ArrayList<HRegionInfo>();
- for (HRegionInfo hri : regionsInfos) {
- if (hri.isMetaRegion()) {
- continue;
- }
- tableName = hri.getTable();
- // Check and modify the colocation info map based on values of cluster state
- // because we
- // will
- // call balancer only when the cluster is in stable and reliable state.
- if (isTableColocated(tableName)) {
- // table name may change every time thats why always need to get table
- // entries.
- Map<ImmutableBytesWritable, ServerName> tableKeys =
- this.colocationInfo.get(tableName);
- if (tableKeys != null) {
- updateServer(tableKeys, sn, hri);
- }
- }
- if (indexTableVsUserTable.containsKey(tableName)) {
- idxRegionsToBeMoved.add(hri);
- continue;
- }
- userRegionsToBeMoved.add(hri);
- }
- // there may be dummy entries here if assignments by table is set
- userClusterState.put(sn, userRegionsToBeMoved);
- indexClusterState.put(sn, idxRegionsToBeMoved);
- }
-
- regionPlans = this.delegator.balanceCluster(userClusterState);
- if (regionPlans == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("User region plan is null.");
- }
- regionPlans = new ArrayList<RegionPlan>();
- } else {
- updateRegionPlans(regionPlans);
- }
- return prepareRegionPlansForClusterState(indexClusterState, regionPlans);
- }
- }
- }
-
- private void updateServer(Map<ImmutableBytesWritable, ServerName> tableKeys, ServerName sn,
- HRegionInfo hri) {
- ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey());
- ServerName existingServer = tableKeys.get(startKey);
- if (!sn.equals(existingServer)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("There is a mismatch in the existing server name for the region " + hri
- + ". Replacing the server " + existingServer + " with " + sn + ".");
- }
- tableKeys.put(startKey, sn);
- }
- }
-
- /**
- * Prepare region plans for cluster state
- * @param clusterState if balancing is table wise then cluster state contains only indexed or
- * index table regions, otherwise it contains all index tables regions.
- * @param regionPlans
- * @return
- */
- private List<RegionPlan> prepareRegionPlansForClusterState(
- Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> regionPlans) {
- if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>();
- ImmutableBytesWritable startKey = new ImmutableBytesWritable();
- for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState.entrySet()) {
- List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
- ServerName server = serverVsRegionList.getKey();
- for (HRegionInfo regionInfo : regionInfos) {
- if (!isTableColocated(regionInfo.getTable())) continue;
- TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable());
- startKey.set(regionInfo.getStartKey());
- ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey);
- if (sn.equals(server)) {
- continue;
- } else {
- RegionPlan rp = new RegionPlan(regionInfo, server, sn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Selected server " + rp.getDestination()
- + " as destination for region "
- + regionInfo.getRegionNameAsString() + " from colocation info.");
- }
- regionOnline(regionInfo, rp.getDestination());
- regionPlans.add(rp);
- }
- }
- }
- return regionPlans;
- }
-
- private void updateRegionPlans(List<RegionPlan> regionPlans) {
- for (RegionPlan regionPlan : regionPlans) {
- HRegionInfo hri = regionPlan.getRegionInfo();
- if (!isTableColocated(hri.getTable())) continue;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.');
- }
- regionOnline(hri, regionPlan.getDestination());
- }
- }
-
- @Override
- public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
- List<ServerName> servers) throws HBaseIOException {
- List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>();
- List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
- for (HRegionInfo hri : regions) {
- seperateUserAndIndexRegion(hri, userRegions, indexRegions);
- }
- Map<ServerName, List<HRegionInfo>> bulkPlan = null;
- if (!userRegions.isEmpty()) {
- bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers);
- // This should not happen.
- if (null == bulkPlan) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No region plans selected for user regions in roundRobinAssignment.");
- }
- return null;
- }
- savePlan(bulkPlan);
- }
- bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
- return bulkPlan;
- }
-
- private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> userRegions,
- List<HRegionInfo> indexRegions) {
- if (indexTableVsUserTable.containsKey(hri.getTable())) {
- indexRegions.add(hri);
- return;
- }
- userRegions.add(hri);
- }
-
- private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan(
- List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> bulkPlan,
- List<ServerName> servers) throws HBaseIOException {
- if (null != indexRegions && !indexRegions.isEmpty()) {
- if (null == bulkPlan) {
- bulkPlan = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
- }
- for (HRegionInfo hri : indexRegions) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Preparing region plan for index region "
- + hri.getRegionNameAsString() + '.');
- }
- ServerName destServer = getDestServerForIdxRegion(hri);
- List<HRegionInfo> destServerRegions = null;
- if (destServer == null) destServer = this.randomAssignment(hri, servers);
- if (destServer != null) {
- destServerRegions = bulkPlan.get(destServer);
- if (null == destServerRegions) {
- destServerRegions = new ArrayList<HRegionInfo>();
- bulkPlan.put(destServer, destServerRegions);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Server " + destServer + " selected for region "
- + hri.getRegionNameAsString() + '.');
- }
- destServerRegions.add(hri);
- regionOnline(hri, destServer);
- }
- }
- }
- return bulkPlan;
- }
-
- private ServerName getDestServerForIdxRegion(HRegionInfo hri) {
- // Every time we calculate the table name because in case of master restart the index
- // regions
- // may be coming for different index tables.
- TableName actualTable = getMappedTableToColocate(hri.getTable());
- ImmutableBytesWritable startkey = new ImmutableBytesWritable(hri.getStartKey());
- synchronized (this.colocationInfo) {
-
- Map<ImmutableBytesWritable, ServerName> tableKeys = colocationInfo.get(actualTable);
- if (null == tableKeys) {
- // Can this case come
- return null;
- }
- if (tableKeys.containsKey(startkey)) {
- // put index region location if corresponding user region found in regionLocation
- // map.
- ServerName sn = tableKeys.get(startkey);
- regionOnline(hri, sn);
- return sn;
- }
- }
- return null;
- }
-
- private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) {
- synchronized (this.colocationInfo) {
- for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Saving user regions' plans for server " + e.getKey() + '.');
- }
- for (HRegionInfo hri : e.getValue()) {
- if (!isTableColocated(hri.getTable())) continue;
- regionOnline(hri, e.getKey());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Saved user regions' plans for server " + e.getKey() + '.');
- }
- }
- }
- }
-
- @Override
- public Map<ServerName, List<HRegionInfo>> retainAssignment(
- Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
- Map<HRegionInfo, ServerName> userRegionsMap =
- new ConcurrentHashMap<HRegionInfo, ServerName>();
- List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
- for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) {
- seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers);
- }
- Map<ServerName, List<HRegionInfo>> bulkPlan = null;
- if (!userRegionsMap.isEmpty()) {
- bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers);
- if (bulkPlan == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Empty region plan for user regions.");
- }
- return null;
- }
- savePlan(bulkPlan);
- }
- bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
- return bulkPlan;
- }
-
- private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e,
- Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> indexRegions,
- List<ServerName> servers) {
- HRegionInfo hri = e.getKey();
- if (indexTableVsUserTable.containsKey(hri.getTable())) {
- indexRegions.add(hri);
- return;
- }
- if (e.getValue() == null) {
- userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size())));
- } else {
- userRegionsMap.put(hri, e.getValue());
- }
- }
-
- @Override
- public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
- List<ServerName> servers) throws HBaseIOException {
- return this.delegator.immediateAssignment(regions, servers);
- }
-
- @Override
- public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
- throws HBaseIOException {
- if (!isTableColocated(regionInfo.getTable())) {
- return this.delegator.randomAssignment(regionInfo, servers);
- }
- ServerName sn = getServerNameFromMap(regionInfo, servers);
- if (sn == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.');
- }
- sn = getRandomServer(regionInfo, servers);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString()
- + " is " + ((sn == null) ? "null" : sn.toString()) + '.');
- }
- return sn;
- }
-
- private ServerName getRandomServer(HRegionInfo regionInfo, List<ServerName> servers)
- throws HBaseIOException {
- ServerName sn = null;
- sn = this.delegator.randomAssignment(regionInfo, servers);
- if (sn == null) return null;
- regionOnline(regionInfo, sn);
- return sn;
- }
-
- private ServerName getServerNameFromMap(HRegionInfo regionInfo, List<ServerName> onlineServers) {
- TableName tableName = regionInfo.getTable();
- TableName mappedTable = getMappedTableToColocate(regionInfo.getTable());
- ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey());
- synchronized (this.colocationInfo) {
- Map<ImmutableBytesWritable, ServerName> correspondingTableKeys =
- this.colocationInfo.get(mappedTable);
- Map<ImmutableBytesWritable, ServerName> actualTableKeys =
- this.colocationInfo.get(tableName);
-
- if (null != correspondingTableKeys) {
- if (correspondingTableKeys.containsKey(startKey)) {
- ServerName previousServer = null;
- if (null != actualTableKeys) {
- previousServer = actualTableKeys.get(startKey);
- }
- ServerName sn = correspondingTableKeys.get(startKey);
- if (null != previousServer) {
- // if servername of index region and user region are same in colocationInfo
- // clean
- // previous plans and return null
- if (previousServer.equals(sn)) {
- correspondingTableKeys.remove(startKey);
- actualTableKeys.remove(startKey);
- if (LOG.isDebugEnabled()) {
- LOG
- .debug("Both user region plan and corresponding index region plan "
- + "in colocation info are same. Hence clearing the plans to select new plan"
- + " for the region "
- + regionInfo.getRegionNameAsString() + ".");
- }
- return null;
- }
- }
- if (sn != null && onlineServers.contains(sn)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating the region plan of the region "
- + regionInfo.getRegionNameAsString() + " with server " + sn);
- }
- regionOnline(regionInfo, sn);
- return sn;
- } else if (sn != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The location " + sn + " of region with start key"
- + Bytes.toStringBinary(regionInfo.getStartKey())
- + " is not in online. Selecting other region server.");
- }
- return null;
- }
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No region plans in colocationInfo for table " + mappedTable);
- }
- }
- return null;
- }
- }
-
- @Override
- public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
- TableName tableName = regionInfo.getTable();
- synchronized (this.colocationInfo) {
- Map<ImmutableBytesWritable, ServerName> tabkeKeys = this.colocationInfo.get(tableName);
- if (tabkeKeys == null) {
- tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, ServerName>();
- this.colocationInfo.put(tableName, tabkeKeys);
- }
- tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn);
- }
- }
-
- public void clearTableRegionPlans(TableName tableName) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Clearing regions plans from colocationInfo for table " + tableName);
- }
- synchronized (this.colocationInfo) {
- this.colocationInfo.remove(tableName);
- }
- }
-
- @Override
- public void regionOffline(HRegionInfo regionInfo) {
- TableName tableName = regionInfo.getTable();
- synchronized (this.colocationInfo) {
- Map<ImmutableBytesWritable, ServerName> tableKeys = this.colocationInfo.get(tableName);
- if (null == tableKeys) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No regions of table " + tableName + " in the colocationInfo.");
- }
- } else {
- tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo");
- }
- }
- }
- }
-
- @Override
- public boolean isStopped() {
- return stopped;
- }
-
- @Override
- public void stop(String why) {
- LOG.info("Load Balancer stop requested: " + why);
- stopped = true;
- }
-
- public void populateTablesToColocate(Map<String, HTableDescriptor> tableDescriptors) {
- HTableDescriptor desc = null;
- for (Entry<String, HTableDescriptor> entry : tableDescriptors.entrySet()) {
- desc = entry.getValue();
- if (desc.getValue(PARENT_TABLE_KEY) != null) {
- addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc
- .getTableName());
- }
- }
- }
-
- /**
- * Add tables whose regions to co-locate.
- * @param userTable
- * @param indexTable
- */
- public void addTablesToColocate(TableName userTable, TableName indexTable) {
- if (userTable.equals(indexTable)) {
- throw new IllegalArgumentException("Tables to colocate should not be same.");
- } else if (isTableColocated(userTable)) {
- throw new IllegalArgumentException("User table already colocated with table "
- + getMappedTableToColocate(userTable));
- } else if (isTableColocated(indexTable)) {
- throw new IllegalArgumentException("Index table is already colocated with table "
- + getMappedTableToColocate(indexTable));
- }
- userTableVsIndexTable.put(userTable, indexTable);
- indexTableVsUserTable.put(indexTable, userTable);
- }
-
- /**
- * Removes the specified table and corresponding table from co-location.
- * @param table
- */
- public void removeTablesFromColocation(TableName table) {
- TableName other = userTableVsIndexTable.remove(table);
- if (other != null) {
- indexTableVsUserTable.remove(other);
- } else {
- other = indexTableVsUserTable.remove(table);
- if (other != null) userTableVsIndexTable.remove(other);
- }
- }
-
- /**
- * Return mapped table to co-locate.
- * @param tableName
- * @return index table if the specified table is user table or vice versa.
- */
- public TableName getMappedTableToColocate(TableName tableName) {
- TableName other = userTableVsIndexTable.get(tableName);
- return other == null ? indexTableVsUserTable.get(tableName) : other;
- }
- public boolean isTableColocated(TableName table) {
- return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table);
- }
+public class IndexLoadBalancer extends StochasticLoadBalancer {
- /**
- * Populates table's region locations into co-location info from master.
- * @param table
- */
- public void populateRegionLocations(TableName table) {
- synchronized (this.colocationInfo) {
- if (!isTableColocated(table)) {
- throw new IllegalArgumentException("Specified table " + table
- + " should be in one of the tables to co-locate.");
- }
- RegionStates regionStates = this.master.getAssignmentManager().getRegionStates();
- List<HRegionInfo> onlineRegions = regionStates.getRegionsOfTable(table);
- for (HRegionInfo hri : onlineRegions) {
- regionOnline(hri, regionStates.getRegionServerOfRegion(hri));
- }
- Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
- for (RegionState regionState : regionsInTransition.values()) {
- if (table.equals(regionState.getRegion().getTable())
- && regionState.getServerName() != null) {
- regionOnline(regionState.getRegion(), regionState.getServerName());
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
index a014da2..2f83f8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
@@ -17,98 +17,12 @@
*/
package org.apache.phoenix.hbase.index.master;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
-import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
-import org.apache.phoenix.util.MetaDataUtil;
/**
* Defines of coprocessor hooks(to support secondary indexing) of operations on
* {@link org.apache.hadoop.hbase.master.HMaster} process.
*/
public class IndexMasterObserver extends BaseMasterObserver {
- IndexLoadBalancer balancer = null;
-
- @Override
- public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
- throws IOException {
- LoadBalancer loadBalancer =
- ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer();
- if (loadBalancer instanceof IndexLoadBalancer) {
- balancer = (IndexLoadBalancer) loadBalancer;
- }
- super.preMasterInitialization(ctx);
- }
-
- @Override
- public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
- HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
- TableName userTableName = null;
- if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
- userTableName =
- TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
- balancer.addTablesToColocate(userTableName, desc.getTableName());
- }
- if (userTableName != null) balancer.populateRegionLocations(userTableName);
- super.preCreateTableHandler(ctx, desc, regions);
- }
-
- @Override
- public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
- TableName tableName, HTableDescriptor htd) throws IOException {
- HTableDescriptor oldDesc =
- ctx.getEnvironment().getMasterServices().getTableDescriptors().get(tableName);
- if (oldDesc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) == null
- && htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
- TableName userTableName =
- TableName.valueOf(htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
- balancer.addTablesToColocate(userTableName, htd.getTableName());
- }
- super.preModifyTableHandler(ctx, tableName, htd);
- }
-
- @Override
- public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region,
- ServerName srcServer, ServerName destServer) throws IOException {
- if (balancer != null && balancer.isTableColocated(region.getTable())) {
- AssignmentManager am = ctx.getEnvironment().getMasterServices().getAssignmentManager();
- RegionStates regionStates = am.getRegionStates();
- String tableName = region.getTable().getNameAsString();
- String correspondingTable = MetaDataUtil.isLocalIndex(region.getTable().getNameAsString())
- ? MetaDataUtil.getUserTableName(tableName)
- : Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(tableName.getBytes()));
- List<HRegionInfo> regions =
- regionStates.getRegionsOfTable(TableName.valueOf(correspondingTable));
- for (HRegionInfo hri : regions) {
- if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) == 0
- && destServer != null) {
- balancer.regionOnline(hri, destServer);
- am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, destServer));
- am.unassign(hri);
- }
- }
- }
- super.postMove(ctx, region, srcServer, destServer);
- }
- @Override
- public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
- TableName tableName) throws IOException {
- if (balancer != null && balancer.isTableColocated(tableName)) {
- balancer.removeTablesFromColocation(tableName);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index d7fef5e..5e3f3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable {
void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
throws IndexWriteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 30797b2..cbcec3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -128,10 +128,11 @@ public class IndexWriter implements Stoppable {
* @param indexUpdates Updates to write
* @throws IOException
*/
- public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+ public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+ boolean allowLocalUpdates) throws IOException {
// convert the strings to htableinterfaces to which we can talk and group by TABLE
Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
- writeAndKillYourselfOnFailure(toWrite);
+ writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates);
}
/**
@@ -139,9 +140,10 @@ public class IndexWriter implements Stoppable {
* @param toWrite
* @throws IOException
*/
- public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+ public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+ boolean allowLocalUpdates) throws IOException {
try {
- write(toWrite);
+ write(toWrite, allowLocalUpdates);
if (LOG.isTraceEnabled()) {
LOG.trace("Done writing all index updates!\n\t" + toWrite);
}
@@ -165,21 +167,24 @@ public class IndexWriter implements Stoppable {
* @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
* stop early depends on the {@link IndexCommitter}.
*/
- public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
- write(resolveTableReferences(toWrite));
- }
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+ write(resolveTableReferences(toWrite), false);
+ }
+
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException {
+ write(resolveTableReferences(toWrite), allowLocalUpdates);
+ }
/**
* see {@link #write(Collection)}
* @param toWrite
* @throws IndexWriteException
*/
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws IndexWriteException {
- this.writer.write(toWrite);
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
+ throws IndexWriteException {
+ this.writer.write(toWrite, allowLocalUpdates);
}
-
/**
* Convert the passed index updates to {@link HTableInterfaceReference}s.
* @param indexUpdates from the index builder
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 233dc57..0dc11bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,8 +21,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -98,7 +100,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
}
@Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException {
/*
* This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
* parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
@@ -116,7 +118,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
// doing a complete copy over of all the index update for each table.
final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
final HTableInterfaceReference tableReference = entry.getKey();
- final RegionCoprocessorEnvironment env = this.env;
+ if (env != null
+ && !allowLocalUpdates
+ && tableReference.getTableName().equals(
+ env.getRegion().getTableDesc().getNameAsString())) {
+ continue;
+ }
/*
* Write a batch of index updates to an index table. This operation stops (is cancelable) via two
* mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
@@ -145,29 +152,14 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
}
try {
- // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
- // Also, checking the prefix of the table name to determine if this is a local
- // index is pretty hacky. If we're going to keep this, we should revisit that
- // as well.
- try {
- if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
- Region indexRegion = IndexUtil.getIndexRegion(env);
- if (indexRegion != null) {
- throwFailureIfDone();
- indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
- HConstants.NO_NONCE, HConstants.NO_NONCE);
- return null;
- }
- }
- } catch (IOException ignord) {
- // when it's failed we fall back to the standard & slow way
- if (LOG.isDebugEnabled()) {
- LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
- + ignord);
- }
- }
+ if (allowLocalUpdates) {
+ for (Mutation m : mutations) {
+ m.setDurability(Durability.SKIP_WAL);
+ }
+ }
HTableInterface table = factory.getTable(tableReference.get());
throwFailureIfDone();
+ int i = 0;
table.batch(mutations);
} catch (SingleIndexWriteFailureException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 14768ac..fec74ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -110,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
}
@Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException {
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
@@ -121,6 +122,12 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
// track each reference so we can get at it easily later, when determing failures
final HTableInterfaceReference tableReference = entry.getKey();
final RegionCoprocessorEnvironment env = this.env;
+ if (env != null
+ && !allowLocalUpdates
+ && tableReference.getTableName().equals(
+ env.getRegion().getTableDesc().getNameAsString())) {
+ continue;
+ }
tables.add(tableReference);
/*
@@ -144,33 +151,16 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
try {
// this may have been queued, but there was an abort/stop so we try to early exit
throwFailureIfDone();
+ if (allowLocalUpdates) {
+ for (Mutation m : mutations) {
+ m.setDurability(Durability.SKIP_WAL);
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
}
- try {
- // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
- // Also, checking the prefix of the table name to determine if this is a local
- // index is pretty hacky. If we're going to keep this, we should revisit that
- // as well.
- if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
- Region indexRegion = IndexUtil.getIndexRegion(env);
- if (indexRegion != null) {
- throwFailureIfDone();
- indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
- HConstants.NO_NONCE, HConstants.NO_NONCE);
- return Boolean.TRUE;
- }
- }
- } catch (IOException ignord) {
- // when it's failed we fall back to the standard & slow way
- if (LOG.isDebugEnabled()) {
- LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
- + ignord);
- }
- }
-
HTableInterface table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1725b11..6a316d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -274,6 +274,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
private Set<ColumnReference> indexedColumns;
private Set<ColumnReference> coveredColumns;
+ private Map<ColumnReference, ColumnReference> coveredColumnsMap;
// columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key)
private Set<ColumnReference> allColumns;
// TODO remove this in the next major release
@@ -363,6 +364,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
+ this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -430,6 +432,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
for (PColumn indexColumn : family.getColumns()) {
PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
+ if(isLocalIndex) {
+ this.coveredColumnsMap.put(
+ new ColumnReference(column.getFamilyName().getBytes(), column.getName()
+ .getBytes()),
+ new ColumnReference(isLocalIndex ? Bytes.toBytes(IndexUtil
+ .getLocalIndexColumnFamily(column.getFamilyName().getString()))
+ : column.getFamilyName().getBytes(), column.getName().getBytes()));
+ }
}
}
this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -861,7 +871,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
//this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
- put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
+ if(this.isLocalIndex) {
+ ColumnReference columnReference = this.coveredColumnsMap.get(ref);
+ put.add(kvBuilder.buildPut(rowKey, columnReference.getFamilyWritable(), cq, ts, value));
+ } else {
+ put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
+ }
}
}
return put;
@@ -949,11 +964,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// If table delete was single version, then index delete should be as well
if (deleteType == DeleteType.SINGLE_VERSION) {
for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+ if(this.isLocalIndex) {
+ ref = this.coveredColumnsMap.get(ref);
+ }
delete.deleteFamilyVersion(ref.getFamily(), ts);
}
delete.deleteFamilyVersion(emptyCF, ts);
} else {
for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+ if(this.isLocalIndex) {
+ ref = this.coveredColumnsMap.get(ref);
+ }
delete.deleteFamily(ref.getFamily(), ts);
}
delete.deleteFamily(emptyCF, ts);
@@ -971,11 +992,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
delete = new Delete(indexRowKey);
delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
+ ColumnReference columnReference = ref;
+ if(this.isLocalIndex) {
+ columnReference = this.coveredColumnsMap.get(ref);
+ }
// If point delete for data table, then use point delete for index as well
if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
- delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ delete.deleteColumn(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
} else {
- delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ delete.deleteColumns(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
}
}
}
@@ -1030,10 +1055,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
+ coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
for (int i = 0; i < nCoveredColumns; i++) {
byte[] cf = Bytes.readByteArray(input);
byte[] cq = Bytes.readByteArray(input);
- coveredColumns.add(new ColumnReference(cf,cq));
+ ColumnReference ref = new ColumnReference(cf,cq);
+ coveredColumns.add(ref);
+ if(isLocalIndex) {
+ coveredColumnsMap.put(ref, new ColumnReference(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))), cq));
+ }
}
// Hack to serialize whether the index row key is optimizable
int len = WritableUtils.readVInt(input);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 8ad4d3e..9d2955b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -71,7 +71,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.getIndexTableName());
+ indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
+ .getTableDesc().getName() : maintainer.getIndexTableName());
Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
indexUpdate.setUpdate(put);
@@ -95,7 +96,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
- indexUpdate.setTable(maintainer.getIndexTableName());
+ indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
+ .getTableDesc().getName() : maintainer.getIndexTableName());
Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
indexUpdate.setUpdate(delete);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 17da04e..d7850ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -138,7 +138,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
}
// its a local index table, so we need to convert it to the index table names we should disable
- if (MetaDataUtil.isLocalIndex(ref.getTableName())) {
+ if (MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) {
for (String tableName : getLocalIndexNames(ref, mutations)) {
indexTableNames.put(tableName, minTimeStamp);
}
@@ -224,8 +224,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
try {
conn = QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap(
PhoenixConnection.class);
- String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
- PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+ PTable dataTable = PhoenixRuntime.getTable(conn, ref.getTableName());
List<PTable> indexes = dataTable.getIndexes();
// local index used to get view id from index mutation row key.
PTable localIndex = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5d8879c..3d8124c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -62,6 +63,7 @@ import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
@@ -69,6 +71,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -165,7 +168,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// no index updates, so we are done
if (!indexUpdates.isEmpty()) {
- this.writer.write(indexUpdates);
+ this.writer.write(indexUpdates, true);
}
} catch (Throwable t) {
String msg = "Failed to update index with entries:" + indexUpdates;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 7f403b0..3e0fd99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -176,7 +176,7 @@ public abstract class ExplainTable {
}
}
- private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) {
+ private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex, boolean changeViewIndexId) {
if (Boolean.TRUE.equals(isNull)) {
buf.append("null");
return;
@@ -198,8 +198,14 @@ public abstract class ExplainTable {
type.coerceBytes(ptr, type, sortOrder, SortOrder.getDefault());
range = ptr.get();
}
- Format formatter = context.getConnection().getFormatter(type);
- buf.append(type.toStringLiteral(range, formatter));
+ if (changeViewIndexId) {
+ Short s = (Short) type.toObject(range);
+ s = (short) (s + (-Short.MAX_VALUE));
+ buf.append(s.toString());
+ } else {
+ Format formatter = context.getConnection().getFormatter(type);
+ buf.append(type.toStringLiteral(range, formatter));
+ }
}
private static class RowKeyValueIterator implements Iterator<byte[]> {
@@ -257,6 +263,7 @@ public abstract class ExplainTable {
minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
}
}
+ boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());
boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN);
int nRanges = forceSkipScan ? scanRanges.getRanges().size() : scanRanges.getBoundSlotCount();
for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) {
@@ -275,7 +282,13 @@ public abstract class ExplainTable {
minMaxIterator = Iterators.emptyIterator();
}
}
- appendPKColumnValue(buf, b, isNull, i);
+ if (isLocalIndex
+ && ((context.getConnection().getTenantId() != null && i == 1) || (context
+ .getConnection().getTenantId() == null && i == 0))) {
+ appendPKColumnValue(buf, b, isNull, i, true);
+ } else {
+ appendPKColumnValue(buf, b, isNull, i, false);
+ }
buf.append(',');
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 0525de9..41c39a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -395,13 +395,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
for(PTable indexTable : table.getIndexes()){
- if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
- indexTables.add(new TargetTableRef(indexTable.getName().getString(),
- Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getPhysicalName().getBytes()))));
- } else {
- indexTables.add(
- new TargetTableRef(indexTable.getName().getString(), indexTable.getPhysicalName().getString()));
- }
+ indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable
+ .getPhysicalName().getString()));
}
return indexTables;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 6743688..c93a58b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -206,8 +206,7 @@ public class IndexTool extends Configured implements Tool {
// computed from the qDataTable name.
String physicalIndexTable = pindexTable.getPhysicalName().getString();
if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
- physicalIndexTable = Bytes
- .toString(MetaDataUtil.getLocalIndexPhysicalName(pdataTable.getPhysicalName().getBytes()));
+ physicalIndexTable = qDataTable;
}
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f593dd0..c29c0bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -850,18 +849,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
null, priority, null);
}
- if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
- && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor
- .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
- if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
- descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
- null, priority, null);
- }
- } else {
- if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
- && !SchemaUtil.isMetaTable(tableName)
- && !SchemaUtil.isSequenceTable(tableName)) {
- descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null);
+ Set<byte[]> familiesKeys = descriptor.getFamiliesKeys();
+ for(byte[] family: familiesKeys) {
+ if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+ if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
+ descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
+ null, priority, null);
+ break;
+ }
}
}
@@ -1071,8 +1066,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
if (isMetaTable) {
checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
+ } else {
+ for(Pair<byte[],Map<String,Object>> family: families) {
+ if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals(
+ IndexRegionSplitPolicy.class.getName()))
+ && Bytes.toString(family.getFirst()).startsWith(
+ QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+ newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
+ break;
+ }
+ }
}
+
if (!modifyExistingMetaData) {
return existingDesc; // Caller already knows that no metadata was changed
}
@@ -1303,60 +1309,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps,
- List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, long timestamp,
- boolean isNamespaceMapped) throws SQLException {
- PTable table;
- String parentTableName = SchemaUtil
- .getParentTableNameFromIndexTable(Bytes.toString(physicalTableName),
- MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)
- .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
- try {
- synchronized (latestMetaDataLock) {
- throwConnectionClosedIfNullMetaData();
- table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable();
- latestMetaDataLock.notifyAll();
- }
- if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
- throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
- }
- } catch (TableNotFoundException e) {
- byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(parentTableName));
- byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(parentTableName));
- MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp);
- table = result.getTable();
- if (table == null) {
- throw e;
- }
- }
- ensureLocalIndexTableCreated(physicalTableName, tableProps, families, splits, isNamespaceMapped);
- }
-
- private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps,
- List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped)
- throws SQLException, TableAlreadyExistsException {
-
- // If we're not allowing local indexes or the hbase version is too low,
- // don't create the local index table
- if ( !this.getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX)
- || !this.supportsFeature(Feature.LOCAL_INDEX)) {
- return;
- }
-
- tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING);
- HTableDescriptor desc = ensureTableCreated(physicalTableName, PTableType.TABLE, tableProps, families, splits,
- true, isNamespaceMapped);
- if (desc != null) {
- if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
- String fullTableName = Bytes.toString(physicalTableName);
- throw new TableAlreadyExistsException(
- "Unable to create shared physical table for local indexes.",
- SchemaUtil.getSchemaNameFromFullName(fullTableName),
- SchemaUtil.getTableNameFromFullName(fullTableName));
- }
- }
- }
-
private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
HTableDescriptor desc = null;
@@ -1385,22 +1337,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
- byte[] physicalIndexName = MetaDataUtil.getLocalIndexPhysicalName(physicalTableName);
HTableDescriptor desc = null;
boolean wasDeleted = false;
try (HBaseAdmin admin = getAdmin()) {
try {
- desc = admin.getTableDescriptor(physicalIndexName);
- if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
- this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
- final ReadOnlyProps props = this.getProps();
- final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
- if (dropMetadata) {
- admin.disableTable(physicalIndexName);
- admin.deleteTable(physicalIndexName);
- clearTableRegionCache(physicalIndexName);
- wasDeleted = true;
+ desc = admin.getTableDescriptor(physicalTableName);
+ this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalTableName));
+ final ReadOnlyProps props = this.getProps();
+ final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+ if (dropMetadata) {
+ List<String> columnFamiles = new ArrayList<String>();
+ for(HColumnDescriptor cf : desc.getColumnFamilies()) {
+ if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+ columnFamiles.add(cf.getNameAsString());
+ }
}
+ for(String cf: columnFamiles) {
+ admin.deleteColumn(physicalTableName, cf);
+ }
+ clearTableRegionCache(physicalTableName);
+ wasDeleted = true;
}
} catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
// Ignore, as we may never have created a view index table
@@ -1424,9 +1380,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
- boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
-
- if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
+ boolean localIndexTable = false;
+ for(Pair<byte[], Map<String, Object>> family: families) {
+ if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+ localIndexTable = true;
+ break;
+ }
+ }
+ if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped);
@@ -1436,10 +1397,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Physical index table created up front for multi tenant
// TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it
if (physicalTableName != null) {
- if (localIndexTable) {
- ensureLocalIndexTableCreated(tableName, tableProps, families, splits,
- MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
- } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
+ if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes),
physicalTableName, MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
}
@@ -1561,6 +1519,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
dropTables(result.getTableNamesToDelete());
}
invalidateTables(result.getTableNamesToDelete());
+ long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
if (tableType == PTableType.TABLE) {
boolean isNamespaceMapped = result.getTable().isNamespaceMapped();
byte[] physicalName;
@@ -1569,7 +1528,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
physicalName = TableName.valueOf(schemaBytes, tableBytes).getName();
}
- long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
ensureViewIndexTableDropped(physicalName, timestamp);
ensureLocalIndexTableDropped(physicalName, timestamp);
tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
@@ -2479,6 +2437,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+ Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+ props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+ PhoenixConnection conn =
+ new PhoenixConnection(ConnectionQueryServicesImpl.this,
+ metaConnection.getURL(), props, metaConnection
+ .getMetaDataCache());
+ try {
+ UpgradeUtil.upgradeLocalIndexes(conn, true);
+ } finally {
+ if (conn != null) conn.close();
+ }
+
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
@@ -3622,7 +3593,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled;
}
-
@Override
public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
/*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 4efb708..91c84e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -166,6 +166,7 @@ public interface QueryConstants {
public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
public static final byte[] TRUE = new byte[] {1};
+
/**
* Separator used between variable length keys for a composite key.
@@ -195,6 +196,16 @@ public interface QueryConstants {
public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
DEFAULT_COLUMN_FAMILY_BYTES);
+ public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#";
+ public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+ public static final ImmutableBytesPtr LOCAL_INDEX_COLUMN_FAMILY_PREFIX_PTR = new ImmutableBytesPtr(
+ LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES);
+
+ public static final String DEFAULT_LOCAL_INDEX_COLUMN_FAMILY = LOCAL_INDEX_COLUMN_FAMILY_PREFIX + DEFAULT_COLUMN_FAMILY;
+ public static final byte[] DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_LOCAL_INDEX_COLUMN_FAMILY);
+ public static final ImmutableBytesPtr DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
+ DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
+
public static final String ALL_FAMILY_PROPERTIES_KEY = "";
public static final String SYSTEM_TABLE_PK_NAME = "pk";