Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/18 23:21:50 UTC

[1/2] phoenix git commit: PHOENIX-1504 Support adding column to a table that has views (Samarth Jain/Dave Hacker)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 7fe231732 -> e001c63f8
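
For context, a minimal end-to-end sketch of the scenario this patch enables, assuming a Phoenix cluster reachable through a standard JDBC URL; the table, view, and column names below are illustrative and not taken from the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AddColumnWithViewsSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative JDBC URL; adjust for your cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // Base table with a composite primary key.
                stmt.execute("CREATE TABLE IF NOT EXISTS BASE_T ("
                        + " ID CHAR(1) NOT NULL,"
                        + " COL1 INTEGER NOT NULL,"
                        + " CONSTRAINT PK PRIMARY KEY (ID, COL1))");
                // A view on the base table that adds its own column.
                stmt.execute("CREATE VIEW V_OF_BASE_T (VIEW_COL1 VARCHAR)"
                        + " AS SELECT * FROM BASE_T");
                // With PHOENIX-1504, adding a column to a base table that has
                // views is allowed; the new column is propagated to the view.
                stmt.execute("ALTER TABLE BASE_T ADD COL2 BIGINT");
            }
        }
    }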


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 73d1123..f82c594 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -18,10 +18,13 @@
 package org.apache.phoenix.query;
 
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
@@ -33,7 +36,9 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
@@ -43,9 +48,12 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
@@ -56,6 +64,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
@@ -63,7 +72,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_SCHEMA;
@@ -76,27 +85,19 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
@@ -184,6 +185,8 @@ public interface QueryConstants {
     public static final BigDecimal BD_MILLIS_IN_DAY = BigDecimal.valueOf(QueryConstants.MILLIS_IN_DAY);
     public static final int MAX_ALLOWED_NANOS = 999999999;
     public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue();
+    public static final int DIVORCED_VIEW_BASE_COLUMN_COUNT = -100;
+    public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1;
     public static final String CREATE_TABLE_METADATA =
             // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists
             // exception and add columns to the SYSTEM.TABLE dynamically.
@@ -241,7 +244,8 @@ public interface QueryConstants {
             IS_AUTOINCREMENT + " VARCHAR," +
             INDEX_TYPE + " UNSIGNED_TINYINT," +
             INDEX_DISABLE_TIMESTAMP + " BIGINT," +
-                    STORE_NULLS + " BOOLEAN," +
+            STORE_NULLS + " BOOLEAN," +
+            BASE_COLUMN_COUNT + " INTEGER," +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
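
The BASE_COLUMN_COUNT column added to SYSTEM.CATALOG above is interpreted through the two new sentinel constants (BASE_TABLE_BASE_COLUMN_COUNT = -1 and DIVORCED_VIEW_BASE_COLUMN_COUNT = -100). A hypothetical helper, not part of the patch, spelling out that interpretation:

    import org.apache.phoenix.query.QueryConstants;

    // Hypothetical helper, not part of the patch: describes what a
    // BASE_COLUMN_COUNT value read from a SYSTEM.CATALOG header row means.
    public final class BaseColumnCountSemantics {
        public static String describe(int baseColumnCount) {
            if (baseColumnCount == QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT) { // -100
                return "view has diverged from its base table; metadata changes are not propagated to it";
            }
            if (baseColumnCount == QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT) { // -1
                return "header row belongs to a base table (or the count was never recorded)";
            }
            return "view still tracks its base table, which had " + baseColumnCount
                    + " columns when the count was recorded";
        }
    }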

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index b719aae..2a43679 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -236,4 +236,9 @@ public class DelegateTable implements PTable {
     public PName getParentSchemaName() {
         return delegate.getParentSchemaName();
     }
+
+    @Override
+    public int getBaseColumnCount() {
+        return delegate.getBaseColumnCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 75678fd..e7c3cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -22,7 +22,10 @@ import static com.google.common.collect.Sets.newLinkedHashSet;
 import static com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
@@ -34,6 +37,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
@@ -41,6 +45,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
@@ -48,39 +53,34 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
+import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ParameterMetaData;
@@ -115,7 +115,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -137,8 +136,6 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
-import org.apache.phoenix.expression.function.FunctionExpression;
-import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -158,18 +155,16 @@ import org.apache.phoenix.parse.DropFunctionStatement;
 import org.apache.phoenix.parse.DropIndexStatement;
 import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
-import org.apache.phoenix.parse.FunctionParseNode;
 import org.apache.phoenix.parse.IndexKeyConstraint;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
-import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
-import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -225,8 +220,9 @@ public class MetaDataClient {
             VIEW_TYPE + "," +
             VIEW_INDEX_ID + "," +
             INDEX_TYPE + "," +
-            STORE_NULLS +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            STORE_NULLS + "," +
+            BASE_COLUMN_COUNT +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -1878,6 +1874,11 @@ public class MetaDataClient {
                 tableUpsert.setByte(18, indexType.getSerializedValue());
             }
             tableUpsert.setBoolean(19, storeNulls);
+            if (parent != null && tableType == PTableType.VIEW) {
+                tableUpsert.setInt(20, parent.getColumns().size());
+            } else {
+                tableUpsert.setInt(20, BASE_TABLE_BASE_COLUMN_COUNT);
+            }
             tableUpsert.execute();
 
             tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
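
With the change above, the header row written at CREATE time stores the parent's column count for views and the BASE_TABLE_BASE_COLUMN_COUNT sentinel for everything else. A hedged sketch of how the stored value could be inspected directly through SYSTEM.CATALOG (the header row is the one with null COLUMN_NAME and COLUMN_FAMILY; the view name carries over from the earlier sketch and is illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class InspectBaseColumnCount {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 PreparedStatement ps = conn.prepareStatement(
                     "SELECT TABLE_NAME, BASE_COLUMN_COUNT FROM SYSTEM.CATALOG"
                     + " WHERE TABLE_NAME = ?"
                     + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) {
                ps.setString(1, "V_OF_BASE_T"); // illustrative view name
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " -> " + rs.getInt(2));
                    }
                }
            }
        }
    }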

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index e46dcb7..b983074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -322,4 +322,5 @@ public interface PTable extends PMetaDataEntity {
 
     IndexType getIndexType();
     PTableStats getTableStats();
+    int getBaseColumnCount();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 5dc59e0..4650739 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -125,6 +125,7 @@ public class PTableImpl implements PTable {
     private int estimatedSize;
     private IndexType indexType;
     private PTableStats tableStats = PTableStats.EMPTY_STATS;
+    private int baseColumnCount;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -193,7 +194,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -201,7 +202,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -209,7 +210,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -217,7 +218,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
@@ -225,7 +226,7 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -234,7 +235,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
     }
 
     public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -243,7 +244,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats);
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats, table.getBaseColumnCount());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
@@ -253,18 +254,18 @@ public class PTableImpl implements PTable {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, PTableStats.EMPTY_STATS);
+                indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
             PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
             List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
-            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats)
+            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
-                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats);
+                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -272,10 +273,10 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            PTableStats stats) throws SQLException {
+            PTableStats stats, int baseColumnCount) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
-                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType);
+                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount);
     }
 
     @Override
@@ -303,7 +304,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType ) throws SQLException {
+            IndexType indexType , int baseColumnCount) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -437,6 +438,7 @@ public class PTableImpl implements PTable {
         }
 
         this.estimatedSize = estimatedSize;
+        this.baseColumnCount = baseColumnCount;
     }
 
     @Override
@@ -968,13 +970,18 @@ public class PTableImpl implements PTable {
           physicalNames.add(PNameFactory.newName(table.getPhysicalNames(i).toByteArray()));
         }
       }
+      
+      int baseColumnCount = -1;
+      if (table.hasBaseColumnCount()) {
+          baseColumnCount = table.getBaseColumnCount();
+      }
 
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
           (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
               isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
-                multiTenant, storeNulls, viewType, viewIndexId, indexType);
+                multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1063,6 +1070,7 @@ public class PTableImpl implements PTable {
           builder.addPhysicalNames(ByteStringer.wrap(table.getPhysicalNames().get(i).getBytes()));
         }
       }
+      builder.setBaseColumnCount(table.getBaseColumnCount());
 
       return builder.build();
     }
@@ -1082,4 +1090,8 @@ public class PTableImpl implements PTable {
         return parentSchemaName;
     }
 
+    @Override
+    public int getBaseColumnCount() {
+        return baseColumnCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 1e3516d..1f4a285 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -253,13 +253,17 @@ public class ByteUtil {
     public static byte[] concat(byte[] first, byte[]... rest) {
         int totalLength = first.length;
         for (byte[] array : rest) {
-            totalLength += array.length;
+            if (array != null) {
+                totalLength += array.length;
+            }
         }
         byte[] result = Arrays.copyOf(first, totalLength);
         int offset = first.length;
         for (byte[] array : rest) {
-            System.arraycopy(array, 0, result, offset, array.length);
-            offset += array.length;
+            if (array != null) {
+                System.arraycopy(array, 0, result, offset, array.length);
+                offset += array.length;
+            }
         }
         return result;
     }
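
The null checks added to concat() above mean that null arrays in the varargs are now skipped instead of causing a NullPointerException. A small sketch of the resulting behavior, assuming the patched ByteUtil is on the classpath:

    import org.apache.phoenix.util.ByteUtil;

    public class ConcatNullSketch {
        public static void main(String[] args) {
            byte[] a = {1, 2};
            byte[] b = {3};
            // Before this patch a null element would throw a NullPointerException;
            // with the patch, null arrays are simply skipped.
            byte[] result = ByteUtil.concat(a, null, b);
            System.out.println(java.util.Arrays.toString(result)); // [1, 2, 3]
        }
    }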

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 86da5cc..dff6598 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -17,9 +17,35 @@
  */
 package org.apache.phoenix.util;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.query.QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT;
+
 import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -38,21 +64,43 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 public class UpgradeUtil {
     private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
     private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_"));
+    
+    public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT "
+            + "INTO SYSTEM.CATALOG "
+            + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, BASE_COLUMN_COUNT) "
+            + "VALUES (?, ?, ?, ?, ?, ?) ";
 
+    public static String SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW = "SELECT "
+            + "BASE_COLUMN_COUNT "
+            + "FROM SYSTEM.CATALOG "
+            + "WHERE "
+            + "COLUMN_NAME IS NULL "
+            + "AND "
+            + "COLUMN_FAMILY IS NULL "
+            + "AND "
+            + "TENANT_ID %s "
+            + "AND "
+            + "TABLE_SCHEM %s "
+            + "AND "
+            + "TABLE_NAME = ? "
+            ;
+    
     private UpgradeUtil() {
     }
 
@@ -394,5 +442,348 @@ public class UpgradeUtil {
                 keyValue.getTimestamp(), KeyValue.Type.codeToType(keyValue.getType()),
                 buf, keyValue.getValueOffset(), keyValue.getValueLength());
     }
+    
+    public static void upgradeTo4_5_0(PhoenixConnection metaConnection) throws SQLException {
+        String getBaseTableAndViews = "SELECT "
+                + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, "
+                + TENANT_ID + ", "
+                + TABLE_SCHEM + " AS VIEW_SCHEMA, "
+                + TABLE_NAME + " AS VIEW_NAME "
+                + "FROM " + SYSTEM_CATALOG_NAME 
+                + " WHERE " + COLUMN_FAMILY + " IS NOT NULL " // column_family column points to the physical table name.
+                + " AND " + COLUMN_NAME + " IS NULL "
+                + " AND " + LINK_TYPE + " = ? ";
+        // Build a map of base table name -> list of views on the table. 
+        Map<String, List<ViewKey>> parentTableViewsMap = new HashMap<>();
+        try (PreparedStatement stmt = metaConnection.prepareStatement(getBaseTableAndViews)) {
+            // Get back view rows that have links back to the base physical table. This takes care
+            // of cases when we have a hierarchy of views too.
+            stmt.setByte(1, LinkType.PHYSICAL_TABLE.getSerializedValue());
+            try (ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    // this is actually SCHEMANAME.TABLENAME
+                    String parentTable = rs.getString("BASE_PHYSICAL_TABLE");
+                    String tenantId = rs.getString(TENANT_ID);
+                    String viewSchema = rs.getString("VIEW_SCHEMA");
+                    String viewName = rs.getString("VIEW_NAME");
+                    List<ViewKey> viewKeysList = parentTableViewsMap.get(parentTable);
+                    if (viewKeysList == null) {
+                        viewKeysList = new ArrayList<>();
+                        parentTableViewsMap.put(parentTable, viewKeysList);
+                    }
+                    viewKeysList.add(new ViewKey(tenantId, viewSchema, viewName));
+                }
+            }
+        }
+        
+        for (Entry<String, List<ViewKey>> entry : parentTableViewsMap.entrySet()) {
+            // Fetch column information for the base physical table
+            String physicalTable = entry.getKey();
+            String baseTableSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalTable).equals(StringUtil.EMPTY_STRING) ? null : SchemaUtil.getSchemaNameFromFullName(physicalTable);
+            String baseTableName = SchemaUtil.getTableNameFromFullName(physicalTable);
+            List<ColumnDetails> basePhysicalTableColumns = new ArrayList<>(); 
+            
+            // Columns fetched in order of ordinal position
+            String fetchColumnInfoForBasePhysicalTable = "SELECT " +
+                    COLUMN_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    DATA_TYPE + "," +
+                    COLUMN_SIZE + "," +
+                    DECIMAL_DIGITS + "," +
+                    ORDINAL_POSITION + "," +
+                    SORT_ORDER + "," +
+                    ARRAY_SIZE + " " +
+                    "FROM SYSTEM.CATALOG " +
+                    "WHERE " +
+                    "TABLE_SCHEM %s " +
+                    "AND TABLE_NAME = ? " +
+                    "AND COLUMN_NAME IS NOT NULL " +
+                    "ORDER BY " + 
+                    ORDINAL_POSITION;
+           
+            PreparedStatement stmt = null;
+            if (baseTableSchemaName == null) {
+                fetchColumnInfoForBasePhysicalTable =
+                        String.format(fetchColumnInfoForBasePhysicalTable, "IS NULL ");
+                stmt = metaConnection.prepareStatement(fetchColumnInfoForBasePhysicalTable);
+                stmt.setString(1, baseTableName);
+            } else {
+                fetchColumnInfoForBasePhysicalTable =
+                        String.format(fetchColumnInfoForBasePhysicalTable, " = ? ");
+                stmt = metaConnection.prepareStatement(fetchColumnInfoForBasePhysicalTable);
+                stmt.setString(1, baseTableSchemaName);
+                stmt.setString(2, baseTableName);
+            }
+
+            try (ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    basePhysicalTableColumns.add(new ColumnDetails(rs.getString(COLUMN_FAMILY), rs
+                            .getString(COLUMN_NAME), rs.getInt(ORDINAL_POSITION), rs
+                            .getInt(DATA_TYPE), rs.getInt(COLUMN_SIZE), rs.getInt(DECIMAL_DIGITS),
+                            rs.getInt(SORT_ORDER), rs.getInt(ARRAY_SIZE)));
+                }
+            }
+            
+            // Fetch column information for all the views on the base physical table ordered by ordinal position.
+            List<ViewKey> viewKeys = entry.getValue();
+            StringBuilder sb = new StringBuilder();
+            sb.append("SELECT " + 
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    COLUMN_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    DATA_TYPE + "," +
+                    COLUMN_SIZE + "," +
+                    DECIMAL_DIGITS + "," +
+                    ORDINAL_POSITION + "," +
+                    SORT_ORDER + "," +
+                    ARRAY_SIZE + " " + 
+                    "FROM SYSTEM.CATALOG " +
+                    "WHERE " +
+                    COLUMN_NAME + " IS NOT NULL " +
+                    "AND " +
+                    ORDINAL_POSITION + " <= ? " + // fetch only those columns that would impact setting of base column count
+                    "AND " +
+                    "(" + TENANT_ID+ ", " + TABLE_SCHEM + ", " + TABLE_NAME + ") IN (");
+            
+            int numViews = viewKeys.size();
+            for (int i = 0; i < numViews; i++) {
+                sb.append(" (?, ?, ?) ");
+                if (i < numViews - 1) {
+                    sb.append(", ");
+                }
+            }
+            sb.append(" ) ");
+            sb.append(" GROUP BY " +
+                    TENANT_ID + "," +
+                    TABLE_SCHEM + "," +
+                    TABLE_NAME + "," +
+                    COLUMN_NAME + "," +
+                    COLUMN_FAMILY + "," +
+                    DATA_TYPE + "," +
+                    COLUMN_SIZE + "," +
+                    DECIMAL_DIGITS + "," +
+                    ORDINAL_POSITION + "," +
+                    SORT_ORDER + "," +
+                    ARRAY_SIZE + " " + 
+                    "ORDER BY " + 
+                    TENANT_ID + "," + TABLE_SCHEM + ", " + TABLE_NAME + ", " + ORDINAL_POSITION);
+            String fetchViewColumnsSql = sb.toString();
+            stmt = metaConnection.prepareStatement(fetchViewColumnsSql);
+            int numColsInBaseTable = basePhysicalTableColumns.size();
+            stmt.setInt(1, numColsInBaseTable);
+            int paramIndex = 1;
+            stmt.setInt(paramIndex++, numColsInBaseTable);
+            for (ViewKey view : viewKeys) {
+                stmt.setString(paramIndex++, view.tenantId);
+                stmt.setString(paramIndex++, view.schema);
+                stmt.setString(paramIndex++, view.name);
+            }
+            String currentTenantId = null;
+            String currentViewSchema = null;
+            String currentViewName = null;
+            try (ResultSet rs = stmt.executeQuery()) {
+                int numBaseTableColsMatched = 0;
+                boolean ignore = false;
+                boolean baseColumnCountUpserted = false;
+                while (rs.next()) {
+                    String viewTenantId = rs.getString(TENANT_ID);
+                    String viewSchema = rs.getString(TABLE_SCHEM);
+                    String viewName = rs.getString(TABLE_NAME);
+                    if (!(Objects.equal(viewTenantId, currentTenantId) && Objects.equal(viewSchema, currentViewSchema) && Objects.equal(viewName, currentViewName))) {
+                        // We are about to iterate through columns of a different view. Check whether base column count was upserted.
+                        // If it wasn't then it is likely the case that a column inherited from the base table was dropped from view.
+                        if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
+                            upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVORCED_VIEW_BASE_COLUMN_COUNT);
+                        }
+                        // reset the values as we are now going to iterate over columns of a new view.
+                        numBaseTableColsMatched = 0;
+                        currentTenantId = viewTenantId;
+                        currentViewSchema = viewSchema;
+                        currentViewName = viewName;
+                        ignore = false;
+                        baseColumnCountUpserted = false;
+                    }
+                    if (!ignore) {
+                        /*
+                         * Iterate over all the columns of the base physical table and the columns of the view. Compare the
+                         * two till one of the following happens: 
+                         * 
+                         * 1) We run into a view column which is different from column in the base physical table.
+                         * This means that the view has divorced itself from the base physical table. In such a case
+                         * we will set a special value for the base column count. That special value will also be used
+                         * on the server side to filter out the divorced view so that meta-data changes on the base 
+                         * physical table are not propagated to it.
+                         * 
+                         * 2) Every physical table column is present in the view. In that case we set the base column count
+                         * as the number of columns in the base physical table. At that point we ignore rest of the columns
+                         * of the view.
+                         * 
+                         */
+                        ColumnDetails baseTableColumn = basePhysicalTableColumns.get(numBaseTableColsMatched);
+                        String columName = rs.getString(COLUMN_NAME);
+                        String columnFamily = rs.getString(COLUMN_FAMILY);
+                        int ordinalPos = rs.getInt(ORDINAL_POSITION);
+                        int dataType = rs.getInt(DATA_TYPE);
+                        int columnSize = rs.getInt(COLUMN_SIZE);
+                        int decimalDigits = rs.getInt(DECIMAL_DIGITS);
+                        int sortOrder = rs.getInt(SORT_ORDER);
+                        int arraySize = rs.getInt(ARRAY_SIZE);
+                        ColumnDetails viewColumn = new ColumnDetails(columnFamily, columName, ordinalPos, dataType, columnSize, decimalDigits, sortOrder, arraySize);
+                        if (baseTableColumn.equals(viewColumn)) {
+                            numBaseTableColsMatched++;
+                            if (numBaseTableColsMatched == numColsInBaseTable) {
+                                upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, numColsInBaseTable);
+                                // No need to ignore the rest of the columns of the view here since the
+                                // query retrieved only those columns that had ordinal position <= numColsInBaseTable
+                                baseColumnCountUpserted = true;
+                            }
+                        } else {
+                            // special value to denote that the view has divorced itself from the base physical table.
+                            upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, DIVORCED_VIEW_BASE_COLUMN_COUNT);
+                            baseColumnCountUpserted = true;
+                            // ignore rest of the rows for the view.
+                            ignore = true;
+                        }
+                    }
+                }
+            }
+            // set base column count for the header row of the base table too. We use this information
+            // to figure out whether the upgrade is in progress or hasn't started.
+            upsertBaseColumnCountInHeaderRow(metaConnection, null, baseTableSchemaName, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
+            metaConnection.commit();
+        }
+    }
+    
+    private static void upsertBaseColumnCountInHeaderRow(PhoenixConnection metaConnection,
+            String tenantId, String schemaName, String viewOrTableName, int baseColumnCount)
+            throws SQLException {
+        try (PreparedStatement stmt =
+                metaConnection.prepareStatement(UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW)) {
+            stmt.setString(1, tenantId);
+            stmt.setString(2, schemaName);
+            stmt.setString(3, viewOrTableName);
+            stmt.setString(4, null);
+            stmt.setString(5, null);
+            stmt.setInt(6, baseColumnCount);
+            stmt.executeUpdate();
+        }
+    }
+    
+    private static class ColumnDetails {
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + columnName.hashCode();
+            result = prime * result + ((columnFamily == null) ? 0 : columnFamily.hashCode());
+            result = prime * result + arraySize;
+            result = prime * result + dataType;
+            result = prime * result + maxLength;
+            result = prime * result + ordinalValue;
+            result = prime * result + scale;
+            result = prime * result + sortOrder;
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            ColumnDetails other = (ColumnDetails) obj;
+            if (!columnName.equals(other.columnName)) return false;
+            if (columnFamily == null) {
+                if (other.columnFamily != null) return false;
+            } else if (!columnFamily.equals(other.columnFamily)) return false;
+            if (arraySize != other.arraySize) return false;
+            if (dataType != other.dataType) return false;
+            if (maxLength != other.maxLength) return false;
+            if (ordinalValue != other.ordinalValue) return false;
+            if (scale != other.scale) return false;
+            if (sortOrder != other.sortOrder) return false;
+            return true;
+        }
+
+        @Nullable
+        private final String columnFamily;
+
+        @Nonnull
+        private final String columnName;
+
+        private final int ordinalValue;
+
+        private final int dataType;
+
+        private final int maxLength;
+
+        private final int scale;
+
+        private final int sortOrder;
+
+        private final int arraySize;
+
+        ColumnDetails(String columnFamily, String columnName, int ordinalValue, int dataType,
+                int maxLength, int scale, int sortOrder, int arraySize) {
+            checkNotNull(columnName);
+            checkNotNull(ordinalValue);
+            checkNotNull(dataType);
+            this.columnFamily = columnFamily;
+            this.columnName = columnName;
+            this.ordinalValue = ordinalValue;
+            this.dataType = dataType;
+            this.maxLength = maxLength;
+            this.scale = scale;
+            this.sortOrder = sortOrder;
+            this.arraySize = arraySize;
+        }
+
+    }
+    
+    private static class ViewKey {
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((tenantId == null) ? 0 : tenantId.hashCode());
+            result = prime * result + name.hashCode();
+            result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            ViewKey other = (ViewKey) obj;
+            if (tenantId == null) {
+                if (other.tenantId != null) return false;
+            } else if (!tenantId.equals(other.tenantId)) return false;
+            if (!name.equals(other.name)) return false;
+            if (schema == null) {
+                if (other.schema != null) return false;
+            } else if (!schema.equals(other.schema)) return false;
+            return true;
+        }
+
+        @Nullable
+        private final String tenantId;
+
+        @Nullable
+        private final String schema;
+
+        @Nonnull
+        private final String name;
+
+        private ViewKey(String tenantId, String schema, String viewName) {
+            this.tenantId = tenantId;
+            this.schema = schema;
+            this.name = viewName;
+        }
+    }
 
 }
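
The block comment inside upgradeTo4_5_0 above explains how a view's columns are compared, in ordinal order, against the base physical table's columns to decide what to write into BASE_COLUMN_COUNT. A simplified, illustrative restatement of that decision for a single view, operating on in-memory column lists rather than SYSTEM.CATALOG rows (not part of the patch):

    import java.util.List;

    // Illustrative restatement of the comparison performed by upgradeTo4_5_0;
    // columns are assumed to be in ordinal order and comparable via equals(),
    // like the patch's ColumnDetails.
    final class BaseColumnCountDecision {
        static final int DIVORCED_VIEW_BASE_COLUMN_COUNT = -100; // mirrors QueryConstants

        static <C> int decide(List<C> baseColumns, List<C> viewColumns) {
            int matched = 0;
            for (C viewColumn : viewColumns) {
                if (matched == baseColumns.size()) {
                    break; // every base column matched; the rest are view-only columns
                }
                if (baseColumns.get(matched).equals(viewColumn)) {
                    matched++;
                } else {
                    // A view column differs from the base table column at the same
                    // position: the view has diverged ("divorced") from the base table.
                    return DIVORCED_VIEW_BASE_COLUMN_COUNT;
                }
            }
            // If the view ran out of columns before matching every base column
            // (e.g. an inherited column was dropped), it is also treated as diverged.
            return matched == baseColumns.size() ? matched : DIVORCED_VIEW_BASE_COLUMN_COUNT;
        }
    }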


[2/2] phoenix git commit: PHOENIX-1504 Support adding column to a table that has views (Samarth Jain/Dave Hacker)

Posted by sa...@apache.org.
PHOENIX-1504 Support adding column to a table that has views (Samarth Jain/Dave Hacker)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e001c63f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e001c63f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e001c63f

Branch: refs/heads/4.x-HBase-0.98
Commit: e001c63f87552ec871c3ba8f4484b2011a28bf15
Parents: 7fe2317
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jun 18 14:21:38 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jun 18 14:21:38 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    | 356 +++++++++++++++++
 .../end2end/TenantSpecificTablesDDLIT.java      |  20 +-
 .../org/apache/phoenix/end2end/UpgradeIT.java   | 332 ++++++++++++++++
 .../coprocessor/MetaDataEndpointImpl.java       | 183 +++++++--
 .../phoenix/coprocessor/MetaDataProtocol.java   |   4 +-
 .../coprocessor/generated/PTableProtos.java     | 103 ++++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   3 +-
 .../query/ConnectionQueryServicesImpl.java      |  41 +-
 .../apache/phoenix/query/QueryConstants.java    |  30 +-
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   |  37 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   1 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  40 +-
 .../java/org/apache/phoenix/util/ByteUtil.java  |  10 +-
 .../org/apache/phoenix/util/UpgradeUtil.java    | 395 ++++++++++++++++++-
 15 files changed, 1451 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 59698d6..61dd6a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.hadoop.hbase.HColumnDescriptor.DEFAULT_REPLICATION_SCOPE;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MUTATE_TABLE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.closeConnection;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
@@ -32,9 +33,11 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -48,8 +51,10 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -59,6 +64,8 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.base.Objects;
+
 /**
  *
  * A lot of tests in this class test HBase level properties. As a result,
@@ -1988,4 +1995,353 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testAddColumnToTableWithViews() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {       
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS TABLEWITHVIEW ("
+                    + " ID char(1) NOT NULL,"
+                    + " COL1 integer NOT NULL,"
+                    + " COL2 bigint NOT NULL,"
+                    + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+                    + " )");
+            assertTableDefinition(conn, "TABLEWITHVIEW", PTableType.TABLE, null, 0, 3, -1, "ID", "COL1", "COL2");
+            
+            conn.createStatement().execute("CREATE VIEW VIEWOFTABLE ( VIEW_COL1 SMALLINT ) AS SELECT * FROM TABLEWITHVIEW");
+            assertTableDefinition(conn, "VIEWOFTABLE", PTableType.VIEW, "TABLEWITHVIEW", 0, 4, 3, "ID", "COL1", "COL2", "VIEW_COL1");
+            
+            conn.createStatement().execute("ALTER TABLE TABLEWITHVIEW ADD COL3 char(10)");
+            assertTableDefinition(conn, "VIEWOFTABLE", PTableType.VIEW, "TABLEWITHVIEW", 1, 5, 4, "ID", "COL1", "COL2", "COL3", "VIEW_COL1");
+            
+        } finally {
+            conn.close();
+        }
+    }
+
+    private void assertTableDefinition(Connection conn, String tableName, PTableType tableType, String parentTableName, int sequenceNumber, int columnCount, int baseColumnCount, String... columnName) throws Exception {
+        PreparedStatement p = conn.prepareStatement("SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME=? AND TABLE_TYPE=?");
+        p.setString(1, tableName);
+        p.setString(2, tableType.getSerializedValue());
+        ResultSet rs = p.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in BaseColumnCount"), baseColumnCount, rs.getInt("BASE_COLUMN_COUNT"));
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in columnCount"), columnCount, rs.getInt("COLUMN_COUNT"));
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in sequenceNumber"), sequenceNumber, rs.getInt("TABLE_SEQ_NUM"));
+        rs.close();
+
+        ResultSet parentTableColumnsRs = null; 
+        if (parentTableName != null) {
+            parentTableColumnsRs = conn.getMetaData().getColumns(null, null, parentTableName, null);
+        }
+        
+        rs = conn.getMetaData().getColumns(null, null, tableName, null);
+        for (int i = 0; i < columnName.length; i++) {
+            if (columnName[i] != null) {
+                assertTrue(rs.next());
+                assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in columnName: i=" + i), columnName[i], rs.getString("COLUMN_NAME"));
+                assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in ordinalPosition: i=" + i), i+1, rs.getInt("ORDINAL_POSITION"));
+                if (i < baseColumnCount && parentTableColumnsRs != null) {
+                    assertTrue(parentTableColumnsRs.next());
+                    ResultSetMetaData md = parentTableColumnsRs.getMetaData();
+                    assertEquals(md.getColumnCount(), rs.getMetaData().getColumnCount());
+                    for (int columnIndex = 1; columnIndex < md.getColumnCount(); columnIndex++) {
+                        String viewColumnValue = rs.getString(columnIndex);
+                        String parentTableColumnValue = parentTableColumnsRs.getString(columnIndex);
+                        if (!Objects.equal(viewColumnValue, parentTableColumnValue)) {
+                            if (md.getColumnName(columnIndex).equals("TABLE_NAME")) {
+                                assertEquals(parentTableName, parentTableColumnValue);
+                                assertEquals(tableName, viewColumnValue);
+                            } else {
+                                fail(md.getColumnName(columnIndex) + "=" + parentTableColumnValue);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        assertFalse(getSystemCatalogEntriesForTable(conn, tableName, ""), rs.next());
+    }
+    
+    private String getSystemCatalogEntriesForTable(Connection conn, String tableName, String message) throws Exception {
+        StringBuilder sb = new StringBuilder(message);
+        sb.append("\n\n\n");
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME='"+ tableName +"'");
+        ResultSetMetaData metaData = rs.getMetaData();
+        int rowNum = 0;
+        while (rs.next()) {
+            sb.append(rowNum++).append("------\n");
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                sb.append("\t").append(metaData.getColumnLabel(i)).append("=").append(rs.getString(i)).append("\n");
+            }
+            sb.append("\n");
+        }
+        rs.close();
+        return sb.toString();
+    }
+    
+    @Test
+    public void testCacheInvalidatedAfterAddingColumnToBaseTableWithViews() throws Exception {
+        String baseTable = "testCacheInvalidatedAfterAddingColumnToBaseTableWithViews";
+        String viewName = baseTable + "_view";
+        String tenantId = "tenantId";
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true " ;
+            globalConn.createStatement().execute(tableDDL);
+            Properties tenantProps = new Properties();
+            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            // create a tenant specific view
+            try (Connection tenantConn =  DriverManager.getConnection(getUrl(), tenantProps)) {
+                String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+                tenantConn.createStatement().execute(viewDDL);
+                
+                // Add a column to the base table using global connection
+                globalConn.createStatement().execute("ALTER TABLE " + baseTable + " ADD NEW_COL VARCHAR");
+
+                // Check now whether the tenant connection can see the column that was added
+                tenantConn.createStatement().execute("SELECT NEW_COL FROM " + viewName);
+                tenantConn.createStatement().execute("SELECT NEW_COL FROM " + baseTable);
+            }
+        }
+    }
+    
+    @Test
+    public void testDropColumnOnTableWithViewsNotAllowed() throws Exception {
+        String baseTable = "testDropColumnOnTableWithViewsNotAllowed";
+        String viewName = baseTable + "_view";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (PK1 VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR, V2 VARCHAR)";
+            conn.createStatement().execute(tableDDL);
+            
+            String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(viewDDL);
+            
+            String dropColumn = "ALTER TABLE " + baseTable + " DROP COLUMN V2";
+            try {
+                conn.createStatement().execute(dropColumn);
+                fail("Dropping column on a base table that has views is not allowed");
+            } catch (SQLException e) {  
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+        }
+    }
+    
+    @Test
+    public void testAlteringViewThatHasChildViewsNotAllowed() throws Exception {
+        String baseTable = "testAlteringViewThatHasChildViewsNotAllowed";
+        String childView = "childView";
+        String grandChildView = "grandChildView";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL =
+                    "CREATE TABLE " +  baseTable + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK2))";
+            conn.createStatement().execute(baseTableDDL);
+
+            String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(childViewDDL);
+
+            String addColumnToChildViewDDL =
+                    "ALTER VIEW " + childView + " ADD CHILD_VIEW_COL VARCHAR";
+            conn.createStatement().execute(addColumnToChildViewDDL);
+
+            String grandChildViewDDL =
+                    "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView;
+            conn.createStatement().execute(grandChildViewDDL);
+
+            // dropping base table column from child view should fail
+            String dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN V2";
+            try {
+                conn.createStatement().execute(dropColumnFromChildView);
+                fail("Dropping columns from a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+
+            // dropping view specific column from child view should fail
+            dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN CHILD_VIEW_COL";
+            try {
+                conn.createStatement().execute(dropColumnFromChildView);
+                fail("Dropping columns from a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+            
+            // Adding a column to a view that has child views should fail
+            String addColumnToChildView = "ALTER VIEW " + childView + " ADD V5 VARCHAR";
+            try {
+                conn.createStatement().execute(addColumnToChildView);
+                fail("Adding columns to a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+
+            // dropping column from the grand child view, however, should work.
+            String dropColumnFromGrandChildView =
+                    "ALTER VIEW " + grandChildView + " DROP COLUMN CHILD_VIEW_COL";
+            conn.createStatement().execute(dropColumnFromGrandChildView);
+
+            // similarly, dropping column inherited from the base table should work.
+            dropColumnFromGrandChildView = "ALTER VIEW " + grandChildView + " DROP COLUMN V2";
+            conn.createStatement().execute(dropColumnFromGrandChildView);
+        }
+    }
+    
+    @Test
+    public void testDivorcedViewsStayDivorced() throws Exception {
+        String baseTable = "testDivorcedViewsStayDivorced";
+        String viewName = baseTable + "_view";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (PK1 VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR, V2 VARCHAR)";
+            conn.createStatement().execute(tableDDL);
+            
+            String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(viewDDL);
+            
+            // Drop the column inherited from base table to divorce the view
+            String dropColumn = "ALTER VIEW " + viewName + " DROP COLUMN V2";
+            conn.createStatement().execute(dropColumn);
+            
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD V3 VARCHAR";
+            conn.createStatement().execute(alterBaseTable);
+            
+            // Column V3 shouldn't have propagated to the divorced view.
+            String sql = "SELECT V3 FROM " + viewName;
+            try {
+                conn.createStatement().execute(sql);
+                fail("Column V3 should not be visible through the divorced view");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), e.getErrorCode());
+            }
+        } 
+    }
+    
+    @Test
+    public void testAddingColumnToBaseTablePropagatesToEntireViewHierarchy() throws Exception {
+        String baseTable = "testViewHierarchy";
+        String view1 = "view1";
+        String view2 = "view2";
+        String view3 = "view3";
+        String view4 = "view4";
+        /*                                     baseTable
+                                 /                  |               \ 
+                         view1(tenant1)    view3(tenant2)          view4(global)
+                          /
+                        view2(tenant1)  
+        */
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            conn.createStatement().execute(baseTableDDL);
+            
+            try (Connection tenant1Conn = getTenantConnection("tenant1")) {
+                String view1DDL = "CREATE VIEW " + view1 + " AS SELECT * FROM " + baseTable;
+                tenant1Conn.createStatement().execute(view1DDL);
+                
+                String view2DDL = "CREATE VIEW " + view2 + " AS SELECT * FROM " + view1;
+                tenant1Conn.createStatement().execute(view2DDL);
+            }
+            
+            try (Connection tenant2Conn = getTenantConnection("tenant2")) {
+                String view3DDL = "CREATE VIEW " + view3 + " AS SELECT * FROM " + baseTable;
+                tenant2Conn.createStatement().execute(view3DDL);
+            }
+            
+            String view4DDL = "CREATE VIEW " + view4 + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(view4DDL);
+            
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD V3 VARCHAR";
+            conn.createStatement().execute(alterBaseTable);
+            
+            // verify that the column is visible to view4
+            conn.createStatement().execute("SELECT V3 FROM " + view4);
+            
+            // verify that the column is visible to view1 and view2
+            try (Connection tenant1Conn = getTenantConnection("tenant1")) {
+                tenant1Conn.createStatement().execute("SELECT V3 from " + view1);
+                tenant1Conn.createStatement().execute("SELECT V3 from " + view2);
+            }
+            
+            // verify that the column is visible to view3
+            try (Connection tenant2Conn = getTenantConnection("tenant2")) {
+                tenant2Conn.createStatement().execute("SELECT V3 from " + view3);
+            }
+            
+        }
+           
+    }
+    
+    @Test
+    public void testChangingPKOfBaseTableChangesPKForAllViews() throws Exception {
+        String baseTable = "testChangePKOfBaseTable";
+        String view1 = "view1";
+        String view2 = "view2";
+        String view3 = "view3";
+        String view4 = "view4";
+        /*                                     baseTable
+                                 /                  |               \ 
+                         view1(tenant1)    view3(tenant2)          view4(global)
+                          /
+                        view2(tenant1)  
+         */
+        Connection tenant1Conn = null, tenant2Conn = null;
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE "
+                    + baseTable
+                    + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            globalConn.createStatement().execute(baseTableDDL);
+
+            tenant1Conn = getTenantConnection("tenant1");
+            String view1DDL = "CREATE VIEW " + view1 + " AS SELECT * FROM " + baseTable;
+            tenant1Conn.createStatement().execute(view1DDL);
+
+            String view2DDL = "CREATE VIEW " + view2 + " AS SELECT * FROM " + view1;
+            tenant1Conn.createStatement().execute(view2DDL);
+
+            tenant2Conn = getTenantConnection("tenant2");
+            String view3DDL = "CREATE VIEW " + view3 + " AS SELECT * FROM " + baseTable;
+            tenant2Conn.createStatement().execute(view3DDL);
+
+            String view4DDL = "CREATE VIEW " + view4 + " AS SELECT * FROM " + baseTable;
+            globalConn.createStatement().execute(view4DDL);
+
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD NEW_PK varchar primary key ";
+            globalConn.createStatement().execute(alterBaseTable);
+
+            // verify that the new column new_pk is now part of the primary key for the entire hierarchy
+            assertTrue(checkColumnPartOfPk(globalConn.unwrap(PhoenixConnection.class), "NEW_PK", baseTable));
+            assertTrue(checkColumnPartOfPk(tenant1Conn.unwrap(PhoenixConnection.class), "NEW_PK", view1));
+            assertTrue(checkColumnPartOfPk(tenant1Conn.unwrap(PhoenixConnection.class), "NEW_PK", view2));
+            assertTrue(checkColumnPartOfPk(tenant2Conn.unwrap(PhoenixConnection.class), "NEW_PK", view3));
+            assertTrue(checkColumnPartOfPk(globalConn.unwrap(PhoenixConnection.class), "NEW_PK", view4));
+
+        } finally {
+            if (tenant1Conn != null) {
+                try {
+                    tenant1Conn.close();
+                } catch (Throwable ignore) {}
+            }
+            if (tenant2Conn != null) {
+                try {
+                    tenant2Conn.close();
+                } catch (Throwable ignore) {}
+            }
+        }
+
+    }
+    
+    private boolean checkColumnPartOfPk(PhoenixConnection conn, String columnName, String tableName) throws SQLException {
+        String normalizedTableName = SchemaUtil.normalizeIdentifier(tableName);
+        PTable table = conn.getMetaDataCache().getTable(new PTableKey(conn.getTenantId(), normalizedTableName));
+        List<PColumn> pkCols = table.getPKColumns();
+        String normalizedColumnName = SchemaUtil.normalizeIdentifier(columnName);
+        for (PColumn pkCol : pkCols) {
+            if (pkCol.getName().getString().equals(normalizedColumnName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private Connection getTenantConnection(String tenantId) throws Exception {
+        Properties tenantProps = new Properties();
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), tenantProps);
+    }
 }
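
The behavior these AlterTableIT tests verify can also be observed over plain JDBC. The sketch below mirrors the DDL sequence from testAddColumnToTableWithViews; the connection URL and the table/view names are placeholders, not taken from the patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    // Sketch of the behavior verified above: a column added to a base table
    // becomes visible through an existing view. URL and object names are
    // placeholders, not copied from the patch.
    public class AddColumnPropagationSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS BASE_T (ID VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR)");
                stmt.execute("CREATE VIEW IF NOT EXISTS VIEW_OF_BASE_T AS SELECT * FROM BASE_T");
                // With PHOENIX-1504, the new column is propagated to the view's metadata.
                stmt.execute("ALTER TABLE BASE_T ADD IF NOT EXISTS V2 VARCHAR");
                try (ResultSet rs = stmt.executeQuery("SELECT V2 FROM VIEW_OF_BASE_T")) {
                    System.out.println("Columns visible through the view for V2: "
                            + rs.getMetaData().getColumnCount());
                }
            }
        }
    }

The IF NOT EXISTS guards make the sketch safe to run repeatedly against the same cluster.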

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index a7c7291..e1a1970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -272,6 +272,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
                 assertEquals(CANNOT_MODIFY_VIEW_PK.getErrorCode(), expected.getErrorCode());
             }
             
+            // try removing a non-PK col
             try {
                 conn.createStatement().execute("alter table " + TENANT_TABLE_NAME + " drop column id");
                 fail();
@@ -291,25 +292,6 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(nextTimestamp()));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
-            // try adding a PK col
-            try {
-                conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " add new_pk varchar primary key");
-                fail();
-            }
-            catch (SQLException expected) {
-                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), expected.getErrorCode());
-            }
-            
-            // try adding a non-PK col
-            try {
-                conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " add new_col char(1)");
-                fail();
-            }
-            catch (SQLException expected) {
-                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), expected.getErrorCode());
-            }
-            
-            // try removing a PK col
             try {
                 conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " drop column id");
                 fail();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
new file mode 100644
index 0000000..886e567
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.query.QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.UpgradeUtil;
+import org.junit.Test;
+
+public class UpgradeIT extends BaseHBaseManagedTimeIT {
+
+    private static String TENANT_ID = "tenantId";
+
+    @Test
+    public void testUpgradeForTenantViewWithSameColumnsAsBaseTable() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.EQUAL);
+    }
+
+    @Test
+    public void testUpgradeForTenantViewWithMoreColumnsThanBaseTable() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.MORE);
+    }
+
+    @Test
+    public void testUpgradeForViewWithSameColumnsAsBaseTable() throws Exception {
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.EQUAL);
+    }
+
+    @Test
+    public void testUpgradeForViewWithMoreColumnsThanBaseTable() throws Exception {
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2", ColumnDiff.MORE);
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3", ColumnDiff.MORE);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.MORE);
+        testViewUpgrade(false, null, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.MORE);
+    }
+
+    @Test
+    public void testSettingBaseColumnCountWhenBaseTableColumnDropped() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.LESS);
+    }
+    
+    @Test
+    public void testSettingBaseColumnCountForMultipleViewsOnTable() throws Exception {
+        String baseSchema = "XYZ";
+        String baseTable = "BASE_TABLE";
+        String fullBaseTableName = SchemaUtil.getTableName(baseSchema, baseTable);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE " + fullBaseTableName + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true";
+            conn.createStatement().execute(baseTableDDL);
+            
+            for (int i = 1; i <=2; i++) {
+                // Create views for tenants;
+                String tenant = "tenant" + i;
+                try (Connection tenantConn = createTenantConnection(tenant)) {
+                    String view = "TENANT_VIEW1";
+                    
+                    // view with its own column
+                    String viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String addCols = "ALTER VIEW " + view + " ADD COL1 VARCHAR ";
+                    tenantConn.createStatement().execute(addCols);
+                    removeBaseColumnCountKV(tenant, null, view);
+
+                    // view that has the last base table column removed
+                    view = "TENANT_VIEW2";
+                    viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String droplastBaseCol = "ALTER VIEW " + view + " DROP COLUMN V2";
+                    tenantConn.createStatement().execute(droplastBaseCol);
+                    removeBaseColumnCountKV(tenant, null, view);
+
+                    // view that has the middle base table column removed
+                    view = "TENANT_VIEW3";
+                    viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String dropMiddleBaseCol = "ALTER VIEW " + view + " DROP COLUMN V1";
+                    tenantConn.createStatement().execute(dropMiddleBaseCol);
+                    removeBaseColumnCountKV(tenant, null, view);
+                }
+            }
+            
+            // create global views
+            try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+                String view = "GLOBAL_VIEW1";
+                
+                // view with its own column
+                String viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String addCols = "ALTER VIEW " + view + " ADD COL1 VARCHAR ";
+                globalConn.createStatement().execute(addCols);
+                removeBaseColumnCountKV(null, null, view);
+
+                // view that has the last base table column removed
+                view = "GLOBAL_VIEW2";
+                viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String droplastBaseCol = "ALTER VIEW " + view + " DROP COLUMN V2";
+                globalConn.createStatement().execute(droplastBaseCol);
+                removeBaseColumnCountKV(null, null, view);
+
+                // view that has the middle base table column removed
+                view = "GLOBAL_VIEW3";
+                viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String dropMiddleBaseCol = "ALTER VIEW " + view + " DROP COLUMN V1";
+                globalConn.createStatement().execute(dropMiddleBaseCol);
+                removeBaseColumnCountKV(null, null, view);
+            }
+            
+            // run upgrade
+            UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
+            
+            // Verify base column counts for tenant specific views
+            for (int i = 1; i <=2 ; i++) {
+                String tenantId = "tenant" + i;
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW1", 4);
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW2", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW3", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+            }
+            
+            // Verify base column count for global views
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW1", 4);
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW2", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW3", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+        }
+        
+        
+    }
+    
+    private enum ColumnDiff {
+        MORE, EQUAL, LESS
+    };
+
+    private void testViewUpgrade(boolean tenantView, String tenantId, String baseTableSchema,
+            String baseTableName, String viewSchema, String viewName, ColumnDiff diff)
+            throws Exception {
+        if (tenantView) {
+            checkNotNull(tenantId);
+        } else {
+            checkArgument(tenantId == null);
+        }
+        Connection conn = DriverManager.getConnection(getUrl());
+        String fullViewName = SchemaUtil.getTableName(viewSchema, viewName);
+        String fullBaseTableName = SchemaUtil.getTableName(baseTableSchema, baseTableName);
+        try {
+            int expectedBaseColumnCount;
+            conn.createStatement().execute(
+                "CREATE TABLE IF NOT EXISTS " + fullBaseTableName + " ("
+                        + " TENANT_ID CHAR(15) NOT NULL, " + " PK1 integer NOT NULL, "
+                        + "PK2 bigint NOT NULL, " + "CF1.V1 VARCHAR, " + "CF2.V2 VARCHAR, "
+                        + "V3 CHAR(100) ARRAY[4] "
+                        + " CONSTRAINT NAME_PK PRIMARY KEY (TENANT_ID, PK1, PK2)"
+                        + " ) MULTI_TENANT= true");
+            
+            // create a view with same columns as base table.
+            try (Connection conn2 = getConnection(tenantView, tenantId)) {
+                conn2.createStatement().execute(
+                    "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullBaseTableName);
+            }
+
+            if (diff == ColumnDiff.MORE) {
+                // add a column to the view
+                try (Connection conn3 = getConnection(tenantView, tenantId)) {
+                    conn3.createStatement().execute(
+                        "ALTER VIEW " + fullViewName + " ADD VIEW_COL1 VARCHAR");
+                }
+            }
+            if (diff == ColumnDiff.LESS) {
+                try (Connection conn3 = getConnection(tenantView, tenantId)) {
+                    conn3.createStatement().execute(
+                        "ALTER VIEW " + fullViewName + " DROP COLUMN CF2.V2");
+                }
+                expectedBaseColumnCount = DIVORCED_VIEW_BASE_COLUMN_COUNT;
+            } else {
+                expectedBaseColumnCount = 6;
+            }
+
+            checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
+            
+            // remove base column count kv so we can check whether the upgrade code is setting the 
+            // base column count correctly.
+            removeBaseColumnCountKV(tenantId, viewSchema, viewName);
+            removeBaseColumnCountKV(null, baseTableSchema, baseTableName);
+
+            // assert that removing the base column count key value worked correctly.
+            checkBaseColumnCount(tenantId, viewSchema, viewName, 0);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, 0);
+            
+            // run upgrade
+            UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
+
+            checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
+        } finally {
+            conn.close();
+        }
+    }
+
+    private static void checkBaseColumnCount(String tenantId, String schemaName, String tableName,
+            int expectedBaseColumnCount) throws Exception {
+        checkNotNull(tableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String sql = SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
+        sql =
+                String.format(sql, tenantId == null ? " IS NULL " : " = ? ",
+                    schemaName == null ? "IS NULL" : " = ? ");
+        int paramIndex = 1;
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        if (tenantId != null) {
+            stmt.setString(paramIndex++, tenantId);
+        }
+        if (schemaName != null) {
+            stmt.setString(paramIndex++, schemaName);
+        }
+        stmt.setString(paramIndex, tableName);
+        ResultSet rs = stmt.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(expectedBaseColumnCount, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    private static void
+            removeBaseColumnCountKV(String tenantId, String schemaName, String tableName)
+                    throws Exception {
+        byte[] rowKey =
+                SchemaUtil.getTableKey(tenantId == null ? new byte[0] : Bytes.toBytes(tenantId),
+                    schemaName == null ? new byte[0] : Bytes.toBytes(schemaName),
+                    Bytes.toBytes(tableName));
+        Put viewColumnDefinitionPut = new Put(rowKey, HConstants.LATEST_TIMESTAMP);
+        viewColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+            PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, HConstants.LATEST_TIMESTAMP, null);
+
+        try (PhoenixConnection conn =
+                (DriverManager.getConnection(getUrl())).unwrap(PhoenixConnection.class)) {
+            try (HTableInterface htable =
+                    conn.getQueryServices().getTable(
+                        Bytes.toBytes(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME))) {
+                RowMutations mutations = new RowMutations(rowKey);
+                mutations.add(viewColumnDefinitionPut);
+                htable.mutateRow(mutations);
+            }
+        }
+    }
+
+    private Connection createTenantConnection(String tenantId) throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    private Connection getConnection(boolean tenantSpecific, String tenantId) throws SQLException {
+        if (tenantSpecific) {
+            checkNotNull(tenantId);
+            return createTenantConnection(tenantId);
+        }
+        return DriverManager.getConnection(getUrl());
+    }
+}
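
Because UpgradeIT works by clearing and then re-deriving BASE_COLUMN_COUNT, it can be useful to read that value straight out of SYSTEM.CATALOG. The sketch below does that over JDBC; the header-row predicate (COLUMN_NAME and COLUMN_FAMILY both null) and the connection URL are assumptions modeled on the SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW query referenced above, not copied from the patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Sketch: read the BASE_COLUMN_COUNT that this patch maintains on the
    // SYSTEM.CATALOG header row of a table or view.
    public class BaseColumnCountInspector {
        public static void main(String[] args) throws Exception {
            String tableName = args.length > 0 ? args[0] : "TABLEWITHVIEW";
            String sql = "SELECT TENANT_ID, TABLE_SCHEM, BASE_COLUMN_COUNT FROM SYSTEM.CATALOG"
                    + " WHERE TABLE_NAME = ? AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, tableName);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + "/" + rs.getString(2) + "/" + tableName
                                + " -> BASE_COLUMN_COUNT = " + rs.getInt(3));
                    }
                }
            }
        }
    }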

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9686de8..f1b5373 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -61,6 +61,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -77,6 +78,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
@@ -223,6 +225,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
     private static final KeyValue STORE_NULLS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
     private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+    private static final KeyValue BASE_COLUMN_COUNT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
+    
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
             TABLE_TYPE_KV,
@@ -241,7 +245,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             VIEW_INDEX_ID_KV,
             INDEX_TYPE_KV,
             INDEX_DISABLE_TIMESTAMP_KV,
-            STORE_NULLS_KV
+            STORE_NULLS_KV,
+            BASE_COLUMN_COUNT_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -263,6 +268,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
     private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
     private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
+    private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -401,7 +407,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[] tableName = request.getTableName().toByteArray();
         byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
         long tableTimeStamp = request.getTableTimestamp();
-
         try {
             // TODO: check that key is within region.getStartKey() and region.getEndKey()
             // and return special code to force client to lookup region from meta.
@@ -766,7 +771,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
         Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
         IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
-
+        Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
+        int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
+            baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
 
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
@@ -811,7 +818,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
             tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
-            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats);
+            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount);
     }
 
     private PFunction getFunction(RegionScanner scanner)
@@ -1160,13 +1167,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         locks.add(rowLock);
     }
 
-    protected static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
+    private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
+    private static final byte[] PARENT_TABLE_BYTES = new byte[] {PTable.LinkType.PARENT_TABLE.getSerializedValue()};
+
     /**
      * Checks whether child views exist for the specified parent table.
      * @param table the parent table
      * TODO: should we pass a timestamp here?
      */
-    private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
+    private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table, byte[] linkTypeBytes) throws IOException {
         byte[] schemaName = table.getSchemaName().getBytes();
         byte[] tableName = table.getTableName().getBytes();
         boolean isMultiTenant = table.isMultiTenant();
@@ -1180,13 +1189,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             scan.setStartRow(startRow);
             scan.setStopRow(stopRow);
         }
-        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, PHYSICAL_TABLE_BYTES);
+        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
         linkFilter.setFilterIfMissing(true);
         byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil.getTableNameAsBytes(schemaName, tableName));
         SuffixFilter rowFilter = new SuffixFilter(suffix);
         Filter filter = new FilterList(linkFilter, rowFilter);
         scan.setFilter(filter);
         scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+        
         // Original region-only scanner modified due to PHOENIX-1208
         // RegionScanner scanner = region.getScanner(scan);
         // The following *should* work, but doesn't due to HBASE-11837
@@ -1351,7 +1362,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
           }
 
           // Handle any child views that exist
-          TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table);
+          TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table, PHYSICAL_TABLE_BYTES);
           if (tableViewFinderResult.hasViews()) {
             if (isCascade) {
               if (tableViewFinderResult.allViewsInMultipleRegions()) {
@@ -1435,10 +1446,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static interface ColumnMutator {
       MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
           List<Mutation> tableMetadata, HRegion region,
-          List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException,
+          List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp) throws IOException,
           SQLException;
     }
 
+    @SuppressWarnings("deprecation")
     private MetaDataMutationResult
     mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
@@ -1518,22 +1530,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 } else {
                     // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
-                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesPtr());
+                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                            new ImmutableBytesPtr());
                     // We said to drop a table, but found a view or vice versa
-                    if (type != expectedType) {
-                        return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
-                    }
-                    if (findChildViews(region, tenantId, table).hasViews()) {
-                        // Disallow any column mutations for parents of tenant tables
-                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    if (type != expectedType) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null); }
+                    if (table.getBaseColumnCount() == 0) {
+                        // If the base column count hasn't been set, then it means that the upgrade
+                        // to 4.5.0 is in progress. Have the client retry the mutation operation.
+                        return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
+                                EnvironmentEdgeManager.currentTimeMillis(), table);
                     }
                 }
                 result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
-                            invalidateList, locks);
+                            invalidateList, locks, clientTimeStamp);
                 if (result != null) {
                     return result;
                 }
-
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
@@ -1553,6 +1566,85 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    private void addRowsToChildViews(List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
+            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinderResult childViewsResult,
+            HRegion region, List<RowLock> locks) throws IOException, SQLException {
+        for (Result viewResult : childViewsResult.getResults()) {
+            byte[][] rowViewKeyMetaData = new byte[3][];
+            getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData);
+            byte[] viewTenantId = rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            byte[] viewSchemaName = rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            byte[] viewName = rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
+            PTable view = doGetTable(viewKey, clientTimeStamp);
+
+            if (view.getBaseColumnCount() == QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT) {
+                // if a view has divorced itself from the base table, we don't allow schema changes
+                // to be propagated to it.
+                return;
+            }
+            // lock the rows corresponding to views so that no other thread can modify the view meta-data
+            acquireLock(region, viewKey, locks);
+
+            int deltaNumberOfColumns = 0;
+            
+            for (Mutation m : tableMetadata) {
+                byte[][] rkmd = new byte[5][];
+                int pkCount = getVarChars(m.getRow(), rkmd);
+                if (m instanceof Put && pkCount > COLUMN_NAME_INDEX
+                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
+                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
+                    Put p = (Put)m;
+
+                    byte[] k = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[COLUMN_NAME_INDEX],
+                            QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[FAMILY_NAME_INDEX]);
+                    Put viewColumnDefinitionPut = new Put(k, clientTimeStamp);
+                    for (Cell cell : p.getFamilyCellMap().values().iterator().next()) {
+                        viewColumnDefinitionPut.add(CellUtil.createCell(k, CellUtil.cloneFamily(cell),
+                                CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
+                                CellUtil.cloneValue(cell)));
+                    }
+                    deltaNumberOfColumns++;
+                    mutationsForAddingColumnsToViews.add(viewColumnDefinitionPut);
+                }
+            }
+
+            int oldBaseColumnCount = view.getBaseColumnCount();
+
+            Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
+            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + deltaNumberOfColumns, baseColumnCountPtr, 0);
+            byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(view.getColumns().size() + deltaNumberOfColumns, columnCountPtr, 0);
+            byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
+            PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
+            mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
+
+            // Update positions of view columns
+            for (PColumn column : view.getColumns()) {
+                if (column.getPosition() >= oldBaseColumnCount) {
+                    int newPosition = column.getPosition() + deltaNumberOfColumns + 1;
+
+                    byte[] k = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, column.getName()
+                            .getBytes(), QueryConstants.SEPARATOR_BYTE_ARRAY, column.getFamilyName() != null ? column.getFamilyName().getBytes() : null);
+
+                    Put positionUpdatePut = new Put(k, clientTimeStamp);
+                    byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
+                    PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
+                    positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
+                    mutationsForAddingColumnsToViews.add(positionUpdatePut);
+                }
+            }
+            invalidateList.add(new ImmutableBytesPtr(viewKey));
+        }
+    }
 
     @Override
     public void addColumn(RpcController controller, AddColumnRequest request,
@@ -1562,11 +1654,29 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                 @Override
                 public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
-                        List<Mutation> tableMetaData, HRegion region,
-                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
+                        List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList,
+                        List<RowLock> locks, long clientTimeStamp) throws IOException, SQLException {
                     byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+                    PTableType type = table.getType();
+                    TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table,
+                            (type == PTableType.VIEW ? PARENT_TABLE_BYTES : PHYSICAL_TABLE_BYTES));
+                    List<Mutation> mutationsForAddingColumnsToViews = Collections.emptyList();
+                    if (childViewsResult.hasViews()) {
+                        /*
+                         * Adding a column is not allowed if: 1) the meta-data for the child view(s) spans more than
+                         * one region, or 2) the column is being added to a view that itself has child views.
+                         */
+                        if (!childViewsResult.allViewsInSingleRegion() || type == PTableType.VIEW) {
+                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+                                    EnvironmentEdgeManager.currentTimeMillis(), null);
+                        } else {
+                            mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getResults().size() * tableMetaData.size());
+                            addRowsToChildViews(tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
+                                    childViewsResult, region, locks);
+                        }
+                    }
                     for (Mutation m : tableMetaData) {
                         byte[] key = m.getRow();
                         boolean addingPKColumn = false;
@@ -1609,6 +1719,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             }
                         }
                     }
+                    tableMetaData.addAll(mutationsForAddingColumnsToViews);
                     return null;
                 }
             });
@@ -1750,19 +1861,27 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
         try {
             tableMetaData = ProtobufUtil.getMutations(request);
-            final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
             final List<byte[]> tableNamesToDelete = Lists.newArrayList();
             MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                 @Override
                 public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
                         List<Mutation> tableMetaData, HRegion region,
-                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks)
+                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp)
                         throws IOException, SQLException {
                     byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                     boolean deletePKColumn = false;
                     List<Mutation> additionalTableMetaData = Lists.newArrayList();
+                    
+                    PTableType type = table.getType();
+                    TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table,
+                            (type == PTableType.VIEW ? PARENT_TABLE_BYTES : PHYSICAL_TABLE_BYTES));
+                    if (childViewsResult.hasViews()) {
+                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager
+                                .currentTimeMillis(), null);
+                    }
+                    
                     for (Mutation m : tableMetaData) {
                         if (m instanceof Delete) {
                             byte[] key = m.getRow();
@@ -1787,6 +1906,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     } else {
                                         continue;
                                     }
+                                    if (table.getType() == PTableType.VIEW) {
+                                        if (table.getBaseColumnCount() != DIVORCED_VIEW_BASE_COLUMN_COUNT
+                                                && columnToDelete.getPosition() < table.getBaseColumnCount()) {
+                                            /*
+                                             * If the column being dropped is inherited from the base table, then the
+                                             * view is about to divorce itself from the base table. Divorce here means
+                                             * that any further meta-data changes made to the base table will not be
+                                             * propagated to the hierarchy of views on the base table.
+                                             */
+                                            byte[] viewKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+                                            Put updateBaseColumnCountPut = new Put(viewKey);
+                                            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+                                            PInteger.INSTANCE.getCodec().encodeInt(DIVORCED_VIEW_BASE_COLUMN_COUNT,
+                                                    baseColumnCountPtr, 0);
+                                            updateBaseColumnCountPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp,
+                                                    baseColumnCountPtr);
+                                            additionalTableMetaData.add(updateBaseColumnCountPut);
+                                        }
+                                    }
                                     if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view
                                         return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
                                     }

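To make the intended behavior of the MetaDataEndpointImpl changes above concrete, here is a minimal
JDBC sketch of the new semantics. It is only an illustration, not part of the patch: the table/view
names and the connection URL are made up, and it assumes a cluster already running this change.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ViewColumnPropagationExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection URL; adjust for your cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE base_table (pk VARCHAR PRIMARY KEY, v1 VARCHAR)");
                stmt.execute("CREATE VIEW child_view AS SELECT * FROM base_table WHERE v1 = 'a'");

                // Adding a column to the base table is now propagated to child_view by
                // addRowsToChildViews(): the view gets a new column row, the ordinal positions
                // of its view-local columns are shifted, and its COLUMN_COUNT, BASE_COLUMN_COUNT
                // and TABLE_SEQ_NUM header cells are updated.
                stmt.execute("ALTER TABLE base_table ADD v2 INTEGER");

                // Dropping a column the view inherited from the base table (here the freshly
                // propagated v2) "divorces" the view: its BASE_COLUMN_COUNT is set to
                // DIVORCED_VIEW_BASE_COLUMN_COUNT, and later base table schema changes are no
                // longer propagated to it.
                stmt.execute("ALTER VIEW child_view DROP COLUMN v2");
            }
        }
    }

Adding a column directly to a view that has child views of its own, or to a table whose child view
meta-data spans more than one region, is rejected with UNALLOWED_TABLE_MUTATION, as is dropping a
column from a table or view that still has child views.
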
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index acbd9f8..2989bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -62,8 +62,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
 
     public static final long MIN_TABLE_TIMESTAMP = 0;
 
-    // Incremented from 5 to 7 with the addition of the STORE_NULLS table option in 4.3
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 7;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 8; // Incremented from 7 to 8 with the addition of BASE_COLUMN_COUNT in 4.5.0
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
     public static final int DEFAULT_MAX_STAT_DATA_VERSIONS = 3;
     public static final boolean DEFAULT_META_DATA_KEEP_DELETED_CELLS = true;
@@ -73,6 +72,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0 = MIN_TABLE_TIMESTAMP + 4;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1 = MIN_TABLE_TIMESTAMP + 5;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0 = MIN_TABLE_TIMESTAMP + 7;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8;
     
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
     // a different code for every type of error.

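The new MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 constant is what gates the client-driven SYSTEM.CATALOG
upgrade added to ConnectionQueryServicesImpl later in this commit. A tiny, hypothetical helper
showing the comparison (the class and method names below are made up for illustration):

    import org.apache.phoenix.coprocessor.MetaDataProtocol;

    public class SystemTableVersionCheck {
        /**
         * Returns true if a SYSTEM.CATALOG stamped with the given timestamp still needs the
         * 4.5.0 upgrade, i.e. it predates the BASE_COLUMN_COUNT column.
         */
        static boolean needs450Upgrade(long currentServerSideTableTimeStamp) {
            return currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0;
        }

        public static void main(String[] args) {
            // A 4.3.0-level catalog (timestamp 7) needs the upgrade; a 4.5.0-level one (8) does not.
            System.out.println(needs450Upgrade(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0)); // true
            System.out.println(needs450Upgrade(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0)); // false
        }
    }
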
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 7d389ac..dd6e303 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3108,6 +3108,16 @@ public final class PTableProtos {
      * <code>optional bool storeNulls = 24;</code>
      */
     boolean getStoreNulls();
+
+    // optional int32 baseColumnCount = 25;
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    boolean hasBaseColumnCount();
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    int getBaseColumnCount();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3298,6 +3308,11 @@ public final class PTableProtos {
               storeNulls_ = input.readBool();
               break;
             }
+            case 200: {
+              bitField0_ |= 0x00100000;
+              baseColumnCount_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3828,6 +3843,22 @@ public final class PTableProtos {
       return storeNulls_;
     }
 
+    // optional int32 baseColumnCount = 25;
+    public static final int BASECOLUMNCOUNT_FIELD_NUMBER = 25;
+    private int baseColumnCount_;
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    public boolean hasBaseColumnCount() {
+      return ((bitField0_ & 0x00100000) == 0x00100000);
+    }
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    public int getBaseColumnCount() {
+      return baseColumnCount_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -3853,6 +3884,7 @@ public final class PTableProtos {
       indexType_ = com.google.protobuf.ByteString.EMPTY;
       statsTimeStamp_ = 0L;
       storeNulls_ = false;
+      baseColumnCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3992,6 +4024,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00080000) == 0x00080000)) {
         output.writeBool(24, storeNulls_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        output.writeInt32(25, baseColumnCount_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4102,6 +4137,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(24, storeNulls_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(25, baseColumnCount_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4233,6 +4272,11 @@ public final class PTableProtos {
         result = result && (getStoreNulls()
             == other.getStoreNulls());
       }
+      result = result && (hasBaseColumnCount() == other.hasBaseColumnCount());
+      if (hasBaseColumnCount()) {
+        result = result && (getBaseColumnCount()
+            == other.getBaseColumnCount());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4342,6 +4386,10 @@ public final class PTableProtos {
         hash = (37 * hash) + STORENULLS_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getStoreNulls());
       }
+      if (hasBaseColumnCount()) {
+        hash = (37 * hash) + BASECOLUMNCOUNT_FIELD_NUMBER;
+        hash = (53 * hash) + getBaseColumnCount();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4514,6 +4562,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00400000);
         storeNulls_ = false;
         bitField0_ = (bitField0_ & ~0x00800000);
+        baseColumnCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x01000000);
         return this;
       }
 
@@ -4654,6 +4704,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00080000;
         }
         result.storeNulls_ = storeNulls_;
+        if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+          to_bitField0_ |= 0x00100000;
+        }
+        result.baseColumnCount_ = baseColumnCount_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4820,6 +4874,9 @@ public final class PTableProtos {
         if (other.hasStoreNulls()) {
           setStoreNulls(other.getStoreNulls());
         }
+        if (other.hasBaseColumnCount()) {
+          setBaseColumnCount(other.getBaseColumnCount());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6424,6 +6481,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 baseColumnCount = 25;
+      private int baseColumnCount_ ;
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public boolean hasBaseColumnCount() {
+        return ((bitField0_ & 0x01000000) == 0x01000000);
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public int getBaseColumnCount() {
+        return baseColumnCount_;
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public Builder setBaseColumnCount(int value) {
+        bitField0_ |= 0x01000000;
+        baseColumnCount_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public Builder clearBaseColumnCount() {
+        bitField0_ = (bitField0_ & ~0x01000000);
+        baseColumnCount_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -6470,7 +6560,7 @@ public final class PTableProtos {
       "values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(" +
       "\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCo",
       "unt\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuideP" +
-      "osts\"\266\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
+      "osts\"\317\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
       "\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 " +
       "\002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016" +
       "sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022" +
@@ -6484,10 +6574,11 @@ public final class PTableProtos {
       "nt\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010tenan" +
       "tId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tindexT" +
       "ype\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\nsto" +
-      "reNulls\030\030 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022" +
-      "\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004" +
-      "B@\n(org.apache.phoenix.coprocessor.gener" +
-      "atedB\014PTableProtosH\001\210\001\001\240\001\001"
+      "reNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(\005*A" +
+      "\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VI" +
+      "EW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache." +
+      "phoenix.coprocessor.generatedB\014PTablePro" +
+      "tosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6511,7 +6602,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", });
           return null;
         }
       };

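The regenerated code above adds baseColumnCount as optional field 25 of the PTable message, so a
peer built before this change simply omits it on the wire. A small compatibility sketch, assuming
the caller wants a sentinel when the field is absent (the class name and the UNSET value are made
up; the default Phoenix actually applies may differ):

    import org.apache.phoenix.coprocessor.generated.PTableProtos;

    public class BaseColumnCountCompat {
        // Hypothetical sentinel meaning "field not serialized by the peer".
        static final int UNSET = -1;

        /** Returns the serialized baseColumnCount, or UNSET if the peer predates this field. */
        static int baseColumnCountOrDefault(PTableProtos.PTable table) {
            // baseColumnCount is optional, so hasBaseColumnCount() distinguishes "absent" from 0.
            return table.hasBaseColumnCount() ? table.getBaseColumnCount() : UNSET;
        }
    }
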
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 3a0b03b..d1b3b27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -209,6 +208,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final byte[] IS_VIEW_REFERENCED_BYTES = Bytes.toBytes(IS_VIEW_REFERENCED);
     public static final String VIEW_INDEX_ID = "VIEW_INDEX_ID";
     public static final byte[] VIEW_INDEX_ID_BYTES = Bytes.toBytes(VIEW_INDEX_ID);
+    public static final String BASE_COLUMN_COUNT = "BASE_COLUMN_COUNT";
+    public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
 
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;

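With BASE_COLUMN_COUNT now an ordinary SYSTEM.CATALOG column, its per-view value can be inspected
with a plain query once the 4.5.0 upgrade has run. A rough sketch (the connection URL is a
placeholder):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class InspectBaseColumnCount {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 // BASE_COLUMN_COUNT is stored on the table/view header row, so filter out
                 // the column rows, where it is null.
                 ResultSet rs = stmt.executeQuery(
                         "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, BASE_COLUMN_COUNT "
                         + "FROM SYSTEM.CATALOG WHERE BASE_COLUMN_COUNT IS NOT NULL")) {
                while (rs.next()) {
                    System.out.println(rs.getString("TABLE_NAME") + " -> " + rs.getInt("BASE_COLUMN_COUNT"));
                }
            }
        }
    }
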
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 83f7157..f3be8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
+import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -116,6 +117,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.EmptySequenceCacheException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
@@ -141,6 +143,7 @@ import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.ByteUtil;
@@ -1823,21 +1826,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     }
 
-    /** 
-     * Keeping this to use for further upgrades. This method closes the oldMetaConnection.
-     */
-    private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
-        String tableName, long timestamp, String columns) throws SQLException {
-
+    private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException {
         Properties props = new Properties(oldMetaConnection.getClientInfo());
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
         // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
         PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache());
         SQLException sqlE = null;
         try {
-            metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD IF NOT EXISTS " + columns );
+            metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns );
         } catch (SQLException e) {
-            logger.warn("addColumnsIfNotExists failed due to:" + e);
+            logger.warn("Add column failed due to:" + e);
             sqlE = e;
         } finally {
             try {
@@ -1855,6 +1853,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return metaConnection;
     }
+    
+    /** 
+     * Kept for use in future upgrades. This method closes the oldMetaConnection.
+     */
+    private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
+            String tableName, long timestamp, String columns) throws SQLException {
+        return addColumn(oldMetaConnection, tableName, timestamp, columns, true);
+    }
 
     @Override
     public void init(final String url, final Properties props) throws SQLException {
@@ -1935,12 +1941,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
                                             + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName();
                                 }
-                                
                                 // Ugh..need to assign to another local variable to keep eclipse happy.
                                 PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
                                         PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                         MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
                                 metaConnection = newMetaConnection;
+                                
+                                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+                                    columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+                                            + PInteger.INSTANCE.getSqlTypeName();
+                                    try {
+                                        addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd, false);
+                                        upgradeTo4_5_0(metaConnection);
+                                    } catch (ColumnAlreadyExistsException ignored) {
+                                        /* 
+                                         * Upgrade to 4.5 is a slightly special case. We use the fact that the column
+                                         * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
+                                         * the server side upgrade has finished or is in progress.
+                                         */
+                                    }
+                                }
                             }
                             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                                     QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
@@ -1983,6 +2004,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 } else { 
                                     nSequenceSaltBuckets = getSaltBuckets(e);
                                 }
+                                
                             }
                             try {
                                 metaConnection.createStatement().executeUpdate(
@@ -2002,6 +2024,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } catch (NewerTableAlreadyExistsException e) {
                             } catch (TableAlreadyExistsException e) {
                             }
+                            
 
                         } catch (Exception e) {
                             if (e instanceof SQLException) {