Posted to commits@phoenix.apache.org by td...@apache.org on 2018/07/19 20:04:33 UTC

[05/10] phoenix git commit: PHOENIX-3534 Support multi region SYSTEM.CATALOG table (Thomas D'Silva and Rahul Gidwani)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4d6dbf9c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index ae2fa66..5e8a5dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 import static org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
@@ -55,7 +53,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
@@ -78,9 +75,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
-import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
 import static org.apache.phoenix.schema.PTableType.INDEX;
-import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
@@ -91,14 +87,16 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -108,26 +106,21 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
@@ -140,6 +133,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
+import org.apache.phoenix.compile.ColumnNameTrackingExpressionCompiler;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.QueryPlan;
@@ -183,6 +177,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PFunction.FunctionArgument;
@@ -190,6 +185,7 @@ import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -197,6 +193,8 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -212,16 +210,17 @@ import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ParentTableNotFoundException;
+import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
@@ -239,6 +238,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -248,6 +248,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
@@ -256,17 +257,35 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 
 /**
- *
  * Endpoint co-processor through which all Phoenix metadata mutations flow.
+ * Phoenix metadata is stored in SYSTEM.CATALOG. The table-specific information
+ * is stored in a single header row. Column information is stored in a separate
+ * row per column. Linking information (indexes, views, etc.) is stored using a
+ * separate row for each link, identified by the {@link LinkType} column value.
+ * The parent->child links are stored in a separate SYSTEM.CHILD_LINK table.
+ * Metadata for all tables/views/indexes in the same schema is stored in a
+ * single region, which is enforced using the {@link MetaDataSplitPolicy}.
+ *
+ * When creating child views we only store the columns added by the view. When
+ * resolving a view we resolve all of its parents and add their columns to the
+ * PTable that is returned. We lock the parent table while creating an index to
+ * ensure that its metadata doesn't change.
+ * While adding or dropping columns we lock the table or view to prevent
+ * concurrent conflicting changes. We also validate that there are no existing
+ * conflicting child view columns when we add a column to a parent.
+ * While dropping a column from a parent we check if there are any child views
+ * that need the column and throw an exception if so. If there are view indexes
+ * that require the column, we drop them as well.
+ * While dropping a table or view that has children using the cascade option, we
+ * do not drop the child view metadata, which will be removed at compaction time.
+ * If we recreate a table or view that was dropped but whose child metadata hasn't
+ * been removed yet, we delete the child view metadata. When resolving a view,
+ * we resolve all of its parents; if any of them have been dropped, the child
+ * view is considered dropped as well and we throw a TableNotFoundException.
+ * 
  * We only allow mutations to the latest version of a Phoenix table (i.e. the
- * timeStamp must be increasing).
- * For adding/dropping columns use a sequence number on the table to ensure that
- * the client has the latest version.
- * The timeStamp on the table correlates with the timeStamp on the data row.
- * TODO: we should enforce that a metadata mutation uses a timeStamp bigger than
- * any in use on the data table, b/c otherwise we can end up with data rows that
- * are not valid against a schema row.
- *
+ * timeStamp must be increasing). For adding/dropping columns we use a sequence
+ * number on the table to ensure that the client has the latest version.
  *
  * @since 0.1
  */
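
The merge rules in the class comment above boil down to: when the same column is
defined more than once in the ancestor hierarchy, the latest definition wins, and
an EXCLUDED_COLUMN marker suppresses any older inherited definition. Below is a
minimal, self-contained sketch of that rule; it is not Phoenix code: Col is a
hypothetical stand-in for PColumn, and the flattened child->parent list stands in
for the ancestor traversal the endpoint actually performs.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class Col {
        final String name; final long ts; final boolean excluded;
        Col(String name, long ts, boolean excluded) {
            this.name = name; this.ts = ts; this.excluded = excluded;
        }
    }

    public class CombineColumnsSketch {
        // levels are ordered child first, base table last
        static List<Col> combine(List<List<Col>> childToParent) {
            Map<String, Col> merged = new LinkedHashMap<>();
            Map<String, Long> excludedAt = new LinkedHashMap<>();
            for (List<Col> level : childToParent) {
                for (Col c : level) {
                    if (c.excluded) {
                        // remember the newest drop marker for this column name
                        excludedAt.merge(c.name, c.ts, Math::max);
                    } else {
                        Col existing = merged.get(c.name);
                        if (existing == null || c.ts > existing.ts) {
                            merged.put(c.name, c); // latest definition wins
                        }
                    }
                }
            }
            List<Col> result = new ArrayList<>();
            for (Col c : merged.values()) {
                Long dropTs = excludedAt.get(c.name);
                // a column re-added after it was dropped survives the exclusion
                if (dropTs == null || c.ts > dropTs) {
                    result.add(c);
                }
            }
            return result;
        }
    }
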
@@ -423,7 +442,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
     private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
-    
+
     private static final int LINK_TYPE_INDEX = 0;
 
     private static final KeyValue CLASS_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
@@ -447,7 +466,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     static {
         Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
     }
-    
+
     private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
     private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
     private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
@@ -464,7 +483,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     static {
         Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
     }
-    
+
     private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
     private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
     private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
@@ -553,7 +572,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
 
             long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-            PTable table = doGetTable(key, request.getClientTimestamp(), request.getClientVersion());
+            PTable table =
+                    doGetTable(tenantId, schemaName, tableName, request.getClientTimestamp(),
+                        null, request.getClientVersion(), request.getSkipAddingIndexes(),
+                        request.getSkipAddingParentColumns(),
+                        PTableImpl.createFromProto(request.getLockedAncestorTable()));
             if (table == null) {
                 builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                 builder.setMutationTime(currentTime);
@@ -586,12 +609,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
                 }
             }
-
-            if (table.getTimeStamp() != tableTimeStamp) {
+            // the PTable of views and indexes on views might get updated because a column is added to one of
+            // their parents (this won't change the timestamp)
+            if (table.getType()!=PTableType.TABLE || table.getTimeStamp() != tableTimeStamp) {
                 builder.setTable(PTableImpl.toProto(table));
             }
             done.run(builder.build());
-            return;
         } catch (Throwable t) {
             logger.error("getTable failed", t);
             ProtobufUtil.setControllerException(controller,
@@ -599,35 +622,349 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    /**
+     * Used to add the columns present in the ancestor hierarchy to the PTable of the given view or
+     * view index
+     * @param table PTable of the view or view index
+     * @param skipAddingIndexes if true the returned PTable won't include indexes
+     * @param skipAddingParentColumns if true the returned PTable won't include columns derived from
+     *            ancestor tables
+     * @param lockedAncestorTable the ancestor table that is being mutated (we won't be able to
+     *            resolve this table as it's locked)
+     */
+    private Pair<PTable, MetaDataProtos.MutationCode> combineColumns(PTable table, long timestamp,
+            int clientVersion, boolean skipAddingIndexes, boolean skipAddingParentColumns,
+            PTable lockedAncestorTable) throws SQLException, IOException {
+        boolean hasIndexId = table.getViewIndexId() != null;
+        if (table.getType() != PTableType.VIEW && !hasIndexId) {
+            return new Pair<PTable, MetaDataProtos.MutationCode>(table,
+                    MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+        }
+        if (!skipAddingParentColumns) {
+            table =
+                    addDerivedColumnsFromAncestors(table, timestamp, clientVersion,
+                        lockedAncestorTable);
+            if (table==null) {
+                return new Pair<PTable, MetaDataProtos.MutationCode>(table,
+                        MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+            }
+            // we need to resolve the indexes of views (to ensure they also have all the columns
+            // derived from their ancestors)
+            if (!skipAddingIndexes && !table.getIndexes().isEmpty()) {
+                List<PTable> indexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
+                for (PTable index : table.getIndexes()) {
+                    byte[] tenantIdBytes =
+                            index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY
+                                    : index.getTenantId().getBytes();
+                    PTable latestIndex =
+                            doGetTable(tenantIdBytes, index.getSchemaName().getBytes(),
+                                index.getTableName().getBytes(), timestamp, null, clientVersion, true,
+                                false, lockedAncestorTable);
+                    if (latestIndex == null) {
+                        throw new TableNotFoundException(
+                                "Could not find index table while combining columns "
+                                        + index.getTableName().getString() + " with tenant id "
+                                        + index.getTenantId());
+                    }
+                    indexes.add(latestIndex);
+                }
+                table = PTableImpl.makePTable(table, table.getTimeStamp(), indexes);
+            }
+        }
+        
+        MetaDataProtos.MutationCode mutationCode =
+                table != null ? MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS
+                        : MetaDataProtos.MutationCode.TABLE_NOT_FOUND;
+        return new Pair<PTable, MetaDataProtos.MutationCode>(table, mutationCode);
+    }
+
+    
+    private PTable addDerivedColumnsFromAncestors(PTable table, long timestamp,
+            int clientVersion, PTable lockedAncestorTable) throws IOException, SQLException, TableNotFoundException {
+        // combine columns for view and view indexes
+        byte[] tenantId =
+                table.getTenantId() != null ? table.getTenantId().getBytes()
+                        : ByteUtil.EMPTY_BYTE_ARRAY;
+        byte[] schemaName = table.getSchemaName().getBytes();
+        byte[] tableName = table.getTableName().getBytes();
+        String fullTableName = SchemaUtil.getTableName(table.getSchemaName().getString(),
+                table.getTableName().getString());
+        boolean hasIndexId = table.getViewIndexId() != null;
+        boolean isSalted = table.getBucketNum() != null;
+        if (table.getType() != PTableType.VIEW && !hasIndexId) {
+            return table;
+        }
+        boolean isDiverged = isDivergedView(table);
+        // Here we combine columns from the parent tables. The logic is as follows: if the PColumn
+        // is in the EXCLUDED_COLUMNS list, remove it; otherwise, when the same column appears more
+        // than once, priority for keeping it goes child -> parent
+        List<TableInfo> ancestorList = Lists.newArrayList();
+        TableViewFinderResult viewFinderResult = new TableViewFinderResult();
+        if (PTableType.VIEW == table.getType()) {
+            findAncestorViews(tenantId, schemaName, tableName, viewFinderResult,
+                table.isNamespaceMapped());
+        } else { // is a view index
+            findAncestorViewsOfIndex(tenantId, schemaName, tableName, viewFinderResult,
+                table.isNamespaceMapped());
+        }
+        if (viewFinderResult.getLinks().isEmpty()) {
+            // no need to combine columns for local indexes on regular tables
+            return table;
+        }
+        for (TableInfo viewInfo : viewFinderResult.getLinks()) {
+            ancestorList.add(viewInfo);
+        }
+        List<PColumn> allColumns = Lists.newArrayList();
+        List<PColumn> excludedColumns = Lists.newArrayList();
+        // add my own columns first in reverse order
+        List<PColumn> myColumns = table.getColumns();
+        // skip salted column as it will be added from the base table columns
+        int startIndex = table.getBucketNum() != null ? 1 : 0;
+        for (int i = myColumns.size() - 1; i >= startIndex; i--) {
+            PColumn pColumn = myColumns.get(i);
+            if (pColumn.isExcluded()) {
+                excludedColumns.add(pColumn);
+            }
+            allColumns.add(pColumn);
+        }
+
+        // initialize a map from each indexed expression to its list of required data columns,
+        // then remove the data columns that have not been dropped, so that we are left with
+        // the columns that have been dropped
+        Map<PColumn, List<String>> indexRequiredDroppedDataColMap =
+                Maps.newHashMapWithExpectedSize(table.getColumns().size());
+        if (hasIndexId) {
+            int indexPosOffset = (isSalted ? 1 : 0) + (table.isMultiTenant() ? 1 : 0) + 1;
+            ColumnNameTrackingExpressionCompiler expressionCompiler =
+                    new ColumnNameTrackingExpressionCompiler();
+            for (int i = indexPosOffset; i < table.getPKColumns().size(); i++) {
+                PColumn indexColumn = table.getPKColumns().get(i);
+                try {
+                    expressionCompiler.reset();
+                    String expressionStr = IndexUtil.getIndexColumnExpressionStr(indexColumn);
+                    ParseNode parseNode = SQLParser.parseCondition(expressionStr);
+                    parseNode.accept(expressionCompiler);
+                    indexRequiredDroppedDataColMap.put(indexColumn,
+                        Lists.newArrayList(expressionCompiler.getDataColumnNames()));
+                } catch (SQLException e) {
+                    throw new RuntimeException(e); // Impossible
+                }
+            }
+        }
+
+        // now go up from child to parent all the way to the base table:
+        PTable baseTable = null;
+        long maxTableTimestamp = -1;
+        int numPKCols = table.getPKColumns().size();
+        for (int i = 0; i < ancestorList.size(); i++) {
+            TableInfo parentTableInfo = ancestorList.get(i);
+            PTable pTable = null;
+            String fullParentTableName = SchemaUtil.getTableName(parentTableInfo.getSchemaName(),
+                parentTableInfo.getTableName());
+            PName parentTenantId =
+                    (parentTableInfo.getTenantId() != null && parentTableInfo.getTenantId().length!=0)
+                            ? PNameFactory.newName(parentTableInfo.getTenantId()) : null;
+            PTableKey pTableKey = new PTableKey(parentTenantId, fullParentTableName);
+            // if we already have the PTable of an ancestor that has been locked, no need to look up
+            // the table
+            if (lockedAncestorTable != null && lockedAncestorTable.getKey().equals(pTableKey)) {
+                pTable = lockedAncestorTable;
+            } else {
+                // if we are currently combining columns for a view index and are looking up its
+                // ancestors, we do not add the indexes to the ancestor PTable (or else we would
+                // end up in a circular loop).
+                // We also don't need to add parent columns of the ancestors, as we combine
+                // columns from all ancestors
+                pTable =
+                        doGetTable(parentTableInfo.getTenantId(), parentTableInfo.getSchemaName(),
+                            parentTableInfo.getTableName(), timestamp, null, clientVersion, hasIndexId,
+                            true, null);
+            }
+            if (pTable == null) {
+                throw new ParentTableNotFoundException(parentTableInfo, fullTableName);
+            } else {
+                // only combine columns for view indexes (and not local indexes on regular tables
+                // which also have a viewIndexId)
+                if (i == 0 && hasIndexId && pTable.getType() != PTableType.VIEW) {
+                    return table;
+                }
+                if (TABLE.equals(pTable.getType())) {
+                    baseTable = pTable;
+                }
+                // set the final table timestamp as the max timestamp of the view/view index or its
+                // ancestors
+                maxTableTimestamp = Math.max(maxTableTimestamp, pTable.getTimeStamp());
+                if (hasIndexId) {
+                    // add all pk columns of parent tables to indexes
+                    // skip salted column as it will be added from the base table columns
+                    startIndex = pTable.getBucketNum() != null ? 1 : 0;
+                    for (int index=startIndex; index<pTable.getPKColumns().size(); index++) {
+                        PColumn column = pTable.getPKColumns().get(index);
+                        // don't add the salt column of ancestor tables for view indexes
+                        if (column.equals(SaltingUtil.SALTING_COLUMN) || column.isExcluded()) {
+                            continue;
+                        }
+                        column = IndexUtil.getIndexPKColumn(++numPKCols, column);
+                        int existingColumnIndex = allColumns.indexOf(column);
+                        if (existingColumnIndex == -1) {
+                            allColumns.add(0, column);
+                        }
+                    }
+                    for (int j = 0; j < pTable.getColumns().size(); j++) {
+                        PColumn tableColumn = pTable.getColumns().get(j);
+                        if (tableColumn.isExcluded()) {
+                            continue;
+                        }
+                        String dataColumnName = tableColumn.getName().getString();
+                        // remove from list of columns since it has not been dropped
+                        for (Entry<PColumn, List<String>> entry : indexRequiredDroppedDataColMap
+                                .entrySet()) {
+                            entry.getValue().remove(dataColumnName);
+                        }
+                    }
+                } else {
+                    List<PColumn> currAncestorTableCols = PTableImpl.getColumnsToClone(pTable);
+                    if (currAncestorTableCols != null) {
+                        for (int j = currAncestorTableCols.size() - 1; j >= 0; j--) {
+                            PColumn column = currAncestorTableCols.get(j);
+                            // for diverged views we always include the pk columns of the base
+                            // table; we have to include these pk columns to be able to support
+                            // adding pk columns to the diverged view.
+                            // We only include regular columns that were created before the view
+                            // diverged
+                            if (isDiverged && column.getFamilyName() != null
+                                    && column.getTimestamp() > table.getTimeStamp()) {
+                                continue;
+                            }
+                            // need to check if this column is in the list of excluded (dropped)
+                            // columns of the view
+                            int existingIndex = excludedColumns.indexOf(column);
+                            if (existingIndex != -1) {
+                                // if it is, only exclude the column if it was created before the
+                                // column was dropped in the view, in order to handle the case where
+                                // a base table column is dropped in a view, then dropped in the
+                                // base table, and then added back to the base table
+                                if (column.getTimestamp() <= excludedColumns.get(existingIndex)
+                                        .getTimestamp()) {
+                                    continue;
+                                }
+                            }
+                            if (column.isExcluded()) {
+                                excludedColumns.add(column);
+                            } else {
+                                int existingColumnIndex = allColumns.indexOf(column);
+                                if (existingColumnIndex != -1) {
+                                    // if the same column exists in a parent and child, we keep the
+                                    // latest column
+                                    PColumn existingColumn = allColumns.get(existingColumnIndex);
+                                    if (column.getTimestamp() > existingColumn.getTimestamp()) {
+                                        allColumns.remove(existingColumnIndex);
+                                        allColumns.add(column);
+                                    }
+                                } else {
+                                    allColumns.add(column);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        // at this point indexRequiredDroppedDataColMap only contains the columns required by a
+        // view index that have been dropped
+        for (Entry<PColumn, List<String>> entry : indexRequiredDroppedDataColMap.entrySet()) {
+            if (!entry.getValue().isEmpty()) {
+                PColumn indexColumnToBeDropped = entry.getKey();
+                if (SchemaUtil.isPKColumn(indexColumnToBeDropped)) {
+                    // if an indexed column was dropped in an ancestor then we
+                    // cannot use this index any more
+                    // TODO figure out a way to actually drop this view index
+                    return null;
+                } else {
+                    allColumns.remove(indexColumnToBeDropped);
+                }
+            }
+        }
+        // now remove any column that has a corresponding excluded (dropped) column
+        // marker with a newer timestamp
+        for (PColumn excludedColumn : excludedColumns) {
+            int index = allColumns.indexOf(excludedColumn);
+            if (index != -1) {
+                if (allColumns.get(index).getTimestamp() <= excludedColumn.getTimestamp()) {
+                    allColumns.remove(excludedColumn);
+                }
+            }
+        }
+        List<PColumn> columnsToAdd = Lists.newArrayList();
+        int position = isSalted ? 1 : 0;
+        for (int i = allColumns.size() - 1; i >= 0; i--) {
+            PColumn column = allColumns.get(i);
+            if (table.getColumns().contains(column)) {
+                // for views this column is not derived from an ancestor
+                columnsToAdd.add(new PColumnImpl(column, position));
+            } else {
+                columnsToAdd.add(new PColumnImpl(column, true, position));
+            }
+            position++;
+        }
+        // unfortunately the columns need to be in the PTable to use the WhereCompiler,
+        // so this needs to be done twice...
+        // TODO set the view properties correctly instead of just setting them
+        // same as the base table
+        int baseTableColumnCount =
+                isDiverged ? QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT
+                        : columnsToAdd.size() - myColumns.size();
+        PTableImpl pTable =
+                PTableImpl.makePTable(table, baseTable, columnsToAdd, maxTableTimestamp,
+                    baseTableColumnCount);
+        return WhereConstantParser.addViewInfoToPColumnsIfNeeded(pTable);
+    }
+
     private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
         return phoenixAccessCoprocessorHost;
     }
 
+    /**
+     * @param skipAddingIndexes if true the PTable will not include indexes for tables or views
+     * @param skipAddingParentColumns if true the PTable will not include parent columns for views
+     *            or indexes
+     * @param lockedAncestorTable the ancestor table that is being mutated (we won't be able to
+     *            resolve this table as it's locked)
+     */
     private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
-            long clientTimeStamp, int clientVersion) throws IOException, SQLException {
+            long clientTimeStamp, int clientVersion, boolean skipAddingIndexes,
+            boolean skipAddingParentColumns, PTable lockedAncestorTable)
+            throws IOException, SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable newTable;
         try (RegionScanner scanner = region.getScanner(scan)) {
             PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
-            PTable newTable;
-            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, clientVersion);
-            if (newTable == null) {
-                return null;
-            }
-            if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()
-                    || (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0)) {
+            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, clientVersion, skipAddingIndexes, skipAddingParentColumns);
+            if (newTable != null
+                    && (oldTable == null || tableTimeStamp < newTable.getTimeStamp()
+                            || (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0))
+                    // only cache the PTable if it has the required indexes;
+                    // the PTable added to the cache doesn't include parent columns as we always
+                    // call combineColumns after looking up the PTable from the cache
+                    && !skipAddingIndexes) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Caching table "
                             + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
-                                cacheKey.getLength()) + " at seqNum "
-                            + newTable.getSequenceNumber() + " with newer timestamp "
-                            + newTable.getTimeStamp() + " versus " + tableTimeStamp);
+                                cacheKey.getLength())
+                            + " at seqNum " + newTable.getSequenceNumber()
+                            + " with newer timestamp " + newTable.getTimeStamp() + " versus "
+                            + tableTimeStamp);
                 }
                 metaDataCache.put(cacheKey, newTable);
             }
-            return newTable;
+            if (newTable != null) {
+                newTable = combineColumns(newTable, clientTimeStamp, clientVersion, skipAddingIndexes, skipAddingParentColumns, lockedAncestorTable).getFirst();
+            }
         }
+        return newTable;
     }
 
     private List<PFunction> buildFunctions(List<byte[]> keys, Region region,
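
The indexRequiredDroppedDataColMap bookkeeping in addDerivedColumnsFromAncestors
above is essentially a set difference: map each view index column to the data
columns its expression references, cross off every data column still present on
an ancestor, and anything left over must have been dropped. A standalone sketch
under that reading, with hypothetical names (the real code extracts the
referenced columns with ColumnNameTrackingExpressionCompiler):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class DroppedIndexColSketch {
        public static void main(String[] args) {
            // each index column -> the data columns its expression references
            Map<String, List<String>> required = new HashMap<>();
            required.put("IDX_COL_A", new ArrayList<>(Arrays.asList("A")));
            required.put("IDX_COL_AB", new ArrayList<>(Arrays.asList("A", "B")));

            // data columns still present on the ancestors; "B" was dropped
            Set<String> stillPresent = new HashSet<>(Arrays.asList("A", "C"));
            for (List<String> cols : required.values()) {
                cols.removeAll(stillPresent);
            }

            // whatever remains was dropped from an ancestor; if it feeds a pk
            // column of the index, the index is unusable (the endpoint returns
            // null in that case)
            for (Map.Entry<String, List<String>> e : required.entrySet()) {
                if (!e.getValue().isEmpty()) {
                    System.out.println(e.getKey() + " depends on dropped " + e.getValue());
                }
            }
        }
    }
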
@@ -694,9 +1031,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
-    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int clientVersion) throws IOException, SQLException {
-        byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
-        PTable indexTable = doGetTable(key, clientTimeStamp, clientVersion);
+    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName,
+            long clientTimeStamp, List<PTable> indexes, int clientVersion, boolean skipAddingParentColumns)
+            throws IOException, SQLException {
+        byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
+        PTable indexTable = doGetTable(tenantIdBytes, schemaName.getBytes(), indexName.getBytes(), clientTimeStamp,
+                null, clientVersion, false, skipAddingParentColumns, null);
         if (indexTable == null) {
             ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString()));
             return;
@@ -704,6 +1044,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         indexes.add(indexTable);
     }
 
+    private void addExcludedColumnToTable(List<PColumn> pColumns, PName colName, PName famName, long timestamp) {
+        PColumnImpl pColumn = PColumnImpl.createExcludedColumn(famName, colName, timestamp);
+        pColumns.add(pColumn);
+    }
+
     private void addColumnToTable(List<Cell> results, PName colName, PName famName,
         Cell[] colKeyValues, List<PColumn> columns, boolean isSalted) {
         int i = 0;
@@ -775,7 +1120,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                         isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
-        
+
         boolean isPkColumn = famName == null || famName.getString() == null;
         Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
         // Older tables won't have column qualifier metadata present. To make things simpler, just set the
@@ -784,10 +1129,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 Arrays.copyOfRange(columnQualifierKV.getValueArray(),
                     columnQualifierKV.getValueOffset(), columnQualifierKV.getValueOffset()
                             + columnQualifierKV.getValueLength()) : (isPkColumn ? null : colName.getBytes());
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes);
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes,
+            results.get(0).getTimestamp());
         columns.add(column);
     }
-    
+
     private void addArgumentToFunction(List<Cell> results, PName functionName, PName type,
         Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) throws SQLException {
         int i = 0;
@@ -843,7 +1189,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         arguments.add(arg);
     }
 
-    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp, int clientVersion)
+    /**
+     * @param skipAddingIndexes if true the returned PTable for a table or view won't include indexes
+     * @param skipAddingParentColumns if true the returned PTable won't include inherited columns
+     * @return the PTable built from the scanner results
+     */
+    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp,
+            int clientVersion, boolean skipAddingIndexes, boolean skipAddingParentColumns)
         throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -1054,15 +1406,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
           if (isQualifierCounterKV(colKv)) {
               Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
               cqCounter.setValue(famName.getString(), value);
-          } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) {    
+          } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) {
               LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
-              if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion);
+              if (linkType == LinkType.INDEX_TABLE && !skipAddingIndexes) {
+                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion, skipAddingParentColumns);
               } else if (linkType == LinkType.PHYSICAL_TABLE) {
                   physicalTables.add(famName);
               } else if (linkType == LinkType.PARENT_TABLE) {
                   parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
                   parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
+              } else if (linkType == LinkType.EXCLUDED_COLUMN) {
+                  // add the excludedColumn
+                  addExcludedColumnToTable(columns, colName, famName, colKv.getTimestamp());
               }
           } else {
               addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
@@ -1076,7 +1431,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, baseColumnCount,
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter, useStatsForParallelization);
     }
-    
+
     private boolean isQualifierCounterKV(Cell kv) {
         int cmp =
                 Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
@@ -1084,7 +1439,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength());
         return cmp == 0;
     }
-    
+
     private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -1203,7 +1558,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             PName typeName =
                     newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength
                             - offset - 3);
-            
+
             int argPositionOffset =  offset + typeName.getBytes().length + 1;
             short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength
                 - argPositionOffset);
@@ -1218,7 +1573,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(),
                 className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
     }
-    
+
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
         long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
@@ -1245,7 +1600,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return null;
     }
 
-    
+
     private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, Region region,
         long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
@@ -1322,21 +1677,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return function.getFunctionName() == null;
     }
 
-    private PTable getTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey,
-            long clientTimeStamp, long asOfTimeStamp, int clientVersion) throws IOException, SQLException {
-        PTable table = loadTable(env, key, cacheKey, clientTimeStamp, asOfTimeStamp, clientVersion);
-        if (table == null || isTableDeleted(table)) { return null; }
-        return table;
-    }
-
     private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
         ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, int clientVersion)
         throws IOException, SQLException {
         Region region = env.getRegion();
-        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
+        PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion, false, false, null);
         // We always cache the latest version - fault in if not in cache
-        if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion)) != null) {
+        if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion, false, false, null)) != null) {
             return table;
         }
         // if not found then check if newer table already exists and add delete marker for timestamp
@@ -1347,6 +1694,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
         return null;
     }
+    
+    /**
+     * Returns a PTable if it's found in the cache.
+     */
+    private PTable getTableFromCache(ImmutableBytesPtr cacheKey, long clientTimeStamp, int clientVersion,
+            boolean skipAddingIndexes, boolean skipAddingParentColumns, PTable lockedAncestorTable)
+            throws SQLException, IOException {
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = (PTable) metaDataCache.getIfPresent(cacheKey);
+        if (table != null) {
+            table = combineColumns(table, clientTimeStamp, clientVersion, skipAddingIndexes,
+                skipAddingParentColumns, lockedAncestorTable).getFirst();
+        }
+        return table;
+    }
 
     private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
             ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, boolean isReplace, List<Mutation> deleteMutationsForReplace)
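
getTableFromCache above captures the caching contract this patch introduces:
cached PTables never include inherited columns, and combineColumns runs on every
read so that a change to a parent is always reflected. A minimal sketch of that
pattern, assuming the same Guava Cache used here; the combine step and the
String payloads are hypothetical stand-ins, not Phoenix APIs:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class CacheThenCombineSketch {
        private final Cache<String, String> cache =
                CacheBuilder.newBuilder().maximumSize(1000).build();

        // cached entries never include inherited columns; we recombine on
        // every lookup so a change to a parent is always reflected
        String getTable(String key) {
            String cached = cache.getIfPresent(key);
            return cached == null ? null : combine(cached);
        }

        private String combine(String table) {
            return table + "+parentColumns"; // stand-in for combineColumns(...)
        }
    }
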
@@ -1390,9 +1748,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     /**
-     * 
      * @return null if the physical table row information is not present.
-     * 
      */
     private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) {
         int size = tableMetadata.size();
@@ -1457,7 +1813,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             schemaTableNames[2] = tName;
         }
     }
-    
+
     @Override
     public void createTable(RpcController controller, CreateTableRequest request,
             RpcCallback<MetaDataResponse> done) {
@@ -1465,6 +1821,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[][] rowKeyMetaData = new byte[3][];
         byte[] schemaName = null;
         byte[] tableName = null;
+        String fullTableName = null;
         try {
             int clientVersion = request.getClientVersion();
             List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
@@ -1472,13 +1829,92 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
             schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
             tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            fullTableName = SchemaUtil.getTableName(schemaName, tableName);
             boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                     new ImmutableBytesWritable());
             final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                     new ImmutableBytesWritable());
+            byte[] parentTenantId = null;
             byte[] parentSchemaName = null;
             byte[] parentTableName = null;
             PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
+            ViewType viewType = MetaDataUtil.getViewType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
+
+            // Load table to see if it already exists
+            byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
+            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            PTable table = null;
+            try {
+                // Get as of latest timestamp so we can detect if we have a newer table that already
+                // exists without making an additional query
+                table = loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP,
+                        clientVersion);
+            } catch (ParentTableNotFoundException e) {
+                dropChildMetadata(e.getParentSchemaName(), e.getParentTableName(), e.getParentTenantId());
+            }
+            if (table != null) {
+                if (table.getTimeStamp() < clientTimeStamp) {
+                    // If the table is older than the client time stamp and it's deleted,
+                    // continue
+                    if (!isTableDeleted(table)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(table));
+                        done.run(builder.build());
+                        return;
+                    }
+                } else {
+                    builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
+                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                    builder.setTable(PTableImpl.toProto(table));
+                    done.run(builder.build());
+                    return;
+                }
+            }
+            
+            // check if the table was dropped, but had child views that have not yet
+            // been cleaned up by compaction
+            if (!Bytes.toString(schemaName).equals(QueryConstants.SYSTEM_SCHEMA_NAME)) {
+                dropChildMetadata(schemaName, tableName, tenantIdBytes);
+            }
+            
+            // Here we are passed the parent's columns to add to a view. PHOENIX-3534 allows for a splittable
+            // SYSTEM.CATALOG, so we only store the columns that are new to the view, not the parent's columns.
+            // Thus we remove everything with ORDINAL.POSITION <= baseColumnCount and shift the remaining
+            // ORDINAL.POSITIONs accordingly.
+            // TODO PHOENIX-4767 remove the following code that removes the base table column metadata in the next release 
+            if (PTableType.VIEW.equals(tableType) && !ViewType.MAPPED.equals(viewType)) {
+                boolean isSalted = MetaDataUtil.getSaltBuckets(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable()) > 0;
+                int baseColumnCount = MetaDataUtil.getBaseColumnCount(tableMetadata) - (isSalted ? 1 : 0);
+                if (baseColumnCount > 0) {
+                    Iterator<Mutation> mutationIterator = tableMetadata.iterator();
+                    while (mutationIterator.hasNext()) {
+                        Mutation mutation = mutationIterator.next();
+                        // if the ordinal position is present and <= baseColumnCount, remove this mutation
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        if (MetaDataUtil.getMutationValue(mutation, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES,
+                            GenericKeyValueBuilder.INSTANCE, ptr)) {
+                            int ordinalValue = PInteger.INSTANCE.getCodec().decodeInt(ptr, SortOrder.ASC);
+                            if (ordinalValue <= baseColumnCount) {
+                                mutationIterator.remove();
+                            } else {
+                                if (mutation instanceof Put) {
+                                    byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
+                                    int newOrdinalValue = ordinalValue - baseColumnCount;
+                                    PInteger.INSTANCE.getCodec()
+                                        .encodeInt(newOrdinalValue, ordinalPositionBytes, 0);
+                                    byte[] family = Iterables.getOnlyElement(mutation.getFamilyCellMap().keySet());
+                                    MetaDataUtil.mutatePutValue((Put) mutation, family, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
             byte[] parentTableKey = null;
             Mutation viewPhysicalTableRow = null;
             Set<TableName> indexes = new HashSet<TableName>();;
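
The ORDINAL_POSITION rewrite above removes the rows for inherited columns and
shifts the positions of the view's own columns down by baseColumnCount. A
standalone sketch of just the arithmetic, using plain big-endian ints in place
of Phoenix's PInteger codec (which the real code uses):

    import java.nio.ByteBuffer;

    public class OrdinalShiftSketch {
        static byte[] shift(byte[] ordinalPositionValue, int baseColumnCount) {
            int ordinal = ByteBuffer.wrap(ordinalPositionValue).getInt();
            if (ordinal <= baseColumnCount) {
                return null; // caller removes the mutation entirely
            }
            return ByteBuffer.allocate(Integer.BYTES).putInt(ordinal - baseColumnCount).array();
        }

        public static void main(String[] args) {
            int baseColumnCount = 3;
            // a view column at position 5 of the combined view becomes position 2
            byte[] in = ByteBuffer.allocate(Integer.BYTES).putInt(5).array();
            byte[] out = shift(in, baseColumnCount);
            System.out.println(ByteBuffer.wrap(out).getInt()); // prints 2
        }
    }
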
@@ -1489,19 +1925,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 byte[][] parentSchemaTableNames = new byte[3][];
                 byte[][] parentPhysicalSchemaTableNames = new byte[3][];
                 /*
-                 * For a view, we lock the base physical table row. For a mapped view, there is 
-                 * no link present to the physical table. So the viewPhysicalTableRow is null
-                 * in that case.
+                 * For a mapped view, there is no link present to the physical table. So the
+                 * viewPhysicalTableRow is null in that case.
                  */
                 
                 viewPhysicalTableRow = getPhysicalTableRowForView(tableMetadata, parentSchemaTableNames,parentPhysicalSchemaTableNames);
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 if (parentPhysicalSchemaTableNames[2] != null) {
                     
                     parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
                             parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]);
-                    PTable parentTable = getTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
-                            clientTimeStamp, clientTimeStamp, clientVersion);
+                    PTable parentTable =
+                            doGetTable(ByteUtil.EMPTY_BYTE_ARRAY, parentPhysicalSchemaTableNames[1],
+                                parentPhysicalSchemaTableNames[2], clientTimeStamp, clientVersion);
                     if (parentTable == null) {
                         builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                         builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
@@ -1512,17 +1947,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (parentSchemaTableNames[2] != null
                             && Bytes.compareTo(parentSchemaTableNames[2], parentPhysicalSchemaTableNames[2]) != 0) {
                         // if view is created on view
-                        byte[] parentKey = SchemaUtil.getTableKey(
-                                parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentSchemaTableNames[0],
-                                parentSchemaTableNames[1], parentSchemaTableNames[2]);
-                        parentTable = getTable(env, parentKey, new ImmutableBytesPtr(parentKey),
-                                clientTimeStamp, clientTimeStamp, clientVersion);
+                        byte[] tenantId =
+                                parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY
+                                        : parentSchemaTableNames[0];
+                        parentTable =
+                                doGetTable(tenantId, parentSchemaTableNames[1],
+                                    parentSchemaTableNames[2], clientTimeStamp, clientVersion);
                         if (parentTable == null) {
                             // it could be a global view
-                            parentKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
-                                    parentSchemaTableNames[1], parentSchemaTableNames[2]);
-                            parentTable = getTable(env, parentKey, new ImmutableBytesPtr(parentKey),
-                                    clientTimeStamp, clientTimeStamp, clientVersion);
+                            parentTable =
+                                    doGetTable(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaTableNames[1],
+                                        parentSchemaTableNames[2], clientTimeStamp, clientVersion);
                         }
                     }
                     if (parentTable == null) {
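
The fallback above, trying the tenant-scoped parent view first and then the global view, can be pictured with a toy resolver; ParentViewResolver and its string-keyed catalog are invented for illustration and are not Phoenix types:

    import java.util.HashMap;
    import java.util.Map;

    // Invented stand-in for the two-step resolution above: a view-on-view
    // parent is looked up under the requesting tenant first, then globally.
    class ParentViewResolver {
        final Map<String, String> catalog = new HashMap<>(); // "tenant|name" -> view

        String resolveParent(String tenantId, String fullName) {
            String parent = catalog.get(tenantId + "|" + fullName); // tenant-owned view?
            if (parent == null) {
                parent = catalog.get("|" + fullName); // fall back to the global view
            }
            return parent; // still null -> PARENT_TABLE_NOT_FOUND
        }
    }
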
@@ -1538,6 +1973,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     // Mapped View
                     cParentPhysicalName = SchemaUtil.getTableNameAsBytes(schemaName, tableName);
                 }
+                parentTenantId = ByteUtil.EMPTY_BYTE_ARRAY;
                 parentSchemaName = parentPhysicalSchemaTableNames[1];
                 parentTableName = parentPhysicalSchemaTableNames[2];
                     
@@ -1547,12 +1983,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                  * For an index we lock the parent table's row which could be a physical table or a view.
                  * If the parent table is a physical table, then the tenantIdBytes is empty because
                  * we allow creating an index with a tenant connection only if the parent table is a view.
-                 */ 
+                 */
+                parentTenantId = tenantIdBytes;
                 parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                 parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
-                        clientTimeStamp, clientTimeStamp, clientVersion);
+                PTable parentTable =
+                        doGetTable(tenantIdBytes, parentSchemaName, parentTableName, clientTimeStamp, null,
+                            request.getClientVersion(), false, false, null);
                 if (IndexType.LOCAL == indexType) {
                     cPhysicalName = parentTable.getPhysicalName().getBytes();
                     cParentPhysicalName = parentTable.getPhysicalName().getBytes();
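
For a LOCAL index both cPhysicalName and cParentPhysicalName resolve to the parent's own physical table, because local index rows are co-located with the data table. A client-side sketch of that situation (the JDBC URL and names are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class LocalIndexExample {
        public static void main(String[] args) throws Exception {
            // URL assumes a local quorum; adjust for your cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS T (K VARCHAR PRIMARY KEY, V VARCHAR)");
                // A local index lives in the same physical table as T, which is
                // why the index's physical name is taken from the parent's.
                stmt.execute("CREATE LOCAL INDEX IF NOT EXISTS T_IDX ON T (V)");
            }
        }
    }
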
@@ -1566,7 +2003,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
             
             getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
-                    SchemaUtil.getTableName(schemaName, tableName),
+                    fullTableName,
                     (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
                     cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
                     /* TODO: During inital create we may not need the family map */
@@ -1575,7 +2012,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             Region region = env.getRegion();
             List<RowLock> locks = Lists.newArrayList();
             // Place a lock using key for the table to be created
-            byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
             try {
                 acquireLock(region, tableKey, locks);
 
@@ -1586,74 +2022,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     return;
                 }
 
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 ImmutableBytesPtr parentCacheKey = null;
                 PTable parentTable = null;
                 if (parentTableName != null) {
-                    // Check if the parent table resides in the same region. If not, don't worry about locking the parent table row
-                    // or loading the parent table. For a view, the parent table that needs to be locked is the base physical table.
-                    // For an index on view, the view header row needs to be locked. 
-                    result = checkTableKeyInRegion(parentTableKey, region);
-                    if (result == null) {
-                        acquireLock(region, parentTableKey, locks);
-                        parentCacheKey = new ImmutableBytesPtr(parentTableKey);
-                        parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
-                                clientTimeStamp, clientVersion);
-                        if (parentTable == null || isTableDeleted(parentTable)) {
-                            builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
-                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                            done.run(builder.build());
-                            return;
-                        }
-                        // make sure we haven't gone over our threshold for indexes on this table.
-                        if (execeededIndexQuota(tableType, parentTable)) {
-                            builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
-                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                            done.run(builder.build());
-                            return;
-                        }
-                        long parentTableSeqNumber;
-                        if (tableType == PTableType.VIEW && viewPhysicalTableRow != null && request.hasClientVersion()) {
-                            // Starting 4.5, the client passes the sequence number of the physical table in the table metadata.
-                            parentTableSeqNumber = MetaDataUtil.getSequenceNumber(viewPhysicalTableRow);
-                        } else if (tableType == PTableType.VIEW && !request.hasClientVersion()) {
-                            // Before 4.5, due to a bug, the parent table key wasn't available.
-                            // So don't do anything and prevent the exception from being thrown.
-                            parentTableSeqNumber = parentTable.getSequenceNumber();
-                        } else {
-                            parentTableSeqNumber = MetaDataUtil.getParentSequenceNumber(tableMetadata);
-                        }
-                        // If parent table isn't at the expected sequence number, then return
-                        if (parentTable.getSequenceNumber() != parentTableSeqNumber) {
-                            builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
+                    // we lock the parent table when creating an index on a table or a view
+                    if (tableType == PTableType.INDEX) {
+                        result = checkTableKeyInRegion(parentTableKey, region);
+                        if (result != null) {
+                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION);
                             builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                            builder.setTable(PTableImpl.toProto(parentTable));
                             done.run(builder.build());
                             return;
                         }
+                        acquireLock(region, parentTableKey, locks);
                     }
-                }
-                // Load child table next
-                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
-                // Get as of latest timestamp so we can detect if we have a newer table that already
-                // exists without making an additional query
-                PTable table =
-                        loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion);
-                if (table != null) {
-                    if (table.getTimeStamp() < clientTimeStamp) {
-                        // If the table is older than the client time stamp and it's deleted,
-                        // continue
-                        if (!isTableDeleted(table)) {
-                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
-                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                            builder.setTable(PTableImpl.toProto(table));
-                            done.run(builder.build());
-                            return;
-                        }
+                    parentTable = doGetTable(parentTenantId, parentSchemaName, parentTableName,
+                        clientTimeStamp, null, clientVersion, false, false, null);
+                    parentCacheKey = new ImmutableBytesPtr(parentTableKey);
+                    if (parentTable == null || isTableDeleted(parentTable)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        done.run(builder.build());
+                        return;
+                    }
+                    // make sure we haven't gone over our threshold for indexes on this table.
+                    if (execeededIndexQuota(tableType, parentTable)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        done.run(builder.build());
+                        return;
+                    }
+                    long parentTableSeqNumber;
+                    if (tableType == PTableType.VIEW && viewPhysicalTableRow != null && request.hasClientVersion()) {
+                        // Starting 4.5, the client passes the sequence number of the physical table in the table metadata.
+                        parentTableSeqNumber = MetaDataUtil.getSequenceNumber(viewPhysicalTableRow);
+                    } else if (tableType == PTableType.VIEW && !request.hasClientVersion()) {
+                        // Before 4.5, due to a bug, the parent table key wasn't available.
+                        // So fall back to the parent table's current sequence number,
+                        // which keeps the concurrent-mutation check below from failing.
+                        parentTableSeqNumber = parentTable.getSequenceNumber();
                     } else {
-                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
+                        parentTableSeqNumber = MetaDataUtil.getParentSequenceNumber(tableMetadata);
+                    }
+                    // If parent table isn't at the expected sequence number, then return
+                    if (parentTable.getSequenceNumber() != parentTableSeqNumber) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
                         builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                        builder.setTable(PTableImpl.toProto(table));
+                        builder.setTable(PTableImpl.toProto(parentTable));
                         done.run(builder.build());
                         return;
                     }
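
The sequence-number comparison above is an optimistic concurrency check: the client echoes back the parent sequence number it last observed, and a mismatch means a concurrent DDL won the race. A minimal stand-in for the idea (class and method names are illustrative, not Phoenix APIs):

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: a DDL carries the parent sequence number the client
    // last saw, and only applies if that number is still current; every
    // successful DDL bumps the sequence.
    public class SeqNumberGuard {
        private final AtomicLong seq = new AtomicLong();

        public boolean tryMutate(long expectedSeq, Runnable ddl) {
            if (seq.get() != expectedSeq) {
                return false;      // lost the race: CONCURRENT_TABLE_MUTATION
            }
            ddl.run();             // safe: the coprocessor holds the row lock here
            seq.incrementAndGet();
            return true;
        }
    }
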
@@ -1665,7 +2079,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (tableType != PTableType.VIEW) {
                     UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp);
                 }
-                // If the parent table of the view has the auto partition sequence name attribute, modify the 
+                // If the parent table of the view has the auto partition sequence name attribute, modify the
                 // tableMetadata and set the view statement and partition column correctly
                 if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) {
                     long autoPartitionNum = 1;
@@ -1696,7 +2110,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         return;
                     }
                     builder.setAutoPartitionNum(autoPartitionNum);
-                    
+
                     // set the VIEW STATEMENT column of the header row
                     Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                     NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
@@ -1713,13 +2127,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (!Bytes.equals(value, QueryConstants.EMPTY_COLUMN_VALUE_BYTES)) {
                         viewStatement = Bytes.add(value, Bytes.toBytes(" AND "), Bytes.toBytes(autoPartitionWhere));
                     }
-                    else { 
+                    else {
                         viewStatement = Bytes.toBytes(QueryUtil.getViewStatement(parentTable.getSchemaName().getString(), parentTable.getTableName().getString(), autoPartitionWhere));
                     }
                     Cell viewStatementCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_STATEMENT_BYTES,
                         cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), viewStatement);
                     cells.add(viewStatementCell);
-                    
+
                     // set the IS_VIEW_REFERENCED column of the auto partition column row
                     Put autoPartitionPut = MetaDataUtil.getPutOnlyAutoPartitionColumn(parentTable, tableMetadata);
                     familyCellMap = autoPartitionPut.getFamilyCellMap();
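
The VIEW_STATEMENT rewrite above either ANDs the auto-partition predicate onto the view's existing WHERE clause or generates a fresh view statement. A plain-string sketch of that branch, with QueryUtil.getViewStatement approximated by a simple SELECT template:

    // Illustrative only: mirrors the branch above that builds the new
    // VIEW_STATEMENT cell.
    class ViewStatementSketch {
        static String buildViewStatement(String existing, String schema, String table,
                String autoPartitionWhere) {
            if (existing != null && !existing.isEmpty()) {
                return existing + " AND " + autoPartitionWhere; // append the predicate
            }
            return "SELECT * FROM \"" + schema + "\".\"" + table + "\" WHERE "
                    + autoPartitionWhere;
        }
    }
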
@@ -1743,7 +2157,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
                         // TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should
                         // use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN
-                        // or not. 
+                        // or not.
                         long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
                         try {
                             connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
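
The sequence created above is an ordinary Phoenix sequence; from a client, drawing the next auto-partition value would look roughly like this (the sequence name and connection URL are made up for the example):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    // Client-side sketch: each new view on an auto-partitioned table draws
    // its partition value from the table's sequence.
    public class AutoPartitionSeqExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE SEQUENCE IF NOT EXISTS S.PARTITION_SEQ");
                try (ResultSet rs = stmt.executeQuery("SELECT NEXT VALUE FOR S.PARTITION_SEQ")) {
                    rs.next();
                    System.out.println("partition id = " + rs.getLong(1));
                }
            }
        }
    }
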
@@ -1765,6 +2179,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             return;
                         }
                         Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
+
                         NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
                         List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                         Cell cell = cells.get(0);
@@ -1779,13 +2194,66 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     }
                 }
                 
+                // The mutations to create a table are written in the following order:
+                // 1. Write the child link first, since if either of the next two steps
+                // fails we simply ignore the missing child while processing the parent
+                // 2. Update the encoded column qualifier counter on the parent table if
+                // it lives on a different region server (for tables that use column
+                // qualifier encoding); if the next step fails we only waste a few
+                // column qualifiers
+                // 3. Finally, write the mutations that create the table itself
+
+                // From 4.15 the parent->child links are stored in a separate table SYSTEM.CHILD_LINK
+                // TODO remove this after PHOENIX-4763 is implemented
+                List<Mutation> childLinkMutations = MetaDataUtil.removeChildLinks(tableMetadata);
+                MetaDataResponse response =
+                        processRemoteRegionMutations(
+                            PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES,
+                            childLinkMutations, fullTableName,
+                            MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK);
+                if (response != null) {
+                    done.run(response);
+                    return;
+                }
+
+                List<Mutation> localMutations =
+                        Lists.newArrayListWithExpectedSize(tableMetadata.size());
+                List<Mutation> remoteMutations = Lists.newArrayListWithExpectedSize(2);
+                // check to see if there are any mutations that should not be applied to this region
+                separateLocalAndRemoteMutations(region, tableMetadata, localMutations, remoteMutations);
+                if (!remoteMutations.isEmpty()) {
+                    // there should only be remote mutations if we are creating a view that uses
+                    // encoded column qualifiers (the remote mutations are to update the encoded
+                    // column qualifier counter on the parent table)
+                    if (parentTable != null && tableType == PTableType.VIEW && parentTable
+                            .getEncodingScheme() != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+                        response =
+                                processRemoteRegionMutations(
+                                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                    remoteMutations, fullTableName,
+                                    MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
+                        if (response != null) {
+                            done.run(response);
+                            return;
+                        }
+                    }
+                    else {
+                        String msg = "Found unexpected mutations while creating " + fullTableName;
+                        logger.error(msg);
+                        for (Mutation m : remoteMutations) {
+                            logger.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
+                            logger.debug("Mutation family cell map : " + m.getFamilyCellMap());
+                        }
+                        throw new IllegalStateException(msg);
+                    }
+                }
+                
                 // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
                 // system table. Basically, we get all the locks that we don't already hold for all the
                 // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
                 // primary and then index table locks are held, in that order). For now, we just don't support
                 // indexing on the system table. This is an issue because of the way we manage batch mutation
                 // in the Indexer.
-                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(region, localMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
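
Because SYSTEM.CATALOG may now span multiple regions, a single create-table batch can contain rows this region does not host: child links are routed to SYSTEM.CHILD_LINK, and a view's parent counter row may live elsewhere. A simplified picture of what separateLocalAndRemoteMutations has to do, with Region and Mutation reduced to raw row keys rather than the real HBase types:

    import java.util.List;

    // Sketch: each mutation's row key is tested against this region's
    // [start, end) range; rows outside it must be written remotely.
    class MutationRouter {
        static void separate(byte[] start, byte[] end, List<byte[]> rows,
                List<byte[]> local, List<byte[]> remote) {
            for (byte[] row : rows) {
                if (inRegion(row, start, end)) local.add(row); else remote.add(row);
            }
        }

        static boolean inRegion(byte[] row, byte[] start, byte[] end) {
            // an empty end key means the region extends to the end of the table
            return compare(row, start) >= 0 && (end.length == 0 || compare(row, end) < 0);
        }

        static int compare(byte[] a, byte[] b) {
            for (int i = 0; i < Math.min(a.length, b.length); i++) {
                int d = (a[i] & 0xff) - (b[i] & 0xff);
                if (d != 0) return d;
            }
            return a.length - b.length;
        }
    }
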
@@ -1809,180 +2277,97 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         } catch (Throwable t) {
             logger.error("createTable failed", t);
             ProtobufUtil.setControllerException(controller,
-                    ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+                    ServerUtil.createIOException(fullTableName, t));
         }
     }
 
-    private boolean execeededIndexQuota(PTableType tableType, PTable parentTable) {
-        return PTableType.INDEX == tableType && parentTable.getIndexes().size() >= maxIndexesPerTable;
-    }
-    
-    private void findAllChildViews(Region region, byte[] tenantId, PTable table,
-            TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
-        TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion, false);
-        result.addResult(currResult);
-        for (ViewInfo viewInfo : currResult.getViewInfoList()) {
-            byte[] viewtenantId = viewInfo.getTenantId();
-            byte[] viewSchema = viewInfo.getSchemaName();
-            byte[] viewTable = viewInfo.getViewName();
-            byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, viewTable);
-            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
-            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion);
-            if (view == null) {
-                logger.warn("Found orphan tenant view row in SYSTEM.CATALOG with tenantId:"
-                        + Bytes.toString(tenantId) + ", schema:"
-                        + Bytes.toString(viewSchema) + ", table:"
-                        + Bytes.toString(viewTable));
-                continue;
+    private void dropChildMetadata(byte[] schemaName, byte[] tableName, byte[] tenantIdBytes)
+            throws IOException, SQLException, ClassNotFoundException {
+        TableViewFinderResult childViewsResult = new TableViewFinderResult();
+        findAllChildViews(tenantIdBytes, schemaName, tableName, childViewsResult);
+        if (childViewsResult.hasLinks()) {
+            for (TableInfo viewInfo : childViewsResult.getLinks()) {
+                byte[] viewTenantId = viewInfo.getTenantId();
+                byte[] viewSchemaName = viewInfo.getSchemaName();
+                byte[] viewName = viewInfo.getTableName();
+                Properties props = new Properties();
+                if (viewTenantId != null && viewTenantId.length != 0)
+                    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
+                try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration())
+                        .unwrap(PhoenixConnection.class)) {
+                    MetaDataClient client = new MetaDataClient(connection);
+                    org.apache.phoenix.parse.TableName viewTableName = org.apache.phoenix.parse.TableName
+                            .create(Bytes.toString(viewSchemaName), Bytes.toString(viewName));
+                    client.dropTable(
+                            new DropTableStatement(viewTableName, PTableType.VIEW, false, true, true));
+                }
             }
-            findAllChildViews(region, viewtenantId, view, result, clientTimeStamp, clientVersion);
         }
     }
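
dropChildMetadata opens a tenant-scoped connection per child view so that the drop resolves the tenant's own view. The client-side equivalent would look like this sketch (URL and names are placeholders; "TenantId" is the value of PhoenixRuntime.TENANT_ID_ATTRIB used above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Properties;

    public class TenantDropExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("TenantId", "acme");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement()) {
                // Under a tenant connection, the view name resolves to the
                // tenant-owned view, just as the server-side cascade requires.
                stmt.execute("DROP VIEW IF EXISTS S.MY_VIEW");
            }
        }
    }
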
-        
-    // TODO use child link instead once splittable system catalog (PHOENIX-3534) is implemented
-    // and we have a separate table for links.
-    private TableViewFinder findChildViews_deprecated(Region region, byte[] tenantId, PTable table, byte[] linkTypeBytes, boolean stopAfterFirst) throws IOException {
-        byte[] schemaName = table.getSchemaName().getBytes();
-        byte[] tableName = table.getTableName().getBytes();
-        boolean isMultiTenant = table.isMultiTenant();
-        Scan scan = new Scan();
-        // If the table is multi-tenant, we need to check across all tenant_ids,
-        // so we can't constrain the row key. Otherwise, any views would have
-        // the same tenantId.
-        if (!isMultiTenant) {
-            byte[] startRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
-            byte[] stopRow = ByteUtil.nextKey(startRow);
-            scan.setStartRow(startRow);
-            scan.setStopRow(stopRow);
-        }
-        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
-        SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES,
-                CompareOp.EQUAL, PTableType.VIEW.getSerializedValue().getBytes());
-        tableTypeFilter.setFilterIfMissing(false);
-        linkFilter.setFilterIfMissing(true);
-        byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil
-                .getPhysicalHBaseTableName(schemaName, tableName, table.isNamespaceMapped())
-                .getBytes());
-        SuffixFilter rowFilter = new SuffixFilter(suffix);
-        List<Filter> filters = Lists.<Filter>newArrayList(linkFilter,tableTypeFilter,rowFilter);
-        if (stopAfterFirst) {
-            filters.add(new PageFilter(1));
-        }
-        FilterList filter = new FilterList(filters);
-        scan.setFilter(filter);
-        scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
-        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
-        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
-        
-        // Original region-only scanner modified due to PHOENIX-1208
-        // RegionScanner scanner = region.getScanner(scan);
-        // The following *should* work, but doesn't due to HBASE-11837
-        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
-        // HTableInterface hTable = env.getTable(systemCatalogTableName);
-        // These deprecated calls work around the issue
-        try (HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env,
-            region.getTableDesc().getTableName().getName())) {
-            boolean allViewsInCurrentRegion = true;
-            int numOfChildViews = 0;
-            List<ViewInfo> viewInfoList = Lists.newArrayList();
-            try (ResultScanner scanner = hTable.getScanner(scan)) {
-                for (Result result = scanner.next(); (result != null); result = scanner.next()) {
-                    numOfChildViews++;
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    ResultTuple resultTuple = new ResultTuple(result);
-                    resultTuple.getKey(ptr);
-                    byte[] key = ptr.copyBytes();
-                    if (checkTableKeyInRegion(key, region) != null) {
-                        allViewsInCurrentRegion = false;
-                    }
-                    byte[][] rowKeyMetaData = new byte[3][];
-                    getVarChars(result.getRow(), 3, rowKeyMetaData);
-                    byte[] viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-                    byte[] viewSchemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-                    byte[] viewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-                    viewInfoList.add(new ViewInfo(viewTenantId, viewSchemaName, viewName));
-                }
-                TableViewFinder tableViewFinderResult = new TableViewFinder(viewInfoList);
-                if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
-                    tableViewFinderResult.setAllViewsNotInSingleRegion();
-                }
-                return tableViewFinderResult;
+
+    private boolean execeededIndexQuota(PTableType tableType, PTable parentTable) {
+        return PTableType.INDEX == tableType && parentTable.getIndexes().size() >= maxIndexesPerTable;
+    }
+
+    private void findAncestorViewsOfIndex(byte[] tenantId, byte[] schemaName, byte[] indexName,
+            TableViewFinderResult result, boolean isNamespaceMapped) throws IOException {
+        try (Table hTable =
+                env.getTable(SchemaUtil.getPhysicalTableName(
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
+            TableViewFinderResult currentResult =
+                    ViewFinder.findParentViewofIndex(hTable, tenantId, schemaName, indexName);
+            if (currentResult.getLinks().size() == 1) {
+                result.addResult(currentResult);
+                TableInfo tableInfo = currentResult.getLinks().get(0);
+                findAncestorViews(tableInfo.getTenantId(), tableInfo.getSchemaName(),
+                    tableInfo.getTableName(), result, isNamespaceMapped);
             }
+            // else this is an index on a regular table and so we don't need to combine columns
         }
     }
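
findAncestorViewsOfIndex first finds the single view an index hangs off, then findAncestorViews walks the parent links upward so the ancestors' columns can be combined in. A simplified recursion over a map-based stand-in for the SYSTEM.CATALOG parent links:

    import java.util.List;
    import java.util.Map;

    // Sketch only: parentOf maps a table to its parent (null for a base
    // physical table); ancestors are collected while walking up the chain.
    class AncestorWalk {
        static void findAncestors(Map<String, String> parentOf, String table,
                List<String> out) {
            String parent = parentOf.get(table);      // null once we reach a base table
            if (parent != null) {
                out.add(parent);
                findAncestors(parentOf, parent, out); // keep walking up the view chain
            }
        }
    }
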
     
-    private TableViewFinder findChildViews_4_11(Region region, byte[] tenantId, byte[] schemaName, byte[] tableName, boolean stopAfterFirst) throws IOException {
-        Scan scan = new Scan();
-        byte[] startRow = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
-        byte[] stopRow = ByteUtil.nextKey(startRow);
-        scan.setStartRow(startRow);
-        scan.setStopRow(stopRow);
-        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, CHILD_TABLE_BYTES);
-        Filter filter = linkFilter;
-        linkFilter.setFilterIfMissing(true);
-        if (stopAfterFirst) {
-            filter = new FilterList(linkFilter, new PageFilter(1));
-        }
-        scan.setFilter(filter);
-        scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
-        scan.addColumn(TABLE_FAMILY_BYTES, PARENT_TENANT_ID_BYTES);
-        
-        // Original region-only scanner modified due to PHOENIX-1208
-        // RegionScanner scanner = region.getScanner(scan);
-        // The following *should* work, but doesn't due to HBASE-11837
-        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
-        // HTableInterface hTable = env.getTable(systemCatalogTableName);
-        // These deprecated calls work around the issue
-        try (HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env,
-            region.getTableDesc().getTableName().getName())) {
-            boolean allViewsInCurrentRegion = true;
-            int numOfChildViews = 0;
-            List<ViewInfo> viewInfoList = Lists.newArrayList();
-            try (ResultScanner scanner = hTable.getScanner(scan)) {
-                for (Result result = scanner.next(); (result != null); result = scanner.next()) {
-                    numOfChildViews++;
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    ResultTuple resultTuple = new ResultTuple(result);
-                    resultTuple.getKey(ptr);
-                    byte[] key = ptr.copyBytes();
-                    if (checkTableKeyInRegion(key, region) != null) {
-                        allViewsInCurrentRegion = false;
-                    }
-                    byte[][] rowViewKeyMetaData = new byte[5][];
-                    getVarChars(result.getRow(), 5, rowViewKeyMetaData);
-                    byte[] viewTenantId = rowViewKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
-                    byte[] viewSchemaName = SchemaUtil.getSchemaNameFromFullName(rowViewKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
-                    byte[] viewName = SchemaUtil.getTableNameFromFullName(rowViewKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
-                    viewInfoList.add(new ViewInfo(viewTenantId, viewSchemaName, viewName));
-                }
-                TableViewFinder tableViewFinderResult = new TableViewFinder(viewInfoList);
-                if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
-                    tableViewFinderResult.setAllViewsNotInSingleRegion();
-                }
-                return tableViewFinderResult;
+    private void findAncestorViews(byte[] tenantId, byte[] schemaName, byte[] tableName,
+            TableViewFinderResult result, boolean isNamespaceMapped) throws IOException {
+        try (Table hTable =
+                env.getTable(SchemaUtil.getPhysicalTableName(
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG

<TRUNCATED>