Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/05/24 15:29:04 UTC

[2/5] phoenix git commit: PHOENIX-1734 Local index improvements (Rajeshbabu)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
index 146028e..3353f9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
@@ -17,677 +17,8 @@
  */
 package org.apache.phoenix.hbase.index.balancer;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * <p>This class is an extension of the load balancer class. 
- * It allows to co-locate the regions of the user table and the regions of corresponding
- * index table if any.</p> 
- * 
- * </>roundRobinAssignment, retainAssignment -> index regions will follow the actual table regions. 
- * randomAssignment, balancerCluster -> either index table or actual table region(s) will follow
- * each other based on which ever comes first.</p> 
- * 
- * <p>In case of master failover there is a chance that the znodes of the index
- * table and actual table are left behind. Then in that scenario we may get randomAssignment for
- * either the actual table region first or the index table region first.</p>
- * 
- * <p>In case of balancing by table any table can balance first.</p>
- * 
- */
-
-public class IndexLoadBalancer implements LoadBalancer {
-
-    private static final Log LOG = LogFactory.getLog(IndexLoadBalancer.class);
-
-    public static final byte[] PARENT_TABLE_KEY = Bytes.toBytes("PARENT_TABLE");
-
-    public static final String INDEX_BALANCER_DELEGATOR = "hbase.index.balancer.delegator.class";
-
-    private LoadBalancer delegator;
-
-    private MasterServices master;
-
-    private Configuration conf;
-
-    private ClusterStatus clusterStatus;
-
-    private static final Random RANDOM = new Random(EnvironmentEdgeManager.currentTimeMillis());
-
-    Map<TableName, TableName> userTableVsIndexTable = new HashMap<TableName, TableName>();
-
-    Map<TableName, TableName> indexTableVsUserTable = new HashMap<TableName, TableName>();
-
-    /**
-     * Maintains colocation information of user regions and corresponding index regions.
-     */
-    private Map<TableName, Map<ImmutableBytesWritable, ServerName>> colocationInfo =
-            new ConcurrentHashMap<TableName, Map<ImmutableBytesWritable, ServerName>>();
-
-    private Set<TableName> balancedTables = new HashSet<TableName>();
-
-    private boolean stopped = false;
-
-    @Override
-    public void initialize() throws HBaseIOException {
-        Class<? extends LoadBalancer> delegatorKlass =
-                conf.getClass(INDEX_BALANCER_DELEGATOR, StochasticLoadBalancer.class,
-                    LoadBalancer.class);
-        this.delegator = ReflectionUtils.newInstance(delegatorKlass, conf);
-        this.delegator.setClusterStatus(clusterStatus);
-        this.delegator.setMasterServices(this.master);
-        this.delegator.initialize();
-        try {
-            populateTablesToColocate(this.master.getTableDescriptors().getAll());
-        } catch (IOException e) {
-            throw new HBaseIOException(e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration configuration) {
-        this.conf = configuration;
-    }
-
-    @Override
-    public void onConfigurationChange(Configuration conf) {
-        setConf(conf);
-    }
-
-    @Override
-    public void setClusterStatus(ClusterStatus st) {
-        this.clusterStatus = st;
-    }
-
-    public Map<TableName, Map<ImmutableBytesWritable, ServerName>> getColocationInfo() {
-        return colocationInfo;
-    }
-
-    @Override
-    public void setMasterServices(MasterServices masterServices) {
-        this.master = masterServices;
-    }
-
-    @Override
-    public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
-            throws HBaseIOException {
-        synchronized (this.colocationInfo) {
-            boolean balanceByTable = conf.getBoolean("hbase.master.loadbalance.bytable", false);
-            List<RegionPlan> regionPlans = null;
-
-            TableName tableName = null;
-            if (balanceByTable) {
-                Map<ImmutableBytesWritable, ServerName> tableKeys = null;
-                for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
-                        .entrySet()) {
-                    ServerName sn = serverVsRegionList.getKey();
-                    List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
-                    if (regionInfos.isEmpty()) {
-                        continue;
-                    }
-                    if (!isTableColocated(regionInfos.get(0).getTable())) {
-                        return this.delegator.balanceCluster(clusterState);
-                    }
-                    // Just get the table name from any one of the values in the regioninfo list
-                    if (tableName == null) {
-                        tableName = regionInfos.get(0).getTable();
-                        tableKeys = this.colocationInfo.get(tableName);
-                    }
-                    // Check and modify the colocation info map based on values of cluster state
-                    // because we
-                    // will
-                    // call balancer only when the cluster is in stable and reliable state.
-                    if (tableKeys != null) {
-                        for (HRegionInfo hri : regionInfos) {
-                            updateServer(tableKeys, sn, hri);
-                        }
-                    }
-                }
-                // If user table is already balanced find the index table plans from the user table
-                // plans
-                // or vice verca.
-                TableName mappedTableName = getMappedTableToColocate(tableName);
-                if (balancedTables.contains(mappedTableName)) {
-                    balancedTables.remove(mappedTableName);
-                    regionPlans = new ArrayList<RegionPlan>();
-                    return prepareRegionPlansForClusterState(clusterState, regionPlans);
-                } else {
-                    balancedTables.add(tableName);
-                    regionPlans = this.delegator.balanceCluster(clusterState);
-                    if (regionPlans == null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(tableName + " regions already balanced.");
-                        }
-                        return null;
-                    } else {
-                        updateRegionPlans(regionPlans);
-                        return regionPlans;
-                    }
-                }
-
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Seperating user tables and index tables regions of "
-                            + "each region server in the cluster.");
-                }
-                Map<ServerName, List<HRegionInfo>> userClusterState =
-                        new HashMap<ServerName, List<HRegionInfo>>();
-                Map<ServerName, List<HRegionInfo>> indexClusterState =
-                        new HashMap<ServerName, List<HRegionInfo>>();
-                for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState
-                        .entrySet()) {
-                    ServerName sn = serverVsRegionList.getKey();
-                    List<HRegionInfo> regionsInfos = serverVsRegionList.getValue();
-                    List<HRegionInfo> idxRegionsToBeMoved = new ArrayList<HRegionInfo>();
-                    List<HRegionInfo> userRegionsToBeMoved = new ArrayList<HRegionInfo>();
-                    for (HRegionInfo hri : regionsInfos) {
-                        if (hri.isMetaRegion()) {
-                            continue;
-                        }
-                        tableName = hri.getTable();
-                        // Check and modify the colocation info map based on values of cluster state
-                        // because we
-                        // will
-                        // call balancer only when the cluster is in stable and reliable state.
-                        if (isTableColocated(tableName)) {
-                            // table name may change every time thats why always need to get table
-                            // entries.
-                            Map<ImmutableBytesWritable, ServerName> tableKeys =
-                                    this.colocationInfo.get(tableName);
-                            if (tableKeys != null) {
-                                updateServer(tableKeys, sn, hri);
-                            }
-                        }
-                        if (indexTableVsUserTable.containsKey(tableName)) {
-                            idxRegionsToBeMoved.add(hri);
-                            continue;
-                        }
-                        userRegionsToBeMoved.add(hri);
-                    }
-                    // there may be dummy entries here if assignments by table is set
-                    userClusterState.put(sn, userRegionsToBeMoved);
-                    indexClusterState.put(sn, idxRegionsToBeMoved);
-                }
-
-                regionPlans = this.delegator.balanceCluster(userClusterState);
-                if (regionPlans == null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("User region plan is null.");
-                    }
-                    regionPlans = new ArrayList<RegionPlan>();
-                } else {
-                    updateRegionPlans(regionPlans);
-                }
-                return prepareRegionPlansForClusterState(indexClusterState, regionPlans);
-            }
-        }
-    }
-
-    private void updateServer(Map<ImmutableBytesWritable, ServerName> tableKeys, ServerName sn,
-            HRegionInfo hri) {
-        ImmutableBytesWritable startKey = new ImmutableBytesWritable(hri.getStartKey());
-        ServerName existingServer = tableKeys.get(startKey);
-        if (!sn.equals(existingServer)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("There is a mismatch in the existing server name for the region " + hri
-                        + ".  Replacing the server " + existingServer + " with " + sn + ".");
-            }
-            tableKeys.put(startKey, sn);
-        }
-    }
-
-    /**
-     * Prepare region plans for cluster state
-     * @param clusterState if balancing is table wise then cluster state contains only indexed or
-     *            index table regions, otherwise it contains all index tables regions.
-     * @param regionPlans
-     * @return
-     */
-    private List<RegionPlan> prepareRegionPlansForClusterState(
-            Map<ServerName, List<HRegionInfo>> clusterState, List<RegionPlan> regionPlans) {
-        if (regionPlans == null) regionPlans = new ArrayList<RegionPlan>();
-        ImmutableBytesWritable startKey = new ImmutableBytesWritable();
-        for (Entry<ServerName, List<HRegionInfo>> serverVsRegionList : clusterState.entrySet()) {
-            List<HRegionInfo> regionInfos = serverVsRegionList.getValue();
-            ServerName server = serverVsRegionList.getKey();
-            for (HRegionInfo regionInfo : regionInfos) {
-                if (!isTableColocated(regionInfo.getTable())) continue;
-                TableName mappedTableName = getMappedTableToColocate(regionInfo.getTable());
-                startKey.set(regionInfo.getStartKey());
-                ServerName sn = this.colocationInfo.get(mappedTableName).get(startKey);
-                if (sn.equals(server)) {
-                    continue;
-                } else {
-                    RegionPlan rp = new RegionPlan(regionInfo, server, sn);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Selected server " + rp.getDestination()
-                                + " as destination for region "
-                                + regionInfo.getRegionNameAsString() + " from colocation info.");
-                    }
-                    regionOnline(regionInfo, rp.getDestination());
-                    regionPlans.add(rp);
-                }
-            }
-        }
-        return regionPlans;
-    }
-
-    private void updateRegionPlans(List<RegionPlan> regionPlans) {
-        for (RegionPlan regionPlan : regionPlans) {
-            HRegionInfo hri = regionPlan.getRegionInfo();
-            if (!isTableColocated(hri.getTable())) continue;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Saving region plan of region " + hri.getRegionNameAsString() + '.');
-            }
-            regionOnline(hri, regionPlan.getDestination());
-        }
-    }
-
-    @Override
-    public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(List<HRegionInfo> regions,
-            List<ServerName> servers) throws HBaseIOException {
-        List<HRegionInfo> userRegions = new ArrayList<HRegionInfo>();
-        List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
-        for (HRegionInfo hri : regions) {
-            seperateUserAndIndexRegion(hri, userRegions, indexRegions);
-        }
-        Map<ServerName, List<HRegionInfo>> bulkPlan = null;
-        if (!userRegions.isEmpty()) {
-            bulkPlan = this.delegator.roundRobinAssignment(userRegions, servers);
-            // This should not happen.
-            if (null == bulkPlan) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No region plans selected for user regions in roundRobinAssignment.");
-                }
-                return null;
-            }
-            savePlan(bulkPlan);
-        }
-        bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
-        return bulkPlan;
-    }
-
-    private void seperateUserAndIndexRegion(HRegionInfo hri, List<HRegionInfo> userRegions,
-            List<HRegionInfo> indexRegions) {
-        if (indexTableVsUserTable.containsKey(hri.getTable())) {
-            indexRegions.add(hri);
-            return;
-        }
-        userRegions.add(hri);
-    }
-
-    private Map<ServerName, List<HRegionInfo>> prepareIndexRegionsPlan(
-            List<HRegionInfo> indexRegions, Map<ServerName, List<HRegionInfo>> bulkPlan,
-            List<ServerName> servers) throws HBaseIOException {
-        if (null != indexRegions && !indexRegions.isEmpty()) {
-            if (null == bulkPlan) {
-                bulkPlan = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
-            }
-            for (HRegionInfo hri : indexRegions) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Preparing region plan for index region "
-                            + hri.getRegionNameAsString() + '.');
-                }
-                ServerName destServer = getDestServerForIdxRegion(hri);
-                List<HRegionInfo> destServerRegions = null;
-                if (destServer == null) destServer = this.randomAssignment(hri, servers);
-                if (destServer != null) {
-                    destServerRegions = bulkPlan.get(destServer);
-                    if (null == destServerRegions) {
-                        destServerRegions = new ArrayList<HRegionInfo>();
-                        bulkPlan.put(destServer, destServerRegions);
-                    }
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Server " + destServer + " selected for region "
-                                + hri.getRegionNameAsString() + '.');
-                    }
-                    destServerRegions.add(hri);
-                    regionOnline(hri, destServer);
-                }
-            }
-        }
-        return bulkPlan;
-    }
-
-    private ServerName getDestServerForIdxRegion(HRegionInfo hri) {
-        // Every time we calculate the table name because in case of master restart the index
-        // regions
-        // may be coming for different index tables.
-        TableName actualTable = getMappedTableToColocate(hri.getTable());
-        ImmutableBytesWritable startkey = new ImmutableBytesWritable(hri.getStartKey());
-        synchronized (this.colocationInfo) {
-
-            Map<ImmutableBytesWritable, ServerName> tableKeys = colocationInfo.get(actualTable);
-            if (null == tableKeys) {
-                // Can this case come
-                return null;
-            }
-            if (tableKeys.containsKey(startkey)) {
-                // put index region location if corresponding user region found in regionLocation
-                // map.
-                ServerName sn = tableKeys.get(startkey);
-                regionOnline(hri, sn);
-                return sn;
-            }
-        }
-        return null;
-    }
-
-    private void savePlan(Map<ServerName, List<HRegionInfo>> bulkPlan) {
-        synchronized (this.colocationInfo) {
-            for (Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Saving user regions' plans for server " + e.getKey() + '.');
-                }
-                for (HRegionInfo hri : e.getValue()) {
-                    if (!isTableColocated(hri.getTable())) continue;
-                    regionOnline(hri, e.getKey());
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Saved user regions' plans for server " + e.getKey() + '.');
-                }
-            }
-        }
-    }
-
-    @Override
-    public Map<ServerName, List<HRegionInfo>> retainAssignment(
-            Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
-        Map<HRegionInfo, ServerName> userRegionsMap =
-                new ConcurrentHashMap<HRegionInfo, ServerName>();
-        List<HRegionInfo> indexRegions = new ArrayList<HRegionInfo>();
-        for (Entry<HRegionInfo, ServerName> e : regions.entrySet()) {
-            seperateUserAndIndexRegion(e, userRegionsMap, indexRegions, servers);
-        }
-        Map<ServerName, List<HRegionInfo>> bulkPlan = null;
-        if (!userRegionsMap.isEmpty()) {
-            bulkPlan = this.delegator.retainAssignment(userRegionsMap, servers);
-            if (bulkPlan == null) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Empty region plan for user regions.");
-                }
-                return null;
-            }
-            savePlan(bulkPlan);
-        }
-        bulkPlan = prepareIndexRegionsPlan(indexRegions, bulkPlan, servers);
-        return bulkPlan;
-    }
-
-    private void seperateUserAndIndexRegion(Entry<HRegionInfo, ServerName> e,
-            Map<HRegionInfo, ServerName> userRegionsMap, List<HRegionInfo> indexRegions,
-            List<ServerName> servers) {
-        HRegionInfo hri = e.getKey();
-        if (indexTableVsUserTable.containsKey(hri.getTable())) {
-            indexRegions.add(hri);
-            return;
-        }
-        if (e.getValue() == null) {
-            userRegionsMap.put(hri, servers.get(RANDOM.nextInt(servers.size())));
-        } else {
-            userRegionsMap.put(hri, e.getValue());
-        }
-    }
-
-    @Override
-    public Map<HRegionInfo, ServerName> immediateAssignment(List<HRegionInfo> regions,
-            List<ServerName> servers) throws HBaseIOException {
-        return this.delegator.immediateAssignment(regions, servers);
-    }
-
-    @Override
-    public ServerName randomAssignment(HRegionInfo regionInfo, List<ServerName> servers)
-            throws HBaseIOException {
-        if (!isTableColocated(regionInfo.getTable())) {
-            return this.delegator.randomAssignment(regionInfo, servers);
-        }
-        ServerName sn = getServerNameFromMap(regionInfo, servers);
-        if (sn == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No server found for region " + regionInfo.getRegionNameAsString() + '.');
-            }
-            sn = getRandomServer(regionInfo, servers);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Destination server for region " + regionInfo.getRegionNameAsString()
-                    + " is " + ((sn == null) ? "null" : sn.toString()) + '.');
-        }
-        return sn;
-    }
-
-    private ServerName getRandomServer(HRegionInfo regionInfo, List<ServerName> servers)
-            throws HBaseIOException {
-        ServerName sn = null;
-        sn = this.delegator.randomAssignment(regionInfo, servers);
-        if (sn == null) return null;
-        regionOnline(regionInfo, sn);
-        return sn;
-    }
-
-    private ServerName getServerNameFromMap(HRegionInfo regionInfo, List<ServerName> onlineServers) {
-        TableName tableName = regionInfo.getTable();
-        TableName mappedTable = getMappedTableToColocate(regionInfo.getTable());
-        ImmutableBytesWritable startKey = new ImmutableBytesWritable(regionInfo.getStartKey());
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> correspondingTableKeys =
-                    this.colocationInfo.get(mappedTable);
-            Map<ImmutableBytesWritable, ServerName> actualTableKeys =
-                    this.colocationInfo.get(tableName);
-
-            if (null != correspondingTableKeys) {
-                if (correspondingTableKeys.containsKey(startKey)) {
-                    ServerName previousServer = null;
-                    if (null != actualTableKeys) {
-                        previousServer = actualTableKeys.get(startKey);
-                    }
-                    ServerName sn = correspondingTableKeys.get(startKey);
-                    if (null != previousServer) {
-                        // if servername of index region and user region are same in colocationInfo
-                        // clean
-                        // previous plans and return null
-                        if (previousServer.equals(sn)) {
-                            correspondingTableKeys.remove(startKey);
-                            actualTableKeys.remove(startKey);
-                            if (LOG.isDebugEnabled()) {
-                                LOG
-                                        .debug("Both user region plan and corresponding index region plan "
-                                                + "in colocation info are same. Hence clearing the plans to select new plan"
-                                                + " for the region "
-                                                + regionInfo.getRegionNameAsString() + ".");
-                            }
-                            return null;
-                        }
-                    }
-                    if (sn != null && onlineServers.contains(sn)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Updating the region plan of the region "
-                                    + regionInfo.getRegionNameAsString() + " with server " + sn);
-                        }
-                        regionOnline(regionInfo, sn);
-                        return sn;
-                    } else if (sn != null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("The location " + sn + " of region with start key"
-                                    + Bytes.toStringBinary(regionInfo.getStartKey())
-                                    + " is not in online. Selecting other region server.");
-                        }
-                        return null;
-                    }
-                }
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No region plans in colocationInfo for table " + mappedTable);
-                }
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
-        TableName tableName = regionInfo.getTable();
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> tabkeKeys = this.colocationInfo.get(tableName);
-            if (tabkeKeys == null) {
-                tabkeKeys = new ConcurrentHashMap<ImmutableBytesWritable, ServerName>();
-                this.colocationInfo.put(tableName, tabkeKeys);
-            }
-            tabkeKeys.put(new ImmutableBytesWritable(regionInfo.getStartKey()), sn);
-        }
-    }
-
-    public void clearTableRegionPlans(TableName tableName) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Clearing regions plans from colocationInfo for table " + tableName);
-        }
-        synchronized (this.colocationInfo) {
-            this.colocationInfo.remove(tableName);
-        }
-    }
-
-    @Override
-    public void regionOffline(HRegionInfo regionInfo) {
-        TableName tableName = regionInfo.getTable();
-        synchronized (this.colocationInfo) {
-            Map<ImmutableBytesWritable, ServerName> tableKeys = this.colocationInfo.get(tableName);
-            if (null == tableKeys) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No regions of table " + tableName + " in the colocationInfo.");
-                }
-            } else {
-                tableKeys.remove(new ImmutableBytesWritable(regionInfo.getStartKey()));
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("The regioninfo " + regionInfo + " removed from the colocationInfo");
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean isStopped() {
-        return stopped;
-    }
-
-    @Override
-    public void stop(String why) {
-        LOG.info("Load Balancer stop requested: " + why);
-        stopped = true;
-    }
-
-    public void populateTablesToColocate(Map<String, HTableDescriptor> tableDescriptors) {
-        HTableDescriptor desc = null;
-        for (Entry<String, HTableDescriptor> entry : tableDescriptors.entrySet()) {
-            desc = entry.getValue();
-            if (desc.getValue(PARENT_TABLE_KEY) != null) {
-                addTablesToColocate(TableName.valueOf(desc.getValue(PARENT_TABLE_KEY)), desc
-                        .getTableName());
-            }
-        }
-    }
-
-    /**
-     * Add tables whose regions to co-locate.
-     * @param userTable
-     * @param indexTable
-     */
-    public void addTablesToColocate(TableName userTable, TableName indexTable) {
-        if (userTable.equals(indexTable)) {
-            throw new IllegalArgumentException("Tables to colocate should not be same.");
-        } else if (isTableColocated(userTable)) {
-            throw new IllegalArgumentException("User table already colocated with table "
-                    + getMappedTableToColocate(userTable));
-        } else if (isTableColocated(indexTable)) {
-            throw new IllegalArgumentException("Index table is already colocated with table "
-                    + getMappedTableToColocate(indexTable));
-        }
-        userTableVsIndexTable.put(userTable, indexTable);
-        indexTableVsUserTable.put(indexTable, userTable);
-    }
-
-    /**
-     * Removes the specified table and corresponding table from co-location.
-     * @param table
-     */
-    public void removeTablesFromColocation(TableName table) {
-        TableName other = userTableVsIndexTable.remove(table);
-        if (other != null) {
-            indexTableVsUserTable.remove(other);
-        } else {
-            other = indexTableVsUserTable.remove(table);
-            if (other != null) userTableVsIndexTable.remove(other);
-        }
-    }
-
-    /**
-     * Return mapped table to co-locate.
-     * @param tableName
-     * @return index table if the specified table is user table or vice versa.
-     */
-    public TableName getMappedTableToColocate(TableName tableName) {
-        TableName other = userTableVsIndexTable.get(tableName);
-        return other == null ? indexTableVsUserTable.get(tableName) : other;
-    }
 
-    public boolean isTableColocated(TableName table) {
-        return userTableVsIndexTable.containsKey(table) || indexTableVsUserTable.containsKey(table);
-    }
+public class IndexLoadBalancer extends StochasticLoadBalancer {
 
-    /**
-     * Populates table's region locations into co-location info from master.
-     * @param table
-     */
-    public void populateRegionLocations(TableName table) {
-        synchronized (this.colocationInfo) {
-            if (!isTableColocated(table)) {
-                throw new IllegalArgumentException("Specified table " + table
-                        + " should be in one of the tables to co-locate.");
-            }
-            RegionStates regionStates = this.master.getAssignmentManager().getRegionStates();
-            List<HRegionInfo> onlineRegions = regionStates.getRegionsOfTable(table);
-            for (HRegionInfo hri : onlineRegions) {
-                regionOnline(hri, regionStates.getRegionServerOfRegion(hri));
-            }
-            Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
-            for (RegionState regionState : regionsInTransition.values()) {
-                if (table.equals(regionState.getRegion().getTable())
-                        && regionState.getServerName() != null) {
-                    regionOnline(regionState.getRegion(), regionState.getServerName());
-                }
-            }
-        }
-    }
 }
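
For readers skimming the patch: the colocation machinery above is dropped because the rest of this change routes local index writes into column families of the data table itself (see the IndexMaintainer and PhoenixIndexCodec hunks below), so index regions no longer exist as separate regions that must be pinned next to their data regions. The class is presumably retained only so that existing hbase.master.loadbalancer.class settings that name IndexLoadBalancer keep working. A commented sketch of what remains (the comments are added here for context and are not part of the patch):

    import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;

    // No custom assignment or balancing logic is needed any more: local index
    // rows live inside the data table's own regions, so they move with them.
    public class IndexLoadBalancer extends StochasticLoadBalancer {
    }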

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
index a014da2..2f83f8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java
@@ -17,98 +17,12 @@
  */
 package org.apache.phoenix.hbase.index.master;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
-import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.master.AssignmentManager;
-import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.RegionStates;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
-import org.apache.phoenix.util.MetaDataUtil;
 
 /**
  * Defines of coprocessor hooks(to support secondary indexing) of operations on
  * {@link org.apache.hadoop.hbase.master.HMaster} process.
  */
 public class IndexMasterObserver extends BaseMasterObserver {
-    IndexLoadBalancer balancer = null;
-
-    @Override
-    public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> ctx)
-            throws IOException {
-        LoadBalancer loadBalancer =
-                ctx.getEnvironment().getMasterServices().getAssignmentManager().getBalancer();
-        if (loadBalancer instanceof IndexLoadBalancer) {
-            balancer = (IndexLoadBalancer) loadBalancer;
-        }
-        super.preMasterInitialization(ctx);
-    }
-
-    @Override
-    public void preCreateTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
-        TableName userTableName = null;
-        if (balancer != null && desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
-            userTableName =
-                    TableName.valueOf(desc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
-            balancer.addTablesToColocate(userTableName, desc.getTableName());
-        }
-        if (userTableName != null) balancer.populateRegionLocations(userTableName);
-        super.preCreateTableHandler(ctx, desc, regions);
-    }
-
-    @Override
-    public void preModifyTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            TableName tableName, HTableDescriptor htd) throws IOException {
-        HTableDescriptor oldDesc =
-                ctx.getEnvironment().getMasterServices().getTableDescriptors().get(tableName);
-        if (oldDesc.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) == null
-                && htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY) != null) {
-            TableName userTableName =
-                    TableName.valueOf(htd.getValue(IndexLoadBalancer.PARENT_TABLE_KEY));
-            balancer.addTablesToColocate(userTableName, htd.getTableName());
-        }
-        super.preModifyTableHandler(ctx, tableName, htd);
-    }
-
-    @Override
-    public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx, HRegionInfo region,
-            ServerName srcServer, ServerName destServer) throws IOException {
-        if (balancer != null && balancer.isTableColocated(region.getTable())) {
-            AssignmentManager am = ctx.getEnvironment().getMasterServices().getAssignmentManager();
-            RegionStates regionStates = am.getRegionStates();
-            String tableName = region.getTable().getNameAsString();
-            String correspondingTable = MetaDataUtil.isLocalIndex(region.getTable().getNameAsString())
-                    ? MetaDataUtil.getUserTableName(tableName)
-                    : Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(tableName.getBytes()));
-            List<HRegionInfo> regions =
-                    regionStates.getRegionsOfTable(TableName.valueOf(correspondingTable));
-            for (HRegionInfo hri : regions) {
-                if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) == 0
-                        && destServer != null) {
-                    balancer.regionOnline(hri, destServer);
-                    am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, destServer));
-                    am.unassign(hri);
-                }
-            }
-        }
-        super.postMove(ctx, region, srcServer, destServer);
-    }
 
-    @Override
-    public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
-            TableName tableName) throws IOException {
-        if (balancer != null && balancer.isTableColocated(tableName)) {
-            balancer.removeTablesFromColocation(tableName);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index d7fef5e..5e3f3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable {
 
   void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
 
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
       throws IndexWriteException;
 }
\ No newline at end of file
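
The extra boolean threads the caller's intent down to the committers: whether index updates that target the hosting region's own table (the local index case) may be written at all. A minimal, self-contained sketch of that decision, distilled from the ParallelWriterIndexCommitter hunk further below; the class, method, and parameter names here are illustrative, not Phoenix APIs:

    // Decide whether a batch of index updates aimed at targetTable should be
    // written from inside a region of hostingTable.
    final class AllowLocalUpdatesSketch {
        static boolean shouldWrite(String targetTable, String hostingTable,
                boolean allowLocalUpdates) {
            if (!targetTable.equals(hostingTable)) {
                return true;             // global index table: always written
            }
            return allowLocalUpdates;    // same table: only when explicitly allowed
        }
    }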

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 30797b2..cbcec3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -128,10 +128,11 @@ public class IndexWriter implements Stoppable {
    * @param indexUpdates Updates to write
  * @throws IOException 
    */
-  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException  {
+    public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+            boolean allowLocalUpdates) throws IOException {
     // convert the strings to htableinterfaces to which we can talk and group by TABLE
     Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
-    writeAndKillYourselfOnFailure(toWrite);
+    writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates);
   }
 
   /**
@@ -139,9 +140,10 @@ public class IndexWriter implements Stoppable {
    * @param toWrite
  * @throws IOException 
    */
-  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+    public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+            boolean allowLocalUpdates) throws IOException {
     try {
-      write(toWrite);
+      write(toWrite, allowLocalUpdates);
       if (LOG.isTraceEnabled()) {
         LOG.trace("Done writing all index updates!\n\t" + toWrite);
       }
@@ -165,21 +167,24 @@ public class IndexWriter implements Stoppable {
    * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
    *           stop early depends on the {@link IndexCommitter}.
    */
-  public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
-    write(resolveTableReferences(toWrite));
-  }
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+    	write(resolveTableReferences(toWrite), false);
+    }
+
+    public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IndexWriteException {
+    	write(resolveTableReferences(toWrite), allowLocalUpdates);
+    }
 
   /**
    * see {@link #write(Collection)}
    * @param toWrite
    * @throws IndexWriteException
    */
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws IndexWriteException {
-    this.writer.write(toWrite);
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
+	      throws IndexWriteException {
+	  this.writer.write(toWrite, allowLocalUpdates);
   }
 
-
   /**
    * Convert the passed index updates to {@link HTableInterfaceReference}s.
    * @param indexUpdates from the index builder

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 233dc57..0dc11bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,8 +21,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -98,7 +100,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
     }
 
     @Override
-    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException {
         /*
          * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
          * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
@@ -116,7 +118,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
             // doing a complete copy over of all the index update for each table.
             final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
             final HTableInterfaceReference tableReference = entry.getKey();
-            final RegionCoprocessorEnvironment env = this.env;
+			if (env != null
+					&& !allowLocalUpdates
+					&& tableReference.getTableName().equals(
+							env.getRegion().getTableDesc().getNameAsString())) {
+				continue;
+			}
             /*
              * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
              * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
@@ -145,29 +152,14 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                         LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
                     }
                     try {
-                        // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
-                        // Also, checking the prefix of the table name to determine if this is a local
-                        // index is pretty hacky. If we're going to keep this, we should revisit that
-                        // as well.
-                        try {
-                            if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
-                                Region indexRegion = IndexUtil.getIndexRegion(env);
-                                if (indexRegion != null) {
-                                    throwFailureIfDone();
-                                    indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
-                                        HConstants.NO_NONCE, HConstants.NO_NONCE);
-                                    return null;
-                                }
-                            }
-                        } catch (IOException ignord) {
-                            // when it's failed we fall back to the standard & slow way
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
-                                        + ignord);
-                            }
-                        }
+						if (allowLocalUpdates) {
+							for (Mutation m : mutations) {
+								m.setDurability(Durability.SKIP_WAL);
+							}
+						}
                         HTableInterface table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
+                        int i = 0;
                         table.batch(mutations);
                     } catch (SingleIndexWriteFailureException e) {
                         throw e;
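
Besides the same-table guard added near the top of this hunk, the committer now downgrades durability when local updates are allowed. A condensed, self-contained version of that block is below; the comment about WAL recovery is an assumption about intent, not something stated in the patch:

    import java.util.List;

    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Mutation;

    final class LocalIndexDurabilitySketch {
        // Mirrors the new block in this hunk: when local updates are allowed,
        // every mutation is switched to SKIP_WAL before the batch is submitted,
        // presumably because the originating data-table write has already been
        // logged and the local index rows can be rebuilt from it on recovery.
        static void applyLocalDurability(List<Mutation> mutations, boolean allowLocalUpdates) {
            if (!allowLocalUpdates) {
                return;
            }
            for (Mutation m : mutations) {
                m.setDurability(Durability.SKIP_WAL);
            }
        }
    }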

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 14768ac..fec74ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -110,7 +111,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
     }
 
     @Override
-    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException {
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
         Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
         TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
         List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
@@ -121,6 +122,12 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
             // track each reference so we can get at it easily later, when determing failures
             final HTableInterfaceReference tableReference = entry.getKey();
             final RegionCoprocessorEnvironment env = this.env;
+			if (env != null
+					&& !allowLocalUpdates
+					&& tableReference.getTableName().equals(
+							env.getRegion().getTableDesc().getNameAsString())) {
+				continue;
+			}
             tables.add(tableReference);
 
             /*
@@ -144,33 +151,16 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                     try {
                         // this may have been queued, but there was an abort/stop so we try to early exit
                         throwFailureIfDone();
+						if (allowLocalUpdates) {
+							for (Mutation m : mutations) {
+								m.setDurability(Durability.SKIP_WAL);
+							}
+						}
 
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
                         }
 
-                        try {
-                            // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
-                            // Also, checking the prefix of the table name to determine if this is a local
-                            // index is pretty hacky. If we're going to keep this, we should revisit that
-                            // as well.
-                            if (MetaDataUtil.isLocalIndex(tableReference.getTableName())) {
-                                Region indexRegion = IndexUtil.getIndexRegion(env);
-                                if (indexRegion != null) {
-                                    throwFailureIfDone();
-                                    indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
-                                        HConstants.NO_NONCE, HConstants.NO_NONCE);
-                                    return Boolean.TRUE;
-                                }
-                            }
-                        } catch (IOException ignord) {
-                            // when it's failed we fall back to the standard & slow way
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
-                                        + ignord);
-                            }
-                        }
-
                         HTableInterface table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1725b11..6a316d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -274,6 +274,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
     private Set<ColumnReference> coveredColumns;
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     // columns required to create index row i.e. indexedColumns + coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -363,6 +364,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -430,6 +432,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             for (PColumn indexColumn : family.getColumns()) {
                 PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
                 this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
+                if(isLocalIndex) {
+                    this.coveredColumnsMap.put(
+                        new ColumnReference(column.getFamilyName().getBytes(), column.getName()
+                                .getBytes()),
+                        new ColumnReference(isLocalIndex ? Bytes.toBytes(IndexUtil
+                                .getLocalIndexColumnFamily(column.getFamilyName().getString()))
+                                : column.getFamilyName().getBytes(), column.getName().getBytes()));
+                }
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -861,7 +871,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
                 //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
-                put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
+                if(this.isLocalIndex) {
+                    ColumnReference columnReference = this.coveredColumnsMap.get(ref);
+					put.add(kvBuilder.buildPut(rowKey, columnReference.getFamilyWritable(), cq, ts, value));
+                } else {
+                    put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
+                }
             }
         }
         return put;
@@ -949,11 +964,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // If table delete was single version, then index delete should be as well
             if (deleteType == DeleteType.SINGLE_VERSION) {
                 for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    if(this.isLocalIndex) {
+						ref = this.coveredColumnsMap.get(ref);
+                    }
                     delete.deleteFamilyVersion(ref.getFamily(), ts);
                 }
                 delete.deleteFamilyVersion(emptyCF, ts);
             } else {
                 for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    if(this.isLocalIndex) {
+						ref = this.coveredColumnsMap.get(ref);
+                    }
                     delete.deleteFamily(ref.getFamily(), ts);
                 }
                 delete.deleteFamily(emptyCF, ts);
@@ -971,11 +992,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
+                    ColumnReference columnReference = ref;
+                    if(this.isLocalIndex) {
+                        columnReference = this.coveredColumnsMap.get(ref);
+                    }
                     // If point delete for data table, then use point delete for index as well
                     if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumn(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
                     } else {
-                        delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(columnReference.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
                     }
                 }
             }
@@ -1030,10 +1055,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
         coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
             byte[] cf = Bytes.readByteArray(input);
             byte[] cq = Bytes.readByteArray(input);
-            coveredColumns.add(new ColumnReference(cf,cq));
+            ColumnReference ref = new ColumnReference(cf,cq);
+            coveredColumns.add(ref);
+            if(isLocalIndex) {
+                coveredColumnsMap.put(ref, new ColumnReference(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))), cq));
+            }
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
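
The deserialization hunk above is the heart of the new layout: for a local index, every covered data column gets a shadow entry in a local-index column family of the same physical table, and coveredColumnsMap records that mapping by rewriting the data column family name with IndexUtil.getLocalIndexColumnFamily. A minimal sketch of that rewrite, assuming the helper simply prepends the "L#" prefix that QueryConstants introduces later in this patch (the real IndexUtil method may differ in detail):

    // Sketch only (assumed behaviour of IndexUtil.getLocalIndexColumnFamily):
    // derive the shadow column family that stores local-index cells for a data family.
    static String localIndexColumnFamily(String dataColumnFamily) {
        return "L#" + dataColumnFamily;   // QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX
    }
    // e.g. covered columns of family "CF1" are written under "L#CF1".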

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 8ad4d3e..9d2955b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -71,7 +71,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
-            indexUpdate.setTable(maintainer.getIndexTableName());
+            indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
+                    .getTableDesc().getName() : maintainer.getIndexTableName());
             Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
                     .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
             indexUpdate.setUpdate(put);
@@ -95,7 +96,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
-            indexUpdate.setTable(maintainer.getIndexTableName());
+            indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion()
+                    .getTableDesc().getName() : maintainer.getIndexTableName());
             Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
                     state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
             indexUpdate.setUpdate(delete);
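
Both hunks in this file make the same change: because local-index rows now live in shadow column families of the data table itself, index Puts and Deletes are addressed to the hosting region's own table name, while global indexes keep targeting maintainer.getIndexTableName(). A rough restatement of the routing rule (illustrative helper, not Phoenix API):

    // Sketch of the decision made above; names are illustrative only.
    static byte[] indexUpdateTargetTable(boolean isLocalIndex,
                                         byte[] hostingDataTableName,
                                         byte[] globalIndexTableName) {
        // Local-index mutations are co-located with the data row, so they are
        // written back into the data table's region; global-index mutations
        // still go to the separate index table.
        return isLocalIndex ? hostingDataTableName : globalIndexTableName;
    }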

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 17da04e..d7850ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -138,7 +138,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
             }
 
             // its a local index table, so we need to convert it to the index table names we should disable
-            if (MetaDataUtil.isLocalIndex(ref.getTableName())) {
+            if (MetaDataUtil.hasLocalIndexColumnFamily(env.getRegion().getTableDesc())) {
                 for (String tableName : getLocalIndexNames(ref, mutations)) {
                     indexTableNames.put(tableName, minTimeStamp);
                 }
@@ -224,8 +224,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
         try {
             conn = QueryUtil.getConnectionOnServer(this.env.getConfiguration()).unwrap(
                     PhoenixConnection.class);
-            String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
-            PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+            PTable dataTable = PhoenixRuntime.getTable(conn, ref.getTableName());
             List<PTable> indexes = dataTable.getIndexes();
             // local index used to get view id from index mutation row key.
             PTable localIndex = null;
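
With no separate local-index physical table left, the failure policy can no longer recognize a local index by table name; it now asks whether the region's table descriptor carries any local-index column family. MetaDataUtil.hasLocalIndexColumnFamily is referenced but not shown in this hunk; presumably it is just a prefix scan over the descriptor's families, roughly like this (an assumption, not the actual implementation):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;

    // Assumed shape of the check used above: does any column family start with "L#"?
    static boolean hasLocalIndexColumnFamily(HTableDescriptor desc) {
        for (HColumnDescriptor cf : desc.getColumnFamilies()) {
            if (cf.getNameAsString().startsWith("L#")) {
                return true;
            }
        }
        return false;
    }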

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5d8879c..3d8124c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -62,6 +63,7 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
@@ -69,6 +71,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -165,7 +168,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
             // no index updates, so we are done
             if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates);
+                this.writer.write(indexUpdates, true);
             }
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 7f403b0..3e0fd99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -176,7 +176,7 @@ public abstract class ExplainTable {
         }
     }
 
-    private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex) {
+    private void appendPKColumnValue(StringBuilder buf, byte[] range, Boolean isNull, int slotIndex, boolean changeViewIndexId) {
         if (Boolean.TRUE.equals(isNull)) {
             buf.append("null");
             return;
@@ -198,8 +198,14 @@ public abstract class ExplainTable {
             type.coerceBytes(ptr, type, sortOrder, SortOrder.getDefault());
             range = ptr.get();
         }
-        Format formatter = context.getConnection().getFormatter(type);
-        buf.append(type.toStringLiteral(range, formatter));
+        if (changeViewIndexId) {
+            Short s = (Short) type.toObject(range);
+            s = (short) (s + (-Short.MAX_VALUE));
+            buf.append(s.toString());
+        } else {
+            Format formatter = context.getConnection().getFormatter(type);
+            buf.append(type.toStringLiteral(range, formatter));
+        }
     }
     
     private static class RowKeyValueIterator implements Iterator<byte[]> {
@@ -257,6 +263,7 @@ public abstract class ExplainTable {
                 minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound));
             }
         }
+        boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan());
         boolean forceSkipScan = this.hint.hasHint(Hint.SKIP_SCAN);
         int nRanges = forceSkipScan ? scanRanges.getRanges().size() : scanRanges.getBoundSlotCount();
         for (int i = 0, minPos = 0; minPos < nRanges || minMaxIterator.hasNext(); i++) {
@@ -275,7 +282,13 @@ public abstract class ExplainTable {
                     minMaxIterator = Iterators.emptyIterator();
                 }
             }
-            appendPKColumnValue(buf, b, isNull, i);
+            if (isLocalIndex
+                    && ((context.getConnection().getTenantId() != null && i == 1) || (context
+                            .getConnection().getTenantId() == null && i == 0))) {
+                appendPKColumnValue(buf, b, isNull, i, true);
+            } else {
+                appendPKColumnValue(buf, b, isNull, i, false);
+            }
             buf.append(',');
         }
     }
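
This hunk only changes EXPLAIN output: for a local index scan the first non-tenant PK slot holds the encoded index id, and instead of printing the raw value the plan now rebases it with (short)(s + (-Short.MAX_VALUE)). Assuming the ids are allocated from a sequence that starts at Short.MIN_VALUE, the intentional short overflow turns the first id into 1, the second into 2, and so on:

    // Worked example of the rebasing above (assumes ids start at Short.MIN_VALUE).
    short firstId = Short.MIN_VALUE;                          // -32768, first allocated id
    short shown = (short) (firstId + (-Short.MAX_VALUE));     // int math gives -65535, wraps to 1
    System.out.println(shown);                                // prints 1; the next id prints 2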

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 0525de9..41c39a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -395,13 +395,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
         for(PTable indexTable : table.getIndexes()){
-            if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
-                indexTables.add(new TargetTableRef(indexTable.getName().getString(),
-                        Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getPhysicalName().getBytes()))));
-            } else {
-                indexTables.add(
-                        new TargetTableRef(indexTable.getName().getString(), indexTable.getPhysicalName().getString()));
-            }
+            indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable
+                    .getPhysicalName().getString()));
         }
         return indexTables;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 6743688..c93a58b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -206,8 +206,7 @@ public class IndexTool extends Configured implements Tool {
             // computed from the qDataTable name.
             String physicalIndexTable = pindexTable.getPhysicalName().getString();
             if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
-                physicalIndexTable = Bytes
-                        .toString(MetaDataUtil.getLocalIndexPhysicalName(pdataTable.getPhysicalName().getBytes()));
+                physicalIndexTable = qDataTable;
             }
 
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
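
The bulk-load tools follow the same simplification: AbstractBulkLoadTool no longer special-cases LOCAL indexes because indexTable.getPhysicalName() now already resolves to the data table, and IndexTool writes local-index HFiles into qDataTable directly instead of a separate shared table named with MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX. An illustration with assumed names:

    // Illustration only (table names are made up).
    boolean isLocalIndex = true;
    String qDataTable = "MY_SCHEMA.MY_TABLE";      // data table; local-index cells land in its L# families
    String globalIndexTable = "MY_SCHEMA.MY_IDX";  // a global index keeps its own physical table
    String physicalIndexTable = isLocalIndex ? qDataTable : globalIndexTable;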

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f593dd0..c29c0bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -850,18 +849,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         null, priority, null);
             }
 
-            if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
-                    && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(descriptor
-                            .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-                if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
-                    descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
-                        null, priority, null);
-                }
-            } else {
-                if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
-                        && !SchemaUtil.isMetaTable(tableName)
-                        && !SchemaUtil.isSequenceTable(tableName)) {
-                    descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null);
+            Set<byte[]> familiesKeys = descriptor.getFamiliesKeys();
+            for(byte[] family: familiesKeys) {
+                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                    if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
+                        descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
+                            null, priority, null);
+                        break;
+                    }
                 }
             }
 
@@ -1071,8 +1066,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } else {
                 if (isMetaTable) {
                     checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
+                } else {
+                    for(Pair<byte[],Map<String,Object>> family: families) {
+                        if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals(
+                            IndexRegionSplitPolicy.class.getName()))
+                                && Bytes.toString(family.getFirst()).startsWith(
+                                    QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                            newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName());
+                            break;
+                        }
+                    }
                 }
 
+
                 if (!modifyExistingMetaData) {
                     return existingDesc; // Caller already knows that no metadata was changed
                 }
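
Taken together, the two hunks above wire the new layout into table management: any descriptor that contains an "L#"-prefixed column family gets the IndexHalfStoreFileReaderGenerator coprocessor (so the index families' store-file references can still be read after a region split), and its split policy is switched to IndexRegionSplitPolicy so data rows and their local-index rows always split on the same boundaries. A rough HBase-admin-level sketch of what such a descriptor ends up containing (family names assumed; the split-policy class is the one referenced above):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    // Sketch only: descriptor of a data table that hosts a local index.
    static HTableDescriptor localIndexHostingDescriptor(String table, String splitPolicyClassName) {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(table));
        desc.addFamily(new HColumnDescriptor("0"));    // regular data family (Phoenix default assumed to be "0")
        desc.addFamily(new HColumnDescriptor("L#0"));  // shadow family holding local-index cells
        // splitPolicyClassName would be IndexRegionSplitPolicy.class.getName() as in the hunk above.
        desc.setValue(HTableDescriptor.SPLIT_POLICY, splitPolicyClassName);
        return desc;
    }
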
@@ -1303,60 +1309,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
-    private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps,
-            List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, long timestamp,
-            boolean isNamespaceMapped) throws SQLException {
-        PTable table;
-        String parentTableName = SchemaUtil
-                .getParentTableNameFromIndexTable(Bytes.toString(physicalTableName),
-                        MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)
-                .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
-        try {
-            synchronized (latestMetaDataLock) {
-                throwConnectionClosedIfNullMetaData();
-                table = latestMetaData.getTableRef(new PTableKey(PName.EMPTY_NAME, parentTableName)).getTable();
-                latestMetaDataLock.notifyAll();
-            }
-            if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be the case
-                throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
-            }
-        } catch (TableNotFoundException e) {
-            byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(parentTableName));
-            byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(parentTableName));
-            MetaDataMutationResult result = this.getTable(null, schemaName, tableName, HConstants.LATEST_TIMESTAMP, timestamp);
-            table = result.getTable();
-            if (table == null) {
-                throw e;
-            }
-        }
-        ensureLocalIndexTableCreated(physicalTableName, tableProps, families, splits, isNamespaceMapped);
-    }
-
-    private void ensureLocalIndexTableCreated(byte[] physicalTableName, Map<String, Object> tableProps,
-            List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped)
-                    throws SQLException, TableAlreadyExistsException {
-        
-        // If we're not allowing local indexes or the hbase version is too low,
-        // don't create the local index table
-        if (   !this.getProps().getBoolean(QueryServices.ALLOW_LOCAL_INDEX_ATTRIB, QueryServicesOptions.DEFAULT_ALLOW_LOCAL_INDEX) 
-            || !this.supportsFeature(Feature.LOCAL_INDEX)) {
-                    return;
-        }
-        
-        tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, TRUE_BYTES_AS_STRING);
-        HTableDescriptor desc = ensureTableCreated(physicalTableName, PTableType.TABLE, tableProps, families, splits,
-                true, isNamespaceMapped);
-        if (desc != null) {
-            if (!Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-                String fullTableName = Bytes.toString(physicalTableName);
-                throw new TableAlreadyExistsException(
-                        "Unable to create shared physical table for local indexes.",
-                        SchemaUtil.getSchemaNameFromFullName(fullTableName),
-                        SchemaUtil.getTableNameFromFullName(fullTableName));
-            }
-        }
-    }
-
     private boolean ensureViewIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
         byte[] physicalIndexName = MetaDataUtil.getViewIndexPhysicalName(physicalTableName);
         HTableDescriptor desc = null;
@@ -1385,22 +1337,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     private boolean ensureLocalIndexTableDropped(byte[] physicalTableName, long timestamp) throws SQLException {
-        byte[] physicalIndexName = MetaDataUtil.getLocalIndexPhysicalName(physicalTableName);
         HTableDescriptor desc = null;
         boolean wasDeleted = false;
         try (HBaseAdmin admin = getAdmin()) {
             try {
-                desc = admin.getTableDescriptor(physicalIndexName);
-                if (Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-                    this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
-                    final ReadOnlyProps props = this.getProps();
-                    final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
-                    if (dropMetadata) {
-                        admin.disableTable(physicalIndexName);
-                        admin.deleteTable(physicalIndexName);
-                        clearTableRegionCache(physicalIndexName);
-                        wasDeleted = true;
+                desc = admin.getTableDescriptor(physicalTableName);
+                this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalTableName));
+                final ReadOnlyProps props = this.getProps();
+                final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+                if (dropMetadata) {
+                    List<String> columnFamilies = new ArrayList<String>();
+                    for(HColumnDescriptor cf : desc.getColumnFamilies()) {
+                        if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                            columnFamilies.add(cf.getNameAsString());
+                        }
                     }
+                    for(String cf: columnFamilies) {
+                        admin.deleteColumn(physicalTableName, cf);
+                    }
+                    clearTableRegionCache(physicalTableName);
+                    wasDeleted = true;
                 }
             } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) {
                 // Ignore, as we may never have created a view index table
@@ -1424,9 +1380,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         byte[] schemaBytes = rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
         byte[] tableBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
         byte[] tableName = physicalTableName != null ? physicalTableName : SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
-        boolean localIndexTable = Boolean.TRUE.equals(tableProps.remove(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME));
-
-        if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && physicalTableName == null)) {
+        boolean localIndexTable = false;
+        for(Pair<byte[], Map<String, Object>> family: families) {
+            if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                localIndexTable = true;
+                break;
+            }
+        }
+        if ((tableType == PTableType.VIEW && physicalTableName != null) || (tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) {
             // For views this will ensure that metadata already exists
             // For tables and indexes, this will create the metadata if it doesn't already exist
             ensureTableCreated(tableName, tableType, tableProps, families, splits, true, isNamespaceMapped);
@@ -1436,10 +1397,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             // Physical index table created up front for multi tenant
             // TODO: if viewIndexId is Short.MIN_VALUE, then we don't need to attempt to create it
             if (physicalTableName != null) {
-                if (localIndexTable) {
-                    ensureLocalIndexTableCreated(tableName, tableProps, families, splits,
-                            MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
-                } else if (!MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
+                if (!localIndexTable && !MetaDataUtil.isMultiTenant(m, kvBuilder, ptr)) {
                     ensureViewIndexTableCreated(tenantIdBytes.length == 0 ? null : PNameFactory.newName(tenantIdBytes),
                             physicalTableName, MetaDataUtil.getClientTimeStamp(m), isNamespaceMapped);
                 }
@@ -1561,6 +1519,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 dropTables(result.getTableNamesToDelete());
             }
             invalidateTables(result.getTableNamesToDelete());
+            long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
             if (tableType == PTableType.TABLE) {
                 boolean isNamespaceMapped = result.getTable().isNamespaceMapped();
                 byte[] physicalName;
@@ -1569,7 +1528,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 } else {
                     physicalName = TableName.valueOf(schemaBytes, tableBytes).getName();
                 }
-                long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
                 ensureViewIndexTableDropped(physicalName, timestamp);
                 ensureLocalIndexTableDropped(physicalName, timestamp);
                 tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
@@ -2479,6 +2437,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 }
 
                                 if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) {
+                                    Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+                                    props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+                                    props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                                    PhoenixConnection conn =
+                                            new PhoenixConnection(ConnectionQueryServicesImpl.this,
+                                                    metaConnection.getURL(), props, metaConnection
+                                                            .getMetaDataCache());
+                                    try {
+                                        UpgradeUtil.upgradeLocalIndexes(conn, true);
+                                    } finally {
+                                        if (conn != null) conn.close();
+                                    }
+
                                     metaConnection = addColumnsIfNotExists(metaConnection,
                                             PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                             MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2,
@@ -3622,7 +3593,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled;
     }
 
-
     @Override
     public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException {
        /*

http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 4efb708..91c84e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -166,6 +166,7 @@ public interface QueryConstants {
     public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
 
     public static final byte[] TRUE = new byte[] {1};
+    
 
     /**
      * Separator used between variable length keys for a composite key.
@@ -195,6 +196,16 @@ public interface QueryConstants {
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
             DEFAULT_COLUMN_FAMILY_BYTES);
 
+    public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#";
+    public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+    public static final ImmutableBytesPtr LOCAL_INDEX_COLUMN_FAMILY_PREFIX_PTR = new ImmutableBytesPtr(
+        LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES);
+    
+    public static final String DEFAULT_LOCAL_INDEX_COLUMN_FAMILY = LOCAL_INDEX_COLUMN_FAMILY_PREFIX + DEFAULT_COLUMN_FAMILY;
+    public static final byte[] DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_LOCAL_INDEX_COLUMN_FAMILY);
+    public static final ImmutableBytesPtr DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
+               DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
+
     public static final String ALL_FAMILY_PROPERTIES_KEY = "";
     public static final String SYSTEM_TABLE_PK_NAME = "pk";
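
The new constants pin down the naming convention used throughout the patch: a local-index column family is simply the data family with the "L#" prefix, and the default local-index family is that prefix applied to Phoenix's default column family (assumed here to be "0"). For example:

    // Family names produced by the convention above (default data family assumed to be "0").
    String prefix = "L#";                   // LOCAL_INDEX_COLUMN_FAMILY_PREFIX
    String defaultLocal = prefix + "0";     // DEFAULT_LOCAL_INDEX_COLUMN_FAMILY -> "L#0"
    String forUserFamily = prefix + "CF1";  // covered columns of family "CF1" live in "L#CF1"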