Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/09/24 15:28:09 UTC

[04/50] [abbrv] phoenix git commit: PHOENIX-3534 Support multi region SYSTEM.CATALOG table (Thomas D'Silva and Rahul Gidwani)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 8dd4a88..dab1048 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -29,9 +29,10 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExpressionProjector;
@@ -40,7 +41,12 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LikeExpression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.StringBasedLikeExpression;
 import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
 import org.apache.phoenix.expression.function.IndexStateNameFunction;
 import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
@@ -48,25 +54,33 @@ import org.apache.phoenix.expression.function.SQLTableTypeFunction;
 import org.apache.phoenix.expression.function.SQLViewTypeFunction;
 import org.apache.phoenix.expression.function.SqlTypeNameFunction;
 import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.LikeParseNode.LikeType;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 
@@ -336,6 +350,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
     public static final String USE_STATS_FOR_PARALLELIZATION = "USE_STATS_FOR_PARALLELIZATION";
     public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
+    
+    public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
+    public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
+    public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
+    public static final TableName SYSTEM_LINK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_CHILD_LINK_NAME);
 
     
     //SYSTEM:LOG
@@ -467,179 +486,352 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     private static void appendConjunction(StringBuilder buf) {
         buf.append(buf.length() == 0 ? "" : " and ");
     }
-
+    
+    private static final PColumnImpl TENANT_ID_COLUMN = new PColumnImpl(PNameFactory.newName(TENANT_ID),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DATA_TYPE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl TABLE_SCHEM_COLUMN = new PColumnImpl(PNameFactory.newName(TABLE_SCHEM),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DATA_TYPE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl TABLE_NAME_COLUMN = new PColumnImpl(PNameFactory.newName(TABLE_NAME),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DATA_TYPE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl COLUMN_NAME_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_NAME),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DATA_TYPE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl DATA_TYPE_COLUMN = new PColumnImpl(PNameFactory.newName(DATA_TYPE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DATA_TYPE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl TYPE_NAME_COLUMN = new PColumnImpl(PNameFactory.newName(TYPE_NAME),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(TYPE_NAME), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl COLUMN_SIZE_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_SIZE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, COLUMN_SIZE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl BUFFER_LENGTH_COLUMN = new PColumnImpl(PNameFactory.newName(BUFFER_LENGTH),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(BUFFER_LENGTH), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl DECIMAL_DIGITS_COLUMN = new PColumnImpl(PNameFactory.newName(DECIMAL_DIGITS),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, DECIMAL_DIGITS_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl NUM_PREC_RADIX_COLUMN = new PColumnImpl(PNameFactory.newName(NUM_PREC_RADIX),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(NUM_PREC_RADIX), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl NULLABLE_COLUMN = new PColumnImpl(PNameFactory.newName(NULLABLE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, NULLABLE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl REMARKS_COLUMN = new PColumnImpl(PNameFactory.newName(REMARKS),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(REMARKS), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl COLUMN_DEF_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_DEF),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(COLUMN_DEF), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SQL_DATA_TYPE_COLUMN = new PColumnImpl(PNameFactory.newName(SQL_DATA_TYPE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SQL_DATA_TYPE), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SQL_DATETIME_SUB_COLUMN = new PColumnImpl(PNameFactory.newName(SQL_DATETIME_SUB),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SQL_DATETIME_SUB), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl CHAR_OCTET_LENGTH_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_SIZE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(CHAR_OCTET_LENGTH), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl ORDINAL_POSITION_COLUMN = new PColumnImpl(PNameFactory.newName(ORDINAL_POSITION),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, ORDINAL_POSITION_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl IS_NULLABLE_COLUMN = new PColumnImpl(PNameFactory.newName(IS_NULLABLE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(IS_NULLABLE), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SCOPE_CATALOG_COLUMN = new PColumnImpl(PNameFactory.newName(SCOPE_CATALOG),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SCOPE_CATALOG), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SCOPE_SCHEMA_COLUMN = new PColumnImpl(PNameFactory.newName(SCOPE_SCHEMA),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SCOPE_SCHEMA), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SCOPE_TABLE_COLUMN = new PColumnImpl(PNameFactory.newName(SCOPE_TABLE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SCOPE_TABLE), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl SOURCE_DATA_TYPE_COLUMN = new PColumnImpl(PNameFactory.newName(SOURCE_DATA_TYPE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SOURCE_DATA_TYPE), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl IS_AUTOINCREMENT_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_SIZE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PSmallint.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(SCOPE_CATALOG), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl ARRAY_SIZE_COLUMN = new PColumnImpl(PNameFactory.newName(ARRAY_SIZE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, ARRAY_SIZE_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl COLUMN_FAMILY_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_FAMILY),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarchar.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, COLUMN_FAMILY_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl TYPE_ID_COLUMN = new PColumnImpl(PNameFactory.newName(COLUMN_SIZE),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PInteger.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, Bytes.toBytes(TYPE_ID), HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl VIEW_CONSTANT_COLUMN = new PColumnImpl(PNameFactory.newName(VIEW_CONSTANT),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PVarbinary.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, VIEW_CONSTANT_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl MULTI_TENANT_COLUMN = new PColumnImpl(PNameFactory.newName(MULTI_TENANT),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PBoolean.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, MULTI_TENANT_BYTES, HConstants.LATEST_TIMESTAMP);
+    private static final PColumnImpl KEY_SEQ_COLUMN = new PColumnImpl(PNameFactory.newName(KEY_SEQ),
+            PNameFactory.newName(TABLE_FAMILY_BYTES), PSmallint.INSTANCE, null, null, false, 1, SortOrder.getDefault(),
+            0, null, false, null, false, false, KEY_SEQ_BYTES, HConstants.LATEST_TIMESTAMP);
+    
+    private static final List<PColumnImpl> PK_DATUM_LIST = Lists.newArrayList(TENANT_ID_COLUMN, TABLE_SCHEM_COLUMN, TABLE_NAME_COLUMN, COLUMN_NAME_COLUMN);
+    
+    private static final RowProjector GET_COLUMNS_ROW_PROJECTOR = new RowProjector(
+            Arrays.<ColumnProjector> asList(
+                    new ExpressionProjector(TABLE_CAT, SYSTEM_CATALOG,
+                            new RowKeyColumnExpression(TENANT_ID_COLUMN,
+                                    new RowKeyValueAccessor(PK_DATUM_LIST, 0)), false),
+                    new ExpressionProjector(TABLE_SCHEM, SYSTEM_CATALOG,
+                            new RowKeyColumnExpression(TABLE_SCHEM_COLUMN,
+                                    new RowKeyValueAccessor(PK_DATUM_LIST, 1)), false),
+                    new ExpressionProjector(TABLE_NAME, SYSTEM_CATALOG,
+                            new RowKeyColumnExpression(TABLE_NAME_COLUMN,
+                                    new RowKeyValueAccessor(PK_DATUM_LIST, 2)), false),
+                    new ExpressionProjector(COLUMN_NAME, SYSTEM_CATALOG,
+                            new RowKeyColumnExpression(COLUMN_NAME_COLUMN,
+                                    new RowKeyValueAccessor(PK_DATUM_LIST, 3)), false),
+                    new ExpressionProjector(DATA_TYPE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(DATA_TYPE_COLUMN), false),
+                    new ExpressionProjector(TYPE_NAME, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(TYPE_NAME_COLUMN), false),
+                    new ExpressionProjector(COLUMN_SIZE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(COLUMN_SIZE_COLUMN), false),
+                    new ExpressionProjector(BUFFER_LENGTH, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(BUFFER_LENGTH_COLUMN), false),
+                    new ExpressionProjector(DECIMAL_DIGITS, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(DECIMAL_DIGITS_COLUMN), false),
+                    new ExpressionProjector(NUM_PREC_RADIX, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(NUM_PREC_RADIX_COLUMN), false),
+                    new ExpressionProjector(NULLABLE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(NULLABLE_COLUMN), false),
+                    new ExpressionProjector(REMARKS, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(REMARKS_COLUMN), false),
+                    new ExpressionProjector(COLUMN_DEF, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(COLUMN_DEF_COLUMN), false),
+                    new ExpressionProjector(SQL_DATA_TYPE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SQL_DATA_TYPE_COLUMN), false),
+                    new ExpressionProjector(SQL_DATETIME_SUB, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SQL_DATETIME_SUB_COLUMN), false),
+                    new ExpressionProjector(CHAR_OCTET_LENGTH, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(CHAR_OCTET_LENGTH_COLUMN), false),
+                    new ExpressionProjector(ORDINAL_POSITION, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(ORDINAL_POSITION_COLUMN), false),
+                    new ExpressionProjector(IS_NULLABLE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(IS_NULLABLE_COLUMN), false),
+                    new ExpressionProjector(SCOPE_CATALOG, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SCOPE_CATALOG_COLUMN), false),
+                    new ExpressionProjector(SCOPE_SCHEMA, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SCOPE_SCHEMA_COLUMN), false),
+                    new ExpressionProjector(SCOPE_TABLE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SCOPE_TABLE_COLUMN), false),
+                    new ExpressionProjector(SOURCE_DATA_TYPE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(SOURCE_DATA_TYPE_COLUMN), false),
+                    new ExpressionProjector(IS_AUTOINCREMENT, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(IS_AUTOINCREMENT_COLUMN), false),
+                    new ExpressionProjector(ARRAY_SIZE, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(ARRAY_SIZE_COLUMN), false),
+                    new ExpressionProjector(COLUMN_FAMILY, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(COLUMN_FAMILY_COLUMN), false),
+                    new ExpressionProjector(TYPE_ID, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(TYPE_ID_COLUMN), false),
+                    new ExpressionProjector(VIEW_CONSTANT, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(VIEW_CONSTANT_COLUMN), false),
+                    new ExpressionProjector(MULTI_TENANT, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(MULTI_TENANT_COLUMN), false),
+                    new ExpressionProjector(KEY_SEQ, SYSTEM_CATALOG,
+                            new KeyValueColumnExpression(KEY_SEQ_COLUMN), false)
+                    ), 0, true);
+    
+    private boolean match(String str, String pattern) throws SQLException {
+        LiteralExpression strExpr = LiteralExpression.newConstant(str, PVarchar.INSTANCE, SortOrder.ASC);
+        LiteralExpression patternExpr = LiteralExpression.newConstant(pattern, PVarchar.INSTANCE, SortOrder.ASC);
+        List<Expression> children = Arrays.<Expression>asList(strExpr, patternExpr);
+        LikeExpression likeExpr = StringBasedLikeExpression.create(children, LikeType.CASE_SENSITIVE);
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean evaluated = likeExpr.evaluate(null, ptr);
+        Boolean result = (Boolean)likeExpr.getDataType().toObject(ptr);
+        if (evaluated) {
+            return result;
+        }
+        return false;
+    }
+    
     @Override
     public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
             throws SQLException {
-        StringBuilder buf = new StringBuilder("select \n " +
-                TENANT_ID + " " + TABLE_CAT + "," + // use this for tenant id
-                TABLE_SCHEM + "," +
-                TABLE_NAME + " ," +
-                COLUMN_NAME + "," +
-                ExternalSqlTypeIdFunction.NAME + "(" + DATA_TYPE + ") AS " + DATA_TYPE + "," +
-                SqlTypeNameFunction.NAME + "(" + DATA_TYPE + ") AS " + TYPE_NAME + "," +
-                COLUMN_SIZE + "," +
-                BUFFER_LENGTH + "," +
-                DECIMAL_DIGITS + "," +
-                NUM_PREC_RADIX + "," +
-                NULLABLE + "," +
-                REMARKS + "," +
-                COLUMN_DEF + "," +
-                SQL_DATA_TYPE + "," +
-                SQL_DATETIME_SUB + "," +
-                CHAR_OCTET_LENGTH + "," +
-                "CASE WHEN " + TENANT_POS_SHIFT + " THEN " + ORDINAL_POSITION + "-1 ELSE " + ORDINAL_POSITION + " END AS " + ORDINAL_POSITION + "," +
-                "CASE " + NULLABLE + " WHEN " + DatabaseMetaData.attributeNoNulls +  " THEN '" + Boolean.FALSE.toString() + "' WHEN " + DatabaseMetaData.attributeNullable + " THEN '" + Boolean.TRUE.toString() + "' END AS " + IS_NULLABLE + "," +
-                SCOPE_CATALOG + "," +
-                SCOPE_SCHEMA + "," +
-                SCOPE_TABLE + "," +
-                SOURCE_DATA_TYPE + "," +
-                IS_AUTOINCREMENT + "," +
-                ARRAY_SIZE + "," +
-                COLUMN_FAMILY + "," +
-                DATA_TYPE + " " + TYPE_ID + "," +// raw type id for potential internal consumption
-                VIEW_CONSTANT + "," +
-                MULTI_TENANT + "," +
-                "CASE WHEN " + TENANT_POS_SHIFT + " THEN " + KEY_SEQ + "-1 ELSE " + KEY_SEQ + " END AS " + KEY_SEQ +
-                " from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS + "(" + TENANT_POS_SHIFT + " BOOLEAN)");
-        StringBuilder where = new StringBuilder();
-        addTenantIdFilter(where, catalog);
-        if (schemaPattern != null) {
-            appendConjunction(where);
-            where.append(TABLE_SCHEM + (schemaPattern.length() == 0 ? " is null" : " like '" + StringUtil.escapeStringConstant(schemaPattern) + "'" ));
-        }
-        if (tableNamePattern != null && tableNamePattern.length() > 0) {
-            appendConjunction(where);
-            where.append(TABLE_NAME + " like '" + StringUtil.escapeStringConstant(tableNamePattern) + "'" );
-        }
-        // Allow a "." in columnNamePattern for column family match
-        String colPattern = null;
-        if (columnNamePattern != null && columnNamePattern.length() > 0) {
+        boolean isTenantSpecificConnection = connection.getTenantId() != null;
+        List<Tuple> tuples = Lists.newArrayListWithExpectedSize(10);
+        ResultSet rs = getTables(catalog, schemaPattern, tableNamePattern, null);
+        while (rs.next()) {
+            String schemaName = rs.getString(TABLE_SCHEM);
+            // Allow a "." in columnNamePattern for column family match
+            String colPattern = null;
             String cfPattern = null;
-            int index = columnNamePattern.indexOf('.');
-            if (index <= 0) {
-                colPattern = columnNamePattern;
-            } else {
-                cfPattern = columnNamePattern.substring(0, index);
-                if (columnNamePattern.length() > index+1) {
-                    colPattern = columnNamePattern.substring(index+1);
+            if (columnNamePattern != null && columnNamePattern.length() > 0) {
+                int index = columnNamePattern.indexOf('.');
+                if (index <= 0) {
+                    colPattern = columnNamePattern;
+                } else {
+                    cfPattern = columnNamePattern.substring(0, index);
+                    if (columnNamePattern.length() > index+1) {
+                        colPattern = columnNamePattern.substring(index+1);
+                    }
                 }
             }
-            if (cfPattern != null && cfPattern.length() > 0) { // if null or empty, will pick up all columns
-                // Will pick up only KV columns
-                appendConjunction(where);
-                where.append(COLUMN_FAMILY + " like '" + StringUtil.escapeStringConstant(cfPattern) + "'" );
-            }
-            if (colPattern != null && colPattern.length() > 0) {
-                appendConjunction(where);
-                where.append(COLUMN_NAME + " like '" + StringUtil.escapeStringConstant(colPattern) + "'" );
-            }
-        }
-        if (colPattern == null || colPattern.length() == 0) {
-            appendConjunction(where);
-            where.append(COLUMN_NAME + " is not null" );
-            appendConjunction(where);
-            where.append(LINK_TYPE + " is null" );
-        }
-        boolean isTenantSpecificConnection = connection.getTenantId() != null;
-        if (isTenantSpecificConnection) {
-            buf.append(" where (" + where + ") OR ("
-                    + COLUMN_FAMILY + " is null AND " +  COLUMN_NAME + " is null)");
-        } else {
-            buf.append(" where " + where);
-        }
-        buf.append(" order by " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + SYSTEM_CATALOG_ALIAS + "." + ORDINAL_POSITION);
-
-        Statement stmt;
-        if (isTenantSpecificConnection) {
-            stmt = connection.createStatement(new PhoenixStatementFactory() {
-                @Override
-                public PhoenixStatement newStatement(PhoenixConnection connection) {
-                    return new PhoenixStatement(connection) {
-                        @Override
-                        public PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector,
-                                StatementContext context) throws SQLException {
-                            return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector),
-                                    projector, context);
-                        }
-                    };
+            String tableName = rs.getString(TABLE_NAME);
+            String tenantId = rs.getString(TABLE_CAT);
+            String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+            PTable table = PhoenixRuntime.getTable(connection, fullTableName);
+            boolean isSalted = table.getBucketNum()!=null;
+            boolean tenantColSkipped = false;
+            for (PColumn column : table.getColumns()) {
+                if (isTenantSpecificConnection && column.equals(table.getPKColumns().get(0))) {
+                    // skip the tenant column
+                    tenantColSkipped = true;
+                    continue;
                 }
-            });
-        } else {
-            stmt = connection.createStatement();
-        }
-        return stmt.executeQuery(buf.toString());
-    }
-    
-//    private ColumnResolver getColumnResolverForCatalogTable() throws SQLException {
-//        TableRef tableRef = new TableRef(getTable(connection, SYSTEM_CATALOG_NAME));
-//        return FromCompiler.getResolver(tableRef);
-//    }
-    
-    /**
-     * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
-     * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
-     * start at the first column. Assumes columns are sorted on ordinal position.
-     */
-    private static class TenantColumnFilteringIterator extends DelegateResultIterator {
-        private final RowProjector rowProjector;
-        private final int columnFamilyIndex;
-        private final int columnNameIndex;
-        private final int multiTenantIndex;
-        private final int keySeqIndex;
-        private boolean inMultiTenantTable;
-        private boolean tenantColumnSkipped;
-
-        private TenantColumnFilteringIterator(ResultIterator delegate, RowProjector rowProjector) throws SQLException {
-            super(delegate);
-            this.rowProjector = rowProjector;
-            this.columnFamilyIndex = rowProjector.getColumnIndex(COLUMN_FAMILY);
-            this.columnNameIndex = rowProjector.getColumnIndex(COLUMN_NAME);
-            this.multiTenantIndex = rowProjector.getColumnIndex(MULTI_TENANT);
-            this.keySeqIndex = rowProjector.getColumnIndex(KEY_SEQ);
-        }
-
-        @Override
-        public Tuple next() throws SQLException {
-            Tuple tuple = super.next();
-
-            while (tuple != null
-                    && getColumn(tuple, columnFamilyIndex) == null && getColumn(tuple, columnNameIndex) == null) {
-                // new table, check if it is multitenant
-                inMultiTenantTable = getColumn(tuple, multiTenantIndex) == Boolean.TRUE;
-                tenantColumnSkipped = false;
-                // skip row representing table
-                tuple = super.next();
-            }
-
-            if (tuple != null && inMultiTenantTable && !tenantColumnSkipped) {
-                Object value = getColumn(tuple, keySeqIndex);
-                if (value != null && ((Number)value).longValue() == 1L) {
-                    tenantColumnSkipped = true;
-                    // skip tenant id primary key column
-                    return next();
+                String columnFamily = column.getFamilyName()!=null ? column.getFamilyName().getString() : null;
+                String columnName = column.getName().getString();
+                if (cfPattern != null && cfPattern.length() > 0) { // if null or empty, will pick up all columns
+                    if (columnFamily==null || !match(columnFamily, cfPattern)) {
+                        continue;
+                    }
                 }
+                if (colPattern != null && colPattern.length() > 0) {
+                    if (!match(columnName, colPattern)) {
+                        continue;
+                    }
+                }
+                // generate row key
+                // TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME are row key columns
+                byte[] rowKey =
+                        SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName, null);
+
+                // add one cell for each column info
+                List<Cell> cells = Lists.newArrayListWithCapacity(25);
+                // DATA_TYPE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    PInteger.INSTANCE.toBytes(column.getDataType().getResultSetSqlType())));
+                // TYPE_NAME
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(TYPE_NAME), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    column.getDataType().getSqlTypeNameBytes()));
+                // COLUMN_SIZE
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES,
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                        column.getMaxLength() != null
+                                ? PInteger.INSTANCE.toBytes(column.getMaxLength())
+                                : ByteUtil.EMPTY_BYTE_ARRAY));
+                // BUFFER_LENGTH
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(BUFFER_LENGTH), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // DECIMAL_DIGITS
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    column.getScale() != null ? PInteger.INSTANCE.toBytes(column.getScale())
+                            : ByteUtil.EMPTY_BYTE_ARRAY));
+                // NUM_PREC_RADIX
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(NUM_PREC_RADIX), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // NULLABLE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, NULLABLE_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    PInteger.INSTANCE.toBytes(SchemaUtil.getIsNullableInt(column.isNullable()))));
+                // REMARKS
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, Bytes.toBytes(REMARKS),
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+                // COLUMN_DEF
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, Bytes.toBytes(COLUMN_DEF),
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+                // SQL_DATA_TYPE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(SQL_DATA_TYPE), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // SQL_DATETIME_SUB
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(SQL_DATETIME_SUB), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // CHAR_OCTET_LENGTH
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(CHAR_OCTET_LENGTH), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // ORDINAL_POSITION
+                int ordinal =
+                        column.getPosition() + (isSalted ? 0 : 1) - (tenantColSkipped ? 1 : 0);
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES,
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, PInteger.INSTANCE.toBytes(ordinal)));
+                String isNullable =
+                        column.isNullable() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
+                // IS_NULLABLE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(IS_NULLABLE), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    PVarchar.INSTANCE.toBytes(isNullable)));
+                // SCOPE_CATALOG
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(SCOPE_CATALOG), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // SCOPE_SCHEMA
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(SCOPE_SCHEMA), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // SCOPE_TABLE
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, Bytes.toBytes(SCOPE_TABLE),
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+                // SOURCE_DATA_TYPE
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(SOURCE_DATA_TYPE), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // IS_AUTOINCREMENT
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(IS_AUTOINCREMENT), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    ByteUtil.EMPTY_BYTE_ARRAY));
+                // ARRAY_SIZE
+                cells.add(
+                    KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES,
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                        column.getArraySize() != null
+                                ? PInteger.INSTANCE.toBytes(column.getArraySize())
+                                : ByteUtil.EMPTY_BYTE_ARRAY));
+                // COLUMN_FAMILY
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, COLUMN_FAMILY_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, column.getFamilyName() != null
+                            ? column.getFamilyName().getBytes() : ByteUtil.EMPTY_BYTE_ARRAY));
+                // TYPE_ID
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES,
+                    Bytes.toBytes(TYPE_ID), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    PInteger.INSTANCE.toBytes(column.getDataType().getSqlType())));
+                // VIEW_CONSTANT
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP, column.getViewConstant() != null
+                            ? column.getViewConstant() : ByteUtil.EMPTY_BYTE_ARRAY));
+                // MULTI_TENANT
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES,
+                    MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                    PBoolean.INSTANCE.toBytes(table.isMultiTenant())));
+                // KEY_SEQ
+                byte[] keySeqBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+                int pkPos = table.getPKColumns().indexOf(column);
+                if (pkPos!=-1) {
+                    short keySeq = (short) (pkPos + 1 - (isSalted ? 1 : 0) - (tenantColSkipped ? 1 : 0));
+                    keySeqBytes = PSmallint.INSTANCE.toBytes(keySeq);
+                }
+                cells.add(KeyValueUtil.newKeyValue(rowKey, TABLE_FAMILY_BYTES, KEY_SEQ_BYTES,
+                        MetaDataProtocol.MIN_TABLE_TIMESTAMP, keySeqBytes));
+                Collections.sort(cells, new CellComparator());
+                Tuple tuple = new MultiKeyValueTuple(cells);
+                tuples.add(tuple);
             }
-
-            if (tuple != null && tenantColumnSkipped) {
-                ResultTuple resultTuple = (ResultTuple)tuple;
-                List<Cell> cells = resultTuple.getResult().listCells();
-                KeyValue kv = new KeyValue(resultTuple.getResult().getRow(), TABLE_FAMILY_BYTES,
-                        TENANT_POS_SHIFT_BYTES, PDataType.TRUE_BYTES);
-                List<Cell> newCells = Lists.newArrayListWithCapacity(cells.size() + 1);
-                newCells.addAll(cells);
-                newCells.add(kv);
-                Collections.sort(newCells, KeyValue.COMPARATOR);
-                tuple = new ResultTuple(Result.create(newCells));
-            }
-            return tuple;
-        }
-
-        private Object getColumn(Tuple tuple, int index) throws SQLException {
-            ColumnProjector projector = this.rowProjector.getColumnProjector(index);
-            PDataType type = projector.getExpression().getDataType();
-            return projector.getValue(tuple, type, new ImmutableBytesPtr());
         }
+        return new PhoenixResultSet(new MaterializedResultIterator(tuples), GET_COLUMNS_ROW_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
     }
 
     @Override
@@ -992,6 +1184,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     }
 
     @Override
+    // TODO: does this need to change to use the PARENT_TABLE link?
     public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) throws SQLException {
         StringBuilder buf = new StringBuilder("select \n" +
                 TENANT_ID + " " + TABLE_CAT + "," + // Use tenantId for catalog
@@ -1028,11 +1221,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             throws SQLException {
         return emptyResultSet;
     }
-
+    
     private static final PDatum TABLE_TYPE_DATUM = new PDatum() {
         @Override
         public boolean isNullable() {
-            return false;
+            return true;
         }
         @Override
         public PDataType getDataType() {
@@ -1051,6 +1244,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             return SortOrder.getDefault();
         }
     };
+
     private static final RowProjector TABLE_TYPE_ROW_PROJECTOR = new RowProjector(Arrays.<ColumnProjector>asList(
             new ExpressionProjector(TABLE_TYPE, SYSTEM_CATALOG,
                     new RowKeyColumnExpression(TABLE_TYPE_DATUM,

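The rewritten getColumns() above builds the column metadata rows on the client from the resolved
PTable instead of selecting them from SYSTEM.CATALOG: it splits columnNamePattern at the first '.'
into a column-family pattern and a column-name pattern and matches each with SQL LIKE semantics
(StringBasedLikeExpression in the patch). A minimal standalone sketch of that splitting and
matching follows; the class name, the sample pattern "CF1.COL%", and the regex-based LIKE stand-in
are illustrative assumptions, not part of the commit.

    import java.util.regex.Pattern;

    public class ColumnPatternSketch {
        // Crude stand-in for Phoenix's StringBasedLikeExpression: translate a SQL LIKE
        // pattern ('%' = any run of characters, '_' = any single character) into a regex.
        static boolean like(String value, String likePattern) {
            if (value == null || likePattern == null) {
                return false;
            }
            String regex = likePattern.replace(".", "\\.").replace("%", ".*").replace("_", ".");
            return Pattern.matches(regex, value);
        }

        public static void main(String[] args) {
            String columnNamePattern = "CF1.COL%"; // hypothetical input
            String cfPattern = null;
            String colPattern = null;
            // Same split logic as the patch: a '.' separates the family and column patterns.
            int index = columnNamePattern.indexOf('.');
            if (index <= 0) {
                colPattern = columnNamePattern;
            } else {
                cfPattern = columnNamePattern.substring(0, index);
                if (columnNamePattern.length() > index + 1) {
                    colPattern = columnNamePattern.substring(index + 1);
                }
            }
            System.out.println(like("CF1", cfPattern));    // true  -> family matches
            System.out.println(like("COL_A", colPattern)); // true  -> column matches
            System.out.println(like("OTHER", colPattern)); // false -> column filtered out
        }
    }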
http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 015f04c..048ff81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -528,7 +528,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
             new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_BYTES_ESTIMATE),
                     PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, true, 1,
                     SortOrder.getDefault(), 0, null, false, null, false, false,
-                    EXPLAIN_PLAN_BYTES_ESTIMATE);
+                    EXPLAIN_PLAN_BYTES_ESTIMATE, 0, false);
 
     private static final String EXPLAIN_PLAN_ROWS_ESTIMATE_COLUMN_NAME = "RowsEstimate";
     private static final byte[] EXPLAIN_PLAN_ROWS_ESTIMATE =
@@ -538,7 +538,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
             new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_ROWS_ESTIMATE),
                     PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, true, 2,
                     SortOrder.getDefault(), 0, null, false, null, false, false,
-                    EXPLAIN_PLAN_ROWS_ESTIMATE);
+                    EXPLAIN_PLAN_ROWS_ESTIMATE, 0, false);
 
     private static final String EXPLAIN_PLAN_ESTIMATE_INFO_TS_COLUMN_NAME = "EstimateInfoTS";
     private static final byte[] EXPLAIN_PLAN_ESTIMATE_INFO_TS =
@@ -548,7 +548,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
             new PColumnImpl(PNameFactory.newName(EXPLAIN_PLAN_ESTIMATE_INFO_TS),
                 PNameFactory.newName(EXPLAIN_PLAN_FAMILY), PLong.INSTANCE, null, null, true, 3,
                 SortOrder.getDefault(), 0, null, false, null, false, false,
-                EXPLAIN_PLAN_ESTIMATE_INFO_TS);
+                EXPLAIN_PLAN_ESTIMATE_INFO_TS, 0, false);
 
     private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR_WITH_BYTE_ROW_ESTIMATES =
             new RowProjector(Arrays
@@ -1109,7 +1109,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
     private static class ExecutableDropTableStatement extends DropTableStatement implements CompilableStatement {
 
         ExecutableDropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) {
-            super(tableName, tableType, ifExists, cascade);
+            super(tableName, tableType, ifExists, cascade, false);
         }
 
         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
index 997b695..c334a81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
@@ -25,13 +25,15 @@ public class DropTableStatement extends MutableStatement {
     private final boolean ifExists;
     private final PTableType tableType;
     private final boolean cascade;
+    private final boolean skipAddingParentColumns;
     
 
-    protected DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) {
+    public DropTableStatement(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade, boolean skipAddingParentColumns) {
         this.tableName = tableName;
         this.tableType = tableType;
         this.ifExists = ifExists;
         this.cascade = cascade;
+        this.skipAddingParentColumns = skipAddingParentColumns;
     }
     
     @Override
@@ -59,4 +61,8 @@ public class DropTableStatement extends MutableStatement {
     public Operation getOperation() {
         return Operation.DELETE;
     }
+
+    public boolean getSkipAddingParentColumns() {
+        return skipAddingParentColumns;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 9be59f3..aef2a84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -366,7 +366,7 @@ public class ParseNodeFactory {
     }
 
     public DropTableStatement dropTable(TableName tableName, PTableType tableType, boolean ifExists, boolean cascade) {
-        return new DropTableStatement(tableName, tableType, ifExists, cascade);
+        return new DropTableStatement(tableName, tableType, ifExists, cascade, false);
     }
 
     public DropIndexStatement dropIndex(NamedNode indexName, TableName tableName, boolean ifExists) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 092bfe9..6f8cbc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -79,13 +79,26 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public PhoenixConnection connect(String url, Properties info) throws SQLException;
 
-    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException;
+    /**
+     * @param tableTimestamp timestamp of the table if it is present in the client-side cache
+     * @param clientTimetamp the client connection's SCN if one is set, or the transaction
+     *            write pointer if the table is transactional
+     * @param skipAddingIndexes if true, the returned PTable will not include any indexes
+     * @param skipAddingParentColumns if true, the returned PTable will not include any columns
+     *            derived from ancestors
+     * @param lockedAncestorTable the ancestor table that is being mutated (we won't be able to
+     *            resolve this table as it is locked)
+     * @return the PTable for the given tenant id, schema and table name
+     */
+    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaName, byte[] tableName,
+            long tableTimestamp, long clientTimetamp, boolean skipAddingIndexes,
+            boolean skipAddingParentColumns, PTable lockedAncestorTable) throws SQLException;
     public MetaDataMutationResult getFunctions(PName tenantId, List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) throws SQLException;
 
     public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] tableName, PTableType tableType,
             Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits,
             boolean isNamespaceMapped, boolean allocateIndexId, boolean isDoNotUpgradePropSet) throws SQLException;
-    public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException;
+    public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade, boolean skipAddingParentColumns) throws SQLException;
     public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) throws SQLException;
     public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded, List<PColumn> columns) throws SQLException;
     public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException;

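The widened getTable() above takes three new arguments in addition to the two timestamps. A
minimal call sketch follows, mirroring the call site updated further down in
ConnectionQueryServicesImpl; the local names (services, tenantId, schemaName, tableName,
timestamp) are assumed to be in scope and are illustrative only.

    // Resolve a table without skipping indexes or inherited columns and with
    // no locked ancestor table, as the existing call sites in this patch do.
    MetaDataMutationResult result = services.getTable(
            tenantId,                    // PName; null for the global tenant
            schemaName,                  // byte[] schema name
            tableName,                   // byte[] table name
            HConstants.LATEST_TIMESTAMP, // tableTimestamp: nothing cached client side
            timestamp,                   // clientTimetamp (SCN or txn write pointer)
            false,                       // skipAddingIndexes
            false,                       // skipAddingParentColumns
            null);                       // lockedAncestorTable
    PTable table = result.getTable();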
http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index d3cad64..4c7630d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -58,7 +58,9 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
 import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
+import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
 import static org.apache.phoenix.util.UpgradeUtil.getSysCatalogSnapshotName;
+import static org.apache.phoenix.util.UpgradeUtil.moveChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
@@ -206,6 +208,7 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PSynchronizedMetaData;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
@@ -1553,8 +1556,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes, final byte[] tableBytes,
-            final long tableTimestamp, final long clientTimestamp) throws SQLException {
+    public MetaDataMutationResult getTable(final PName tenantId, final byte[] schemaBytes,
+            final byte[] tableBytes, final long tableTimestamp, final long clientTimestamp,
+            final boolean skipAddingIndexes, final boolean skipAddingParentColumns,
+            final PTable lockedAncestorTable) throws SQLException {
         final byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes();
         byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
         return metaDataCoprocessorExec(tableKey,
@@ -1571,6 +1576,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setTableTimestamp(tableTimestamp);
                 builder.setClientTimestamp(clientTimestamp);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                builder.setSkipAddingParentColumns(skipAddingParentColumns);
+                builder.setSkipAddingIndexes(skipAddingIndexes);
+                if (lockedAncestorTable!=null)
+                    builder.setLockedAncestorTable(PTableImpl.toProto(lockedAncestorTable));
                 instance.getTable(controller, builder.build(), rpcCallback);
                 if(controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
@@ -1582,7 +1591,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public MetaDataMutationResult dropTable(final List<Mutation> tableMetaData, final PTableType tableType,
-            final boolean cascade) throws SQLException {
+            final boolean cascade, final boolean skipAddingParentColumns) throws SQLException {
         byte[][] rowKeyMetadata = new byte[3][];
         SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
         byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -1604,6 +1613,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setTableType(tableType.getSerializedValue());
                 builder.setCascade(cascade);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                builder.setSkipAddingParentColumns(skipAddingParentColumns);
                 instance.dropTable(controller, builder.build(), rpcCallback);
                 if(controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
@@ -1761,7 +1771,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             byte[] schemaName = Bytes.toBytes(SchemaUtil.getSchemaNameFromFullName(fullTableName));
             byte[] tableName = Bytes.toBytes(SchemaUtil.getTableNameFromFullName(fullTableName));
             MetaDataMutationResult result = this.getTable(tenantId, schemaName, tableName, HConstants.LATEST_TIMESTAMP,
-                    timestamp);
+                    timestamp, false, false, null);
             table = result.getTable();
             if (table == null) { throw e; }
         }
@@ -2447,6 +2457,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
     }
     
+    
     // Available for testing
     protected void setUpgradeRequired() {
         this.upgradeRequired.set(true);
@@ -2486,6 +2497,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
 
     }
+    
+    // Available for testing
+    protected String getChildLinkDDL() {
+        return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
+    }
 
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
@@ -2704,6 +2720,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             metaConnection.createStatement().execute(getLogTableDDL());
         } catch (TableAlreadyExistsException ignore) {}
+        try {
+            metaConnection.createStatement().executeUpdate(getChildLinkDDL());
+        } catch (TableAlreadyExistsException e) {}
         // Catch the IOException to log the error message and then bubble it up for the client to retry.
         try {
             createSysMutexTableIfNotExists(hbaseAdmin);
@@ -2984,6 +3003,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     HTableDescriptor.SPLIT_POLICY + "='" + SystemStatsSplitPolicy.class.getName() +"'"
                     );
         }
+        if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
+            addViewIndexToParentLinks(metaConnection);
+            moveChildLinks(metaConnection);
+        }
         return metaConnection;
     }
 
@@ -3147,6 +3170,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             try {
                 metaConnection.createStatement().executeUpdate(getLogTableDDL());
             } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
+            try {
+                metaConnection.createStatement().executeUpdate(getChildLinkDDL());
+            } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
 
             // In case namespace mapping is enabled and system table to system namespace mapping is also enabled,
             // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work
@@ -3223,8 +3249,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         metaConnection.rollback();
         PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
                 PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
-                SortOrder.ASC, null, null, false, null, false, false,
-                Bytes.toBytes("COLUMN_QUALIFIER"));
+                SortOrder.ASC, null, null, false, null, false, false, 
+                Bytes.toBytes("COLUMN_QUALIFIER"), timestamp);
         String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                 TENANT_ID + "," +
                 TABLE_SCHEM + "," +
@@ -3765,7 +3791,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName) throws SQLException {
         byte[][] rowKeyMetadata = new byte[3][];
         SchemaUtil.getVarChars(tableMetaData.get(0).getRow(), rowKeyMetadata);
-        byte[] tableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
+        byte[] tableKey =
+                SchemaUtil.getTableKey(rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
+                    rowKeyMetadata[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
+                    rowKeyMetadata[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]);
         return metaDataCoprocessorExec(tableKey,
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
             @Override
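
Note on the updateIndexState() hunk above: the coprocessor row key is now built from the
tenant id carried in the mutation's row key instead of an empty tenant id, presumably so
the call can target the SYSTEM.CATALOG row of a tenant-owned index. A minimal, hedged
sketch of what SchemaUtil.getTableKey() is fed here (class name and all values are
hypothetical):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.SchemaUtil;

    public class TableKeySketch {
        public static void main(String[] args) {
            byte[] tenantId = Bytes.toBytes("TENANT_1");     // hypothetical tenant id
            byte[] schemaName = Bytes.toBytes("MY_SCHEMA");  // hypothetical schema
            byte[] tableName = Bytes.toBytes("MY_VIEW_IDX"); // hypothetical index name
            // Builds the SYSTEM.CATALOG row key from (tenant id, schema name, table name).
            byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
            System.out.println(Bytes.toStringBinary(tableKey));
        }
    }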

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 94a5257..5a46214 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -176,6 +176,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
 
     }
+    
+    protected String getChildLinkDDL() {
+        return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
+    }
 
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
@@ -233,7 +237,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[] tableBytes, long tableTimestamp, long clientTimestamp) throws SQLException {
+    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[] tableBytes, long tableTimestamp, long clientTimestamp, boolean skipAddingIndexes, boolean skipCombiningColumns, PTable ancestorTable) throws SQLException {
         // Return result that will cause client to use its own metadata instead of needing
         // to get anything from the server (since we don't have a connection)
         try {
@@ -294,7 +298,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException {
+    public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade, boolean skipAddingParentColumns) throws SQLException {
         byte[] tableName = getTableName(tableMetadata, null);
         tableSplits.remove(Bytes.toString(tableName));
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
@@ -370,6 +374,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
                 try {
                     metaConnection.createStatement().executeUpdate(getLogTableDDL());
                 } catch (NewerTableAlreadyExistsException ignore) {}
+                try {
+                    metaConnection.createStatement()
+                            .executeUpdate(getChildLinkDDL());
+                } catch (NewerTableAlreadyExistsException ignore) {
+                }
             } catch (SQLException e) {
                 sqlE = e;
             } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index cb7ce58..b3e2cb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -107,8 +107,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[] tableBytes, long tableTimestamp, long clientTimestamp) throws SQLException {
-        return getDelegate().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimestamp);
+    public MetaDataMutationResult getTable(PName tenantId, byte[] schemaBytes, byte[] tableBytes, long tableTimestamp, long clientTimestamp, boolean skipAddingIndexes, boolean skipAddingParentColumns, PTable ancestorTable) throws SQLException {
+        return getDelegate().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimestamp, skipAddingIndexes, skipAddingParentColumns, ancestorTable);
     }
 
     @Override
@@ -120,8 +120,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public MetaDataMutationResult dropTable(List<Mutation> tabeMetaData, PTableType tableType, boolean cascade) throws SQLException {
-        return getDelegate().dropTable(tabeMetaData, tableType, cascade);
+    public MetaDataMutationResult dropTable(List<Mutation> tabeMetaData, PTableType tableType, boolean cascade, boolean skipAddingParentColumns) throws SQLException {
+        return getDelegate().dropTable(tabeMetaData, tableType, cascade, skipAddingParentColumns);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index b31175a..8d8d47f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -344,4 +344,16 @@ public interface QueryConstants {
     public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number";
     public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5;
     
-}
+	// Links from parent to child views are stored in a separate table for
+	// scalability
+	public static final String CREATE_CHILD_LINK_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\""
+			+ SYSTEM_CHILD_LINK_TABLE + "\"(\n" +
+			// PK columns
+			TENANT_ID + " VARCHAR NULL," + TABLE_SCHEM + " VARCHAR NULL," + TABLE_NAME + " VARCHAR NOT NULL,"
+			+ COLUMN_NAME + " VARCHAR NULL," + COLUMN_FAMILY + " VARCHAR NULL," + LINK_TYPE + " UNSIGNED_TINYINT,\n"
+			+ "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME
+			+ "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=%s,\n"
+			+ HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + PhoenixDatabaseMetaData.TRANSACTIONAL + "="
+			+ Boolean.FALSE;
+    
+}
\ No newline at end of file
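
The CREATE_CHILD_LINK_METADATA constant above is a format template rather than finished
DDL: its two %s placeholders (HBase VERSIONS and KEEP_DELETED_CELLS) are filled in by the
setSystemDDLProperties()/getChildLinkDDL() helpers added earlier in this patch before the
statement is executed. A minimal sketch of rendering and running the template; the
literal property values, class name and method name are illustrative, not the actual
Phoenix code path:

    import java.sql.Connection;
    import java.sql.Statement;
    import org.apache.phoenix.query.QueryConstants;
    import org.apache.phoenix.schema.TableAlreadyExistsException;

    public class ChildLinkDdlSketch {
        // Creates SYSTEM.CHILD_LINK if it does not exist yet; 10 and "true" stand in
        // for the configuration-driven VERSIONS and KEEP_DELETED_CELLS settings.
        static void createChildLinkTable(Connection metaConnection) throws Exception {
            String ddl = String.format(QueryConstants.CREATE_CHILD_LINK_METADATA, 10, "true");
            try (Statement stmt = metaConnection.createStatement()) {
                stmt.executeUpdate(ddl);
            } catch (TableAlreadyExistsException ignore) {
                // another client created the table first; nothing to do
            }
        }
    }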

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 559d165..2bb9350 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -313,6 +313,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy";
     public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate";
 
+	public static final String SYSTEM_CATALOG_SPLITTABLE = "phoenix.system.catalog.splittable";
+
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 307c5dd..4be8f81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -359,6 +359,8 @@ public class QueryServicesOptions {
     public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
     public static final int DEFAULT_LOG_SALT_BUCKETS = 32;
 
+	public static final boolean DEFAULT_SYSTEM_CATALOG_SPLITTABLE = true;
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
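
The new phoenix.system.catalog.splittable property defaults to true; setting it to false
on the region servers keeps the old never-split behaviour for SYSTEM.CATALOG. A hedged
sketch of how the flag is resolved (the same key/default pair the MetaDataSplitPolicy
change further down consults); the standalone class is illustrative, and on a real
cluster the property would be set in hbase-site.xml rather than in code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.query.QueryServicesOptions;

    public class SystemCatalogSplitFlagSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // "phoenix.system.catalog.splittable", defaulting to true
            boolean splittable = conf.getBoolean(
                    QueryServices.SYSTEM_CATALOG_SPLITTABLE,
                    QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_SPLITTABLE);
            System.out.println("SYSTEM.CATALOG splittable: " + splittable);
        }
    }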

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
index d1b71ef..5452298 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
@@ -22,9 +22,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.SchemaUtil;
 
 import java.util.List;
@@ -34,21 +32,17 @@ import java.util.List;
  * may change between the source and target clusters at different times, in particular
  * during cluster upgrades. However, tenant-owned data such as tenant-owned views need to
  * be copied. This WALEntryFilter will only allow tenant-owned rows in SYSTEM.CATALOG to
- * be replicated. Data from all other tables is automatically passed. It will also copy
- * child links in SYSTEM.CATALOG that are globally-owned but point to tenant-owned views.
- *
+ * be replicated. Data from all other tables is automatically passed.
  */
 public class SystemCatalogWALEntryFilter implements WALEntryFilter {
 
-  private static byte[] CHILD_TABLE_BYTES =
-      new byte[]{PTable.LinkType.CHILD_TABLE.getSerializedValue()};
-
   @Override
   public WAL.Entry filter(WAL.Entry entry) {
 
-    //if the WAL.Entry's table isn't System.Catalog, it auto-passes this filter
+    //if the WAL.Entry's table isn't System.Catalog or System.Child_Link, it auto-passes this filter
     //TODO: when Phoenix drops support for pre-1.3 versions of HBase, redo as a WALCellFilter
-    if (!SchemaUtil.isMetaTable(entry.getKey().getTablename().getName())){
+    byte[] tableName = entry.getKey().getTablename().getName();
+	if (!SchemaUtil.isMetaTable(tableName)){
       return entry;
     }
 
@@ -71,35 +65,6 @@ public class SystemCatalogWALEntryFilter implements WALEntryFilter {
     ImmutableBytesWritable key =
         new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
     //rows in system.catalog that aren't tenant-owned will have a leading separator byte
-    boolean isTenantRowCell = key.get()[key.getOffset()] != QueryConstants.SEPARATOR_BYTE;
-
-    /* In addition to the tenant view rows, there are parent-child links (see PHOENIX-2051) that
-     * provide an efficient way for a parent table or view to look up its children.
-     * These rows override SYSTEM_CATALOG.COLUMN_NAME with the child tenant_id,
-     * if any, and contain only a single Cell, LINK_TYPE, which is of PTable.LinkType.Child
-     */
-    boolean isChildLinkToTenantView = false;
-    if (!isTenantRowCell) {
-      ImmutableBytesWritable columnQualifier = new ImmutableBytesWritable(cell.getQualifierArray(),
-          cell.getQualifierOffset(), cell.getQualifierLength());
-      boolean isChildLink = columnQualifier.compareTo(PhoenixDatabaseMetaData.LINK_TYPE_BYTES) == 0;
-      if (isChildLink) {
-        ImmutableBytesWritable columnValue = new ImmutableBytesWritable(cell.getValueArray(),
-            cell.getValueOffset(), cell.getValueLength());
-        if (columnValue.compareTo(CHILD_TABLE_BYTES) == 0) {
-          byte[][] rowViewKeyMetadata = new byte[5][];
-          SchemaUtil.getVarChars(key.get(), key.getOffset(),
-              key.getLength(), 0, rowViewKeyMetadata);
-          //if the child link is to a tenant-owned view,
-          // the COLUMN_NAME field will be the byte[] of the tenant
-          //otherwise, it will be an empty byte array
-          // (NOT QueryConstants.SEPARATOR_BYTE, but a byte[0])
-          isChildLinkToTenantView =
-              rowViewKeyMetadata[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length != 0;
-        }
-      }
-
-    }
-    return isTenantRowCell || isChildLinkToTenantView;
+    return key.get()[key.getOffset()] != QueryConstants.SEPARATOR_BYTE;
   }
 }
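
The filter above now reduces to one check per SYSTEM.CATALOG cell: tenant-owned rows
start with a tenant id, global rows start with the separator byte, and only the former
are replicated. The child-link special case could be dropped because parent-to-child
links move to the new SYSTEM.CHILD_LINK table elsewhere in this patch. A hedged
restatement of that check as a standalone helper (the helper itself is not part of the
patch):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.query.QueryConstants;

    public class TenantRowCheckSketch {
        // True when the cell's row key does NOT begin with the separator byte,
        // i.e. the row carries a leading tenant id and should be replicated.
        static boolean isTenantOwnedRow(Cell cell) {
            ImmutableBytesWritable key = new ImmutableBytesWritable(
                    cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
            return key.get()[key.getOffset()] != QueryConstants.SEPARATOR_BYTE;
        }
    }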

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index 5c9cc2e..4d9abaf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -76,6 +76,21 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
     }
 
     @Override
+    public long getTimestamp() {
+        return getDelegate().getTimestamp();
+    }
+
+    @Override
+    public boolean isDerived() {
+        return getDelegate().isDerived();
+    }
+
+    @Override
+    public boolean isExcluded() {
+        return getDelegate().isExcluded();
+    }
+
+    @Override
     public boolean isRowTimestamp() {
         return getDelegate().isRowTimestamp();
     }    

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 21391f3..625d03f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -111,7 +111,6 @@ import java.io.IOException;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Types;
@@ -319,7 +318,8 @@ public class MetaDataClient {
                     TABLE_TYPE +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?)";
     
-    private static final String CREATE_VIEW_LINK =
+    
+    public static final String CREATE_VIEW_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
                     TABLE_SCHEM + "," +
@@ -347,6 +347,16 @@ public class MetaDataClient {
                     COLUMN_FAMILY + "," +
                     LINK_TYPE + 
                     ") VALUES (?, ?, ?, ?, ?, ?)";
+    
+    private static final String CREATE_VIEW_INDEX_PARENT_LINK =
+    		"UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    LINK_TYPE + 
+                    ") VALUES (?, ?, ?, ?, ?)";
+    
     private static final String INCREMENT_SEQ_NUM =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
                     TENANT_ID + "," +
@@ -642,7 +652,7 @@ public class MetaDataClient {
                 ConnectionQueryServices queryServices = connection.getQueryServices();
                 result =
                         queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp,
-                            resolvedTimestamp);
+                            resolvedTimestamp, false, false, null);
                 // if the table was assumed to be non transactional, but is actually transactional
                 // then re-resolve as of the right timestamp
                 if (result.getTable() != null
@@ -655,7 +665,7 @@ public class MetaDataClient {
                     if (result.getTable().getTimeStamp() >= resolveTimestamp) {
                         result =
                                 queryServices.getTable(tenantId, schemaBytes, tableBytes,
-                                    tableTimestamp, resolveTimestamp);
+                                    tableTimestamp, resolveTimestamp, false, false, null);
                     }
                 }
 
@@ -934,7 +944,7 @@ public class MetaDataClient {
         colUpsert.setString(4, column.getName().getString());
         colUpsert.setString(5, column.getFamilyName() == null ? null : column.getFamilyName().getString());
         colUpsert.setInt(6, column.getDataType().getSqlType());
-        colUpsert.setInt(7, column.isNullable() ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls);
+        colUpsert.setInt(7, SchemaUtil.getIsNullableInt(column.isNullable()));
         if (column.getMaxLength() == null) {
             colUpsert.setNull(8, Types.INTEGER);
         } else {
@@ -979,7 +989,7 @@ public class MetaDataClient {
         colUpsert.execute();
     }
 
-    private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
+	private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException {
         argUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
         argUpsert.setString(2, functionName);
         argUpsert.setString(3, arg.getArgumentType());
@@ -1043,7 +1053,8 @@ public class MetaDataClient {
                 isNull = false;
             }
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifierBytes);
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifierBytes,
+                HConstants.LATEST_TIMESTAMP);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
@@ -1957,6 +1968,17 @@ public class MetaDataClient {
                 linkStatement.setLong(6, parent.getSequenceNumber());
                 linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
                 linkStatement.execute();
+                
+                // Add row linking index table to parent table for indexes on views
+                if (parent.getType() == PTableType.VIEW) {
+	                linkStatement = connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK);
+	                linkStatement.setString(1, tenantIdStr);
+	                linkStatement.setString(2, schemaName);
+	                linkStatement.setString(3, tableName);
+	                linkStatement.setString(4, parent.getName().getString());
+	                linkStatement.setByte(5, LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
+	                linkStatement.execute();
+                }
             }
 
             PrimaryKeyConstraint pkConstraint = statement.getPrimaryKeyConstraint();
@@ -2195,6 +2217,7 @@ public class MetaDataClient {
                     }
                     disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
+                    // TODO PHOENIX-4766 Add an option to stop sending parent metadata when creating views
                     List<PColumn> allColumns = parent.getColumns();
                     if (saltBucketNum != null) { // Don't include salt column in columns, as it should not have it when created
                         allColumns = allColumns.subList(1, allColumns.size());
@@ -2215,6 +2238,7 @@ public class MetaDataClient {
                     linkStatement.setString(6, parent.getTenantId() == null ? null : parent.getTenantId().getString());
                     linkStatement.execute();
                     // Add row linking parent to view
+                    // TODO From 4.16 write the child links to SYSTEM.CHILD_LINK directly 
                     linkStatement = connection.prepareStatement(CREATE_CHILD_LINK);
                     linkStatement.setString(1, parent.getTenantId() == null ? null : parent.getTenantId().getString());
                     linkStatement.setString(2, parent.getSchemaName() == null ? null : parent.getSchemaName().getString());
@@ -2859,7 +2883,8 @@ public class MetaDataClient {
         String schemaName = connection.getSchema() != null && statement.getTableName().getSchemaName() == null
                 ? connection.getSchema() : statement.getTableName().getSchemaName();
         String tableName = statement.getTableName().getTableName();
-        return dropTable(schemaName, tableName, null, statement.getTableType(), statement.ifExists(), statement.cascade());
+		return dropTable(schemaName, tableName, null, statement.getTableType(), statement.ifExists(),
+				statement.cascade(), statement.getSkipAddingParentColumns());
     }
 
     public MutationState dropFunction(DropFunctionStatement statement) throws SQLException {
@@ -2870,7 +2895,7 @@ public class MetaDataClient {
         String schemaName = statement.getTableName().getSchemaName();
         String tableName = statement.getIndexName().getName();
         String parentTableName = statement.getTableName().getTableName();
-        return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false);
+		return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false, false);
     }
 
     private MutationState dropFunction(String functionName,
@@ -2913,8 +2938,8 @@ public class MetaDataClient {
             connection.setAutoCommit(wasAutoCommit);
         }
     }
-    private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
-            boolean ifExists, boolean cascade) throws SQLException {
+    MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
+            boolean ifExists, boolean cascade, boolean skipAddingParentColumns) throws SQLException {
         connection.rollback();
         boolean wasAutoCommit = connection.getAutoCommit();
         try {
@@ -2932,7 +2957,7 @@ public class MetaDataClient {
                 Delete linkDelete = new Delete(linkKey, clientTimeStamp);
                 tableMetaData.add(linkDelete);
             }
-            MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
+            MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade, skipAddingParentColumns);
             MutationCode code = result.getMutationCode();
             PTable table = result.getTable();
             switch (code) {
@@ -3315,7 +3340,7 @@ public class MetaDataClient {
                 List<PColumn> columns = Lists.newArrayListWithExpectedSize(numCols);
                 Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>();
                 Set<String> families = new LinkedHashSet<>();
-                PTable tableForCQCounters = tableType == PTableType.VIEW ? PhoenixRuntime.getTable(connection, table.getPhysicalName().getString()) : table;;
+                PTable tableForCQCounters = tableType == PTableType.VIEW ? PhoenixRuntime.getTable(connection, table.getPhysicalName().getString()) : table;
                 EncodedCQCounter cqCounterToUse = tableForCQCounters.getEncodedCQCounter();
                 Map<String, Integer> changedCqCounters = new HashMap<>(numCols);
                 if (numCols > 0 ) {
@@ -3527,9 +3552,9 @@ public class MetaDataClient {
                     if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && ! metaProperties.getNonTxToTx())) {
                         connection.addTable(result.getTable(), resolvedTimeStamp);
                         table = result.getTable();
-                    } else if (metaPropertiesEvaluated.getUpdateCacheFrequency() != null) {
-                        // Force removal from cache as the update cache frequency has changed
-                        // Note that clients outside this JVM won't be affected.
+                    } else  {
+                        // Remove the table from the cache; it will be fetched from the
+                        // server the next time it is resolved.
                         connection.removeTable(tenantId, fullTableName, null, resolvedTimeStamp);
                     }
                     // Delete rows in view index if we haven't dropped it already
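
For an index created on a view, MetaDataClient above now writes an extra SYSTEM.CATALOG
row of LinkType.VIEW_INDEX_PARENT_TABLE pointing from the view index back to its parent
view. A hedged illustration of that row; the SQL text mirrors the five-column
CREATE_VIEW_INDEX_PARENT_LINK upsert (which is private to MetaDataClient) and the bind
order used above, while the tenant, schema and name values are made up:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import org.apache.phoenix.schema.PTable;

    public class ViewIndexParentLinkSketch {
        static void writeLink(Connection connection) throws Exception {
            String upsertLink = "UPSERT INTO SYSTEM.\"CATALOG\" "
                    + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_FAMILY, LINK_TYPE) "
                    + "VALUES (?, ?, ?, ?, ?)";
            try (PreparedStatement link = connection.prepareStatement(upsertLink)) {
                link.setString(1, null);                 // tenant id of the index, if any
                link.setString(2, "MY_SCHEMA");          // schema of the view index (hypothetical)
                link.setString(3, "MY_VIEW_IDX");        // view index name (hypothetical)
                link.setString(4, "MY_SCHEMA.MY_VIEW");  // parent view, as bound from parent.getName()
                link.setByte(5, PTable.LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
                link.execute();
            }
        }
    }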

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
index 154a9c2..a11c31a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataSplitPolicy.java
@@ -17,16 +17,26 @@
  */
 package org.apache.phoenix.schema;
 
-import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 
-import org.apache.phoenix.util.SchemaUtil;
+public class MetaDataSplitPolicy extends SplitOnLeadingVarCharColumnsPolicy {
 
+	@Override
+	protected boolean shouldSplit() {
+		Configuration conf = getConf();
+		return super.shouldSplit() && conf.getBoolean(QueryServices.SYSTEM_CATALOG_SPLITTABLE,
+				QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_SPLITTABLE);
+	}
 
-public class MetaDataSplitPolicy extends ConstantSizeRegionSplitPolicy {
+	@Override
+	protected int getColumnToSplitAt() {
+		// The SYSTEM.CATALOG row key is (tenant id, schema name, table name, column name,
+		// column family). Splitting only on the two leading columns keeps all metadata rows
+		// for a given tenant and schema in the same region (indexes live in the same schema
+		// as their table, and the parent table is locked when an index is modified).
+		return 2;
+	}
 
-    @Override
-    protected boolean shouldSplit() {
-        // never split SYSTEM.CATALOG
-        return false;
-    }
 }
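
MetaDataSplitPolicy now extends SplitOnLeadingVarCharColumnsPolicy, splits only on the
first two row-key columns (tenant id, schema name), and only when the new
phoenix.system.catalog.splittable flag allows it. A hedged sketch of the general idea
behind such a policy (not the actual SplitOnLeadingVarCharColumnsPolicy code): truncate a
candidate split point at the Nth separator byte so a region boundary never falls inside a
tenant/schema prefix.

    import java.util.Arrays;
    import org.apache.phoenix.query.QueryConstants;

    public class LeadingColumnSplitSketch {
        // Keep only the leading nColumns variable-length columns of a candidate split key.
        static byte[] truncateToLeadingColumns(byte[] splitPoint, int nColumns) {
            int separatorsSeen = 0;
            for (int i = 0; i < splitPoint.length; i++) {
                if (splitPoint[i] == QueryConstants.SEPARATOR_BYTE && ++separatorsSeen == nColumns) {
                    // keep the trailing separator so the prefix remains a valid key boundary
                    return Arrays.copyOf(splitPoint, i + 1);
                }
            }
            return splitPoint; // fewer than nColumns columns present; use the key unchanged
        }
    }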

http://git-wip-us.apache.org/repos/asf/phoenix/blob/93fdd5ba/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 9e26227..2e518c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -53,6 +53,18 @@ public interface PColumn extends PDatum {
     int getEstimatedSize();
     
     String getExpressionStr();
+
+    /**
+     * @return the cell timestamp associated with this PColumn
+     */
+    long getTimestamp();
+
+    /**
+     * @return whether the column is derived from some other table or view
+     */
+    boolean isDerived();
+
+    boolean isExcluded();
     
     /**
      * @return whether this column represents/stores the hbase cell timestamp.