Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/06/18 23:21:51 UTC

[2/2] phoenix git commit: PHOENIX-1504 Support adding column to a table that has views (Samarth Jain/Dave Hacker)

PHOENIX-1504 Support adding column to a table that has views (Samarth Jain/Dave Hacker)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e001c63f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e001c63f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e001c63f

Branch: refs/heads/4.x-HBase-0.98
Commit: e001c63f87552ec871c3ba8f4484b2011a28bf15
Parents: 7fe2317
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jun 18 14:21:38 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jun 18 14:21:38 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    | 356 +++++++++++++++++
 .../end2end/TenantSpecificTablesDDLIT.java      |  20 +-
 .../org/apache/phoenix/end2end/UpgradeIT.java   | 332 ++++++++++++++++
 .../coprocessor/MetaDataEndpointImpl.java       | 183 +++++++--
 .../phoenix/coprocessor/MetaDataProtocol.java   |   4 +-
 .../coprocessor/generated/PTableProtos.java     | 103 ++++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   3 +-
 .../query/ConnectionQueryServicesImpl.java      |  41 +-
 .../apache/phoenix/query/QueryConstants.java    |  30 +-
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   |  37 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   1 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  40 +-
 .../java/org/apache/phoenix/util/ByteUtil.java  |  10 +-
 .../org/apache/phoenix/util/UpgradeUtil.java    | 395 ++++++++++++++++++-
 15 files changed, 1451 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
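
For readers skimming the diff: this change lets a column be added to a base table even when views exist on it, and propagates the new column down the view hierarchy. A minimal DDL sketch of that flow, mirroring the new testAddColumnToTableWithViews in AlterTableIT (table and view names are illustrative):

    CREATE TABLE TABLEWITHVIEW (
        ID CHAR(1) NOT NULL,
        COL1 INTEGER NOT NULL,
        COL2 BIGINT NOT NULL,
        CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2));

    CREATE VIEW VIEWOFTABLE (VIEW_COL1 SMALLINT) AS SELECT * FROM TABLEWITHVIEW;

    ALTER TABLE TABLEWITHVIEW ADD COL3 CHAR(10);

After the ALTER, COL3 is visible through VIEWOFTABLE as well, ordered before the view-specific VIEW_COL1. Dropping a column from a base table that has views remains disallowed, and a view that has dropped one of its inherited columns (a "divorced" view) no longer receives propagated changes; see testDropColumnOnTableWithViewsNotAllowed and testDivorcedViewsStayDivorced below.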


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 59698d6..61dd6a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.hadoop.hbase.HColumnDescriptor.DEFAULT_REPLICATION_SCOPE;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MUTATE_TABLE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.closeConnection;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
@@ -32,9 +33,11 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -48,8 +51,10 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -59,6 +64,8 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.base.Objects;
+
 /**
  *
  * A lot of tests in this class test HBase level properties. As a result,
@@ -1988,4 +1995,353 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testAddColumnToTableWithViews() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {       
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS TABLEWITHVIEW ("
+                    + " ID char(1) NOT NULL,"
+                    + " COL1 integer NOT NULL,"
+                    + " COL2 bigint NOT NULL,"
+                    + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+                    + " )");
+            assertTableDefinition(conn, "TABLEWITHVIEW", PTableType.TABLE, null, 0, 3, -1, "ID", "COL1", "COL2");
+            
+            conn.createStatement().execute("CREATE VIEW VIEWOFTABLE ( VIEW_COL1 SMALLINT ) AS SELECT * FROM TABLEWITHVIEW");
+            assertTableDefinition(conn, "VIEWOFTABLE", PTableType.VIEW, "TABLEWITHVIEW", 0, 4, 3, "ID", "COL1", "COL2", "VIEW_COL1");
+            
+            conn.createStatement().execute("ALTER TABLE TABLEWITHVIEW ADD COL3 char(10)");
+            assertTableDefinition(conn, "VIEWOFTABLE", PTableType.VIEW, "TABLEWITHVIEW", 1, 5, 4, "ID", "COL1", "COL2", "COL3", "VIEW_COL1");
+            
+        } finally {
+            conn.close();
+        }
+    }
+
+    private void assertTableDefinition(Connection conn, String tableName, PTableType tableType, String parentTableName, int sequenceNumber, int columnCount, int baseColumnCount, String... columnName) throws Exception {
+        PreparedStatement p = conn.prepareStatement("SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME=? AND TABLE_TYPE=?");
+        p.setString(1, tableName);
+        p.setString(2, tableType.getSerializedValue());
+        ResultSet rs = p.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in BaseColumnCount"), baseColumnCount, rs.getInt("BASE_COLUMN_COUNT"));
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in columnCount"), columnCount, rs.getInt("COLUMN_COUNT"));
+        assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in sequenceNumber"), sequenceNumber, rs.getInt("TABLE_SEQ_NUM"));
+        rs.close();
+
+        ResultSet parentTableColumnsRs = null; 
+        if (parentTableName != null) {
+            parentTableColumnsRs = conn.getMetaData().getColumns(null, null, parentTableName, null);
+        }
+        
+        rs = conn.getMetaData().getColumns(null, null, tableName, null);
+        for (int i = 0; i < columnName.length; i++) {
+            if (columnName[i] != null) {
+                assertTrue(rs.next());
+                assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in columnName: i=" + i), columnName[i], rs.getString("COLUMN_NAME"));
+                assertEquals(getSystemCatalogEntriesForTable(conn, tableName, "Mismatch in ordinalPosition: i=" + i), i+1, rs.getInt("ORDINAL_POSITION"));
+                if (i < baseColumnCount && parentTableColumnsRs != null) {
+                    assertTrue(parentTableColumnsRs.next());
+                    ResultSetMetaData md = parentTableColumnsRs.getMetaData();
+                    assertEquals(md.getColumnCount(), rs.getMetaData().getColumnCount());
+                    for (int columnIndex = 1; columnIndex < md.getColumnCount(); columnIndex++) {
+                        String viewColumnValue = rs.getString(columnIndex);
+                        String parentTableColumnValue = parentTableColumnsRs.getString(columnIndex);
+                        if (!Objects.equal(viewColumnValue, parentTableColumnValue)) {
+                            if (md.getColumnName(columnIndex).equals("TABLE_NAME")) {
+                                assertEquals(parentTableName, parentTableColumnValue);
+                                assertEquals(tableName, viewColumnValue);
+                            } else {
+                                fail(md.getColumnName(columnIndex) + "=" + parentTableColumnValue);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        assertFalse(getSystemCatalogEntriesForTable(conn, tableName, ""), rs.next());
+    }
+    
+    private String getSystemCatalogEntriesForTable(Connection conn, String tableName, String message) throws Exception {
+        StringBuilder sb = new StringBuilder(message);
+        sb.append("\n\n\n");
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME='"+ tableName +"'");
+        ResultSetMetaData metaData = rs.getMetaData();
+        int rowNum = 0;
+        while (rs.next()) {
+            sb.append(rowNum++).append("------\n");
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                sb.append("\t").append(metaData.getColumnLabel(i)).append("=").append(rs.getString(i)).append("\n");
+            }
+            sb.append("\n");
+        }
+        rs.close();
+        return sb.toString();
+    }
+    
+    @Test
+    public void testCacheInvalidatedAfterAddingColumnToBaseTableWithViews() throws Exception {
+        String baseTable = "testCacheInvalidatedAfterAddingColumnToBaseTableWithViews";
+        String viewName = baseTable + "_view";
+        String tenantId = "tenantId";
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true " ;
+            globalConn.createStatement().execute(tableDDL);
+            Properties tenantProps = new Properties();
+            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            // create a tenant specific view
+            try (Connection tenantConn =  DriverManager.getConnection(getUrl(), tenantProps)) {
+                String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+                tenantConn.createStatement().execute(viewDDL);
+                
+                // Add a column to the base table using global connection
+                globalConn.createStatement().execute("ALTER TABLE " + baseTable + " ADD NEW_COL VARCHAR");
+
+                // Check now whether the tenant connection can see the column that was added
+                tenantConn.createStatement().execute("SELECT NEW_COL FROM " + viewName);
+                tenantConn.createStatement().execute("SELECT NEW_COL FROM " + baseTable);
+            }
+        }
+    }
+    
+    @Test
+    public void testDropColumnOnTableWithViewsNotAllowed() throws Exception {
+        String baseTable = "testDropColumnOnTableWithViewsNotAllowed";
+        String viewName = baseTable + "_view";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (PK1 VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR, V2 VARCHAR)";
+            conn.createStatement().execute(tableDDL);
+            
+            String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(viewDDL);
+            
+            String dropColumn = "ALTER TABLE " + baseTable + " DROP COLUMN V2";
+            try {
+                conn.createStatement().execute(dropColumn);
+                fail("Dropping column on a base table that has views is not allowed");
+            } catch (SQLException e) {  
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+        }
+    }
+    
+    @Test
+    public void testAlteringViewThatHasChildViewsNotAllowed() throws Exception {
+        String baseTable = "testAlteringViewThatHasChildViewsNotAllowed";
+        String childView = "childView";
+        String grandChildView = "grandChildView";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL =
+                    "CREATE TABLE " +  baseTable + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK2))";
+            conn.createStatement().execute(baseTableDDL);
+
+            String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(childViewDDL);
+
+            String addColumnToChildViewDDL =
+                    "ALTER VIEW " + childView + " ADD CHILD_VIEW_COL VARCHAR";
+            conn.createStatement().execute(addColumnToChildViewDDL);
+
+            String grandChildViewDDL =
+                    "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView;
+            conn.createStatement().execute(grandChildViewDDL);
+
+            // dropping base table column from child view should fail
+            String dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN V2";
+            try {
+                conn.createStatement().execute(dropColumnFromChildView);
+                fail("Dropping columns from a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+
+            // dropping view specific column from child view should fail
+            dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN CHILD_VIEW_COL";
+            try {
+                conn.createStatement().execute(dropColumnFromChildView);
+                fail("Dropping columns from a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+            
+            // Adding column to view that has child views should fail
+            String addColumnToChildView = "ALTER VIEW " + childView + " ADD V5 VARCHAR";
+            try {
+                conn.createStatement().execute(addColumnToChildView);
+                fail("Adding columns to a view that has child views on it is not allowed");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            }
+
+            // dropping column from the grand child view, however, should work.
+            String dropColumnFromGrandChildView =
+                    "ALTER VIEW " + grandChildView + " DROP COLUMN CHILD_VIEW_COL";
+            conn.createStatement().execute(dropColumnFromGrandChildView);
+
+            // similarly, dropping column inherited from the base table should work.
+            dropColumnFromGrandChildView = "ALTER VIEW " + grandChildView + " DROP COLUMN V2";
+            conn.createStatement().execute(dropColumnFromGrandChildView);
+        }
+    }
+    
+    @Test
+    public void testDivorcedViewsStayDivorced() throws Exception {
+        String baseTable = "testDivorcedViewsStayDivorced";
+        String viewName = baseTable + "_view";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableDDL = "CREATE TABLE " + baseTable + " (PK1 VARCHAR NOT NULL PRIMARY KEY, V1 VARCHAR, V2 VARCHAR)";
+            conn.createStatement().execute(tableDDL);
+            
+            String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(viewDDL);
+            
+            // Drop the column inherited from base table to divorce the view
+            String dropColumn = "ALTER VIEW " + viewName + " DROP COLUMN V2";
+            conn.createStatement().execute(dropColumn);
+            
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD V3 VARCHAR";
+            conn.createStatement().execute(alterBaseTable);
+            
+            // Column V3 shouldn't have propagated to the divorced view.
+            String sql = "SELECT V3 FROM " + viewName;
+            try {
+                conn.createStatement().execute(sql);
+                fail("Column V3 should not be visible through the divorced view " + viewName);
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), e.getErrorCode());
+            }
+        } 
+    }
+    
+    @Test
+    public void testAddingColumnToBaseTablePropagatesToEntireViewHierarchy() throws Exception {
+        String baseTable = "testViewHierarchy";
+        String view1 = "view1";
+        String view2 = "view2";
+        String view3 = "view3";
+        String view4 = "view4";
+        /*                                     baseTable
+                                 /                  |               \ 
+                         view1(tenant1)    view3(tenant2)          view4(global)
+                          /
+                        view2(tenant1)  
+        */
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE " + baseTable + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            conn.createStatement().execute(baseTableDDL);
+            
+            try (Connection tenant1Conn = getTenantConnection("tenant1")) {
+                String view1DDL = "CREATE VIEW " + view1 + " AS SELECT * FROM " + baseTable;
+                tenant1Conn.createStatement().execute(view1DDL);
+                
+                String view2DDL = "CREATE VIEW " + view2 + " AS SELECT * FROM " + view1;
+                tenant1Conn.createStatement().execute(view2DDL);
+            }
+            
+            try (Connection tenant2Conn = getTenantConnection("tenant2")) {
+                String view3DDL = "CREATE VIEW " + view3 + " AS SELECT * FROM " + baseTable;
+                tenant2Conn.createStatement().execute(view3DDL);
+            }
+            
+            String view4DDL = "CREATE VIEW " + view4 + " AS SELECT * FROM " + baseTable;
+            conn.createStatement().execute(view4DDL);
+            
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD V3 VARCHAR";
+            conn.createStatement().execute(alterBaseTable);
+            
+            // verify that the column is visible to view4
+            conn.createStatement().execute("SELECT V3 FROM " + view4);
+            
+            // verify that the column is visible to view1 and view2
+            try (Connection tenant1Conn = getTenantConnection("tenant1")) {
+                tenant1Conn.createStatement().execute("SELECT V3 from " + view1);
+                tenant1Conn.createStatement().execute("SELECT V3 from " + view2);
+            }
+            
+            // verify that the column is visible to view3
+            try (Connection tenant2Conn = getTenantConnection("tenant2")) {
+                tenant2Conn.createStatement().execute("SELECT V3 from " + view3);
+            }
+            
+        }
+           
+    }
+    
+    @Test
+    public void testChangingPKOfBaseTableChangesPKForAllViews() throws Exception {
+        String baseTable = "testChangePKOfBaseTable";
+        String view1 = "view1";
+        String view2 = "view2";
+        String view3 = "view3";
+        String view4 = "view4";
+        /*                                     baseTable
+                                 /                  |               \ 
+                         view1(tenant1)    view3(tenant2)          view4(global)
+                          /
+                        view2(tenant1)  
+         */
+        Connection tenant1Conn = null, tenant2Conn = null;
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE "
+                    + baseTable
+                    + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true ";
+            globalConn.createStatement().execute(baseTableDDL);
+
+            tenant1Conn = getTenantConnection("tenant1");
+            String view1DDL = "CREATE VIEW " + view1 + " AS SELECT * FROM " + baseTable;
+            tenant1Conn.createStatement().execute(view1DDL);
+
+            String view2DDL = "CREATE VIEW " + view2 + " AS SELECT * FROM " + view1;
+            tenant1Conn.createStatement().execute(view2DDL);
+
+            tenant2Conn = getTenantConnection("tenant2");
+            String view3DDL = "CREATE VIEW " + view3 + " AS SELECT * FROM " + baseTable;
+            tenant2Conn.createStatement().execute(view3DDL);
+
+            String view4DDL = "CREATE VIEW " + view4 + " AS SELECT * FROM " + baseTable;
+            globalConn.createStatement().execute(view4DDL);
+
+            String alterBaseTable = "ALTER TABLE " + baseTable + " ADD NEW_PK varchar primary key ";
+            globalConn.createStatement().execute(alterBaseTable);
+
+            // verify that the new column new_pk is now part of the primary key for the entire hierarchy
+            assertTrue(checkColumnPartOfPk(globalConn.unwrap(PhoenixConnection.class), "NEW_PK", baseTable));
+            assertTrue(checkColumnPartOfPk(tenant1Conn.unwrap(PhoenixConnection.class), "NEW_PK", view1));
+            assertTrue(checkColumnPartOfPk(tenant1Conn.unwrap(PhoenixConnection.class), "NEW_PK", view2));
+            assertTrue(checkColumnPartOfPk(tenant2Conn.unwrap(PhoenixConnection.class), "NEW_PK", view3));
+            assertTrue(checkColumnPartOfPk(globalConn.unwrap(PhoenixConnection.class), "NEW_PK", view4));
+
+        } finally {
+            if (tenant1Conn != null) {
+                try {
+                    tenant1Conn.close();
+                } catch (Throwable ignore) {}
+            }
+            if (tenant2Conn != null) {
+                try {
+                    tenant2Conn.close();
+                } catch (Throwable ignore) {}
+            }
+        }
+
+    }
+    
+    private boolean checkColumnPartOfPk(PhoenixConnection conn, String columnName, String tableName) throws SQLException {
+        String normalizedTableName = SchemaUtil.normalizeIdentifier(tableName);
+        PTable table = conn.getMetaDataCache().getTable(new PTableKey(conn.getTenantId(), normalizedTableName));
+        List<PColumn> pkCols = table.getPKColumns();
+        String normalizedColumnName = SchemaUtil.normalizeIdentifier(columnName);
+        for (PColumn pkCol : pkCols) {
+            if (pkCol.getName().getString().equals(normalizedColumnName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private Connection getTenantConnection(String tenantId) throws Exception {
+        Properties tenantProps = new Properties();
+        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), tenantProps);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index a7c7291..e1a1970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -272,6 +272,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
                 assertEquals(CANNOT_MODIFY_VIEW_PK.getErrorCode(), expected.getErrorCode());
             }
             
+            // try removing a non-PK col
             try {
                 conn.createStatement().execute("alter table " + TENANT_TABLE_NAME + " drop column id");
                 fail();
@@ -291,25 +292,6 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(nextTimestamp()));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
-            // try adding a PK col
-            try {
-                conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " add new_pk varchar primary key");
-                fail();
-            }
-            catch (SQLException expected) {
-                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), expected.getErrorCode());
-            }
-            
-            // try adding a non-PK col
-            try {
-                conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " add new_col char(1)");
-                fail();
-            }
-            catch (SQLException expected) {
-                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), expected.getErrorCode());
-            }
-            
-            // try removing a PK col
             try {
                 conn.createStatement().execute("alter table " + PARENT_TABLE_NAME + " drop column id");
                 fail();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
new file mode 100644
index 0000000..886e567
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.query.QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.UpgradeUtil;
+import org.junit.Test;
+
+public class UpgradeIT extends BaseHBaseManagedTimeIT {
+
+    private static final String TENANT_ID = "tenantId";
+
+    @Test
+    public void testUpgradeForTenantViewWithSameColumnsAsBaseTable() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.EQUAL);
+    }
+
+    @Test
+    public void testUpgradeForTenantViewWithMoreColumnsThanBaseTable() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.MORE);
+    }
+
+    @Test
+    public void testUpgradeForViewWithSameColumnsAsBaseTable() throws Exception {
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.EQUAL);
+        testViewUpgrade(false, null, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.EQUAL);
+    }
+
+    @Test
+    public void testUpgradeForViewWithMoreColumnsThanBaseTable() throws Exception {
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2", ColumnDiff.MORE);
+        testViewUpgrade(false, null, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3", ColumnDiff.MORE);
+        testViewUpgrade(false, null, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.MORE);
+        testViewUpgrade(false, null, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.MORE);
+    }
+
+    @Test
+    public void testSettingBaseColumnCountWhenBaseTableColumnDropped() throws Exception {
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW1", null, "VIEW1", ColumnDiff.MORE);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW", null, "VIEW2",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, null, "TABLEWITHVIEW3", "VIEWSCHEMA", "VIEW3",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, "TABLESCHEMA", "TABLEWITHVIEW4", "VIEWSCHEMA", "VIEW4",
+            ColumnDiff.LESS);
+        testViewUpgrade(true, TENANT_ID, "SAMESCHEMA", "TABLEWITHVIEW5", "SAMESCHEMA", "VIEW5",
+            ColumnDiff.LESS);
+    }
+    
+    @Test
+    public void testSettingBaseColumnCountForMultipleViewsOnTable() throws Exception {
+        String baseSchema = "XYZ";
+        String baseTable = "BASE_TABLE";
+        String fullBaseTableName = SchemaUtil.getTableName(baseSchema, baseTable);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String baseTableDDL = "CREATE TABLE " + fullBaseTableName + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true";
+            conn.createStatement().execute(baseTableDDL);
+            
+            for (int i = 1; i <= 2; i++) {
+                // Create views for tenants
+                String tenant = "tenant" + i;
+                try (Connection tenantConn = createTenantConnection(tenant)) {
+                    String view = "TENANT_VIEW1";
+                    
+                    // view with its own column
+                    String viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String addCols = "ALTER VIEW " + view + " ADD COL1 VARCHAR ";
+                    tenantConn.createStatement().execute(addCols);
+                    removeBaseColumnCountKV(tenant, null, view);
+
+                    // view that has the last base table column removed
+                    view = "TENANT_VIEW2";
+                    viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String dropLastBaseCol = "ALTER VIEW " + view + " DROP COLUMN V2";
+                    tenantConn.createStatement().execute(dropLastBaseCol);
+                    removeBaseColumnCountKV(tenant, null, view);
+
+                    // view that has the middle base table column removed
+                    view = "TENANT_VIEW3";
+                    viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                    tenantConn.createStatement().execute(viewDDL);
+                    String dropMiddleBaseCol = "ALTER VIEW " + view + " DROP COLUMN V1";
+                    tenantConn.createStatement().execute(dropMiddleBaseCol);
+                    removeBaseColumnCountKV(tenant, null, view);
+                }
+            }
+            
+            // create global views
+            try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+                String view = "GLOBAL_VIEW1";
+                
+                // view with its own column
+                String viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String addCols = "ALTER VIEW " + view + " ADD COL1 VARCHAR ";
+                globalConn.createStatement().execute(addCols);
+                removeBaseColumnCountKV(null, null, view);
+
+                // view that has the last base table column removed
+                view = "GLOBAL_VIEW2";
+                viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String dropLastBaseCol = "ALTER VIEW " + view + " DROP COLUMN V2";
+                globalConn.createStatement().execute(dropLastBaseCol);
+                removeBaseColumnCountKV(null, null, view);
+
+                // view that has the middle base table column removed
+                view = "GLOBAL_VIEW3";
+                viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
+                globalConn.createStatement().execute(viewDDL);
+                String dropMiddleBaseCol = "ALTER VIEW " + view + " DROP COLUMN V1";
+                globalConn.createStatement().execute(dropMiddleBaseCol);
+                removeBaseColumnCountKV(null, null, view);
+            }
+            
+            // run upgrade
+            UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
+            
+            // Verify base column counts for tenant specific views
+            for (int i = 1; i <= 2; i++) {
+                String tenantId = "tenant" + i;
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW1", 4);
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW2", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+                checkBaseColumnCount(tenantId, null, "TENANT_VIEW3", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+            }
+            
+            // Verify base column count for global views
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW1", 4);
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW2", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+            checkBaseColumnCount(null, null, "GLOBAL_VIEW3", DIVORCED_VIEW_BASE_COLUMN_COUNT);
+        }
+        
+        
+    }
+    
+    private enum ColumnDiff {
+        MORE, EQUAL, LESS
+    };
+
+    private void testViewUpgrade(boolean tenantView, String tenantId, String baseTableSchema,
+            String baseTableName, String viewSchema, String viewName, ColumnDiff diff)
+            throws Exception {
+        if (tenantView) {
+            checkNotNull(tenantId);
+        } else {
+            checkArgument(tenantId == null);
+        }
+        Connection conn = DriverManager.getConnection(getUrl());
+        String fullViewName = SchemaUtil.getTableName(viewSchema, viewName);
+        String fullBaseTableName = SchemaUtil.getTableName(baseTableSchema, baseTableName);
+        try {
+            int expectedBaseColumnCount;
+            conn.createStatement().execute(
+                "CREATE TABLE IF NOT EXISTS " + fullBaseTableName + " ("
+                        + " TENANT_ID CHAR(15) NOT NULL, " + " PK1 integer NOT NULL, "
+                        + "PK2 bigint NOT NULL, " + "CF1.V1 VARCHAR, " + "CF2.V2 VARCHAR, "
+                        + "V3 CHAR(100) ARRAY[4] "
+                        + " CONSTRAINT NAME_PK PRIMARY KEY (TENANT_ID, PK1, PK2)"
+                        + " ) MULTI_TENANT= true");
+            
+            // create a view with same columns as base table.
+            try (Connection conn2 = getConnection(tenantView, tenantId)) {
+                conn2.createStatement().execute(
+                    "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullBaseTableName);
+            }
+
+            if (diff == ColumnDiff.MORE) {
+                // add a column to the view
+                try (Connection conn3 = getConnection(tenantView, tenantId)) {
+                    conn3.createStatement().execute(
+                        "ALTER VIEW " + fullViewName + " ADD VIEW_COL1 VARCHAR");
+                }
+            }
+            if (diff == ColumnDiff.LESS) {
+                try (Connection conn3 = getConnection(tenantView, tenantId)) {
+                    conn3.createStatement().execute(
+                        "ALTER VIEW " + fullViewName + " DROP COLUMN CF2.V2");
+                }
+                expectedBaseColumnCount = DIVORCED_VIEW_BASE_COLUMN_COUNT;
+            } else {
+                expectedBaseColumnCount = 6;
+            }
+
+            checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
+            
+            // remove base column count kv so we can check whether the upgrade code is setting the 
+            // base column count correctly.
+            removeBaseColumnCountKV(tenantId, viewSchema, viewName);
+            removeBaseColumnCountKV(null, baseTableSchema, baseTableName);
+
+            // assert that removing the base column count key value worked correctly.
+            checkBaseColumnCount(tenantId, viewSchema, viewName, 0);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, 0);
+            
+            // run upgrade
+            UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
+
+            checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
+            checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
+        } finally {
+            conn.close();
+        }
+    }
+
+    private static void checkBaseColumnCount(String tenantId, String schemaName, String tableName,
+            int expectedBaseColumnCount) throws Exception {
+        checkNotNull(tableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String sql = SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
+        sql =
+                String.format(sql, tenantId == null ? " IS NULL " : " = ? ",
+                    schemaName == null ? "IS NULL" : " = ? ");
+        int paramIndex = 1;
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        if (tenantId != null) {
+            stmt.setString(paramIndex++, tenantId);
+        }
+        if (schemaName != null) {
+            stmt.setString(paramIndex++, schemaName);
+        }
+        stmt.setString(paramIndex, tableName);
+        ResultSet rs = stmt.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(expectedBaseColumnCount, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    private static void
+            removeBaseColumnCountKV(String tenantId, String schemaName, String tableName)
+                    throws Exception {
+        byte[] rowKey =
+                SchemaUtil.getTableKey(tenantId == null ? new byte[0] : Bytes.toBytes(tenantId),
+                    schemaName == null ? new byte[0] : Bytes.toBytes(schemaName),
+                    Bytes.toBytes(tableName));
+        Put viewColumnDefinitionPut = new Put(rowKey, HConstants.LATEST_TIMESTAMP);
+        viewColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+            PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, HConstants.LATEST_TIMESTAMP, null);
+
+        try (PhoenixConnection conn =
+                (DriverManager.getConnection(getUrl())).unwrap(PhoenixConnection.class)) {
+            try (HTableInterface htable =
+                    conn.getQueryServices().getTable(
+                        Bytes.toBytes(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME))) {
+                RowMutations mutations = new RowMutations(rowKey);
+                mutations.add(viewColumnDefinitionPut);
+                htable.mutateRow(mutations);
+            }
+        }
+    }
+
+    private Connection createTenantConnection(String tenantId) throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    private Connection getConnection(boolean tenantSpecific, String tenantId) throws SQLException {
+        if (tenantSpecific) {
+            checkNotNull(tenantId);
+            return createTenantConnection(tenantId);
+        }
+        return DriverManager.getConnection(getUrl());
+    }
+}
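
For a quick manual check of what the new UpgradeUtil.upgradeTo4_5_0 step writes: the BASE_COLUMN_COUNT attribute lives on the header row of each table and view in SYSTEM.CATALOG. After the upgrade, views still in sync with their base table carry the base table's column count, while views that dropped an inherited column are flagged with the DIVORCED_VIEW_BASE_COLUMN_COUNT sentinel. A sketch of inspecting it from a SQL client against the upgraded cluster (the view name is taken from the test above and is otherwise illustrative):

    SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, BASE_COLUMN_COUNT
    FROM SYSTEM.CATALOG
    WHERE TABLE_NAME = 'TENANT_VIEW2' AND BASE_COLUMN_COUNT IS NOT NULL;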

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9686de8..f1b5373 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -61,6 +61,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -77,6 +78,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
@@ -223,6 +225,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
     private static final KeyValue STORE_NULLS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
     private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+    private static final KeyValue BASE_COLUMN_COUNT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
+    
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
             TABLE_TYPE_KV,
@@ -241,7 +245,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             VIEW_INDEX_ID_KV,
             INDEX_TYPE_KV,
             INDEX_DISABLE_TIMESTAMP_KV,
-            STORE_NULLS_KV
+            STORE_NULLS_KV,
+            BASE_COLUMN_COUNT_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -263,6 +268,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
     private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
     private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
+    private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -401,7 +407,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[] tableName = request.getTableName().toByteArray();
         byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
         long tableTimeStamp = request.getTableTimestamp();
-
         try {
             // TODO: check that key is within region.getStartKey() and region.getEndKey()
             // and return special code to force client to lookup region from meta.
@@ -766,7 +771,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
         Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
         IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
-
+        Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
+        int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
+            baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
 
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
@@ -811,7 +818,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
             tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
-            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats);
+            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount);
     }
 
     private PFunction getFunction(RegionScanner scanner)
@@ -1160,13 +1167,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         locks.add(rowLock);
     }
 
-    protected static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
+    private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
+    private static final byte[] PARENT_TABLE_BYTES = new byte[] {PTable.LinkType.PARENT_TABLE.getSerializedValue()};
+
     /**
      * Looks for whether child views exist for the given parent table.
      * @param table the parent table whose child views are being looked up
      * TODO: should we pass a timestamp here?
      */
-    private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
+    private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table, byte[] linkTypeBytes) throws IOException {
         byte[] schemaName = table.getSchemaName().getBytes();
         byte[] tableName = table.getTableName().getBytes();
         boolean isMultiTenant = table.isMultiTenant();
@@ -1180,13 +1189,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             scan.setStartRow(startRow);
             scan.setStopRow(stopRow);
         }
-        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, PHYSICAL_TABLE_BYTES);
+        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
         linkFilter.setFilterIfMissing(true);
         byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil.getTableNameAsBytes(schemaName, tableName));
         SuffixFilter rowFilter = new SuffixFilter(suffix);
         Filter filter = new FilterList(linkFilter, rowFilter);
         scan.setFilter(filter);
         scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+        
         // Original region-only scanner modified due to PHOENIX-1208
         // RegionScanner scanner = region.getScanner(scan);
         // The following *should* work, but doesn't due to HBASE-11837
@@ -1351,7 +1362,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
           }
 
           // Handle any child views that exist
-          TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table);
+          TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table, PHYSICAL_TABLE_BYTES);
           if (tableViewFinderResult.hasViews()) {
             if (isCascade) {
               if (tableViewFinderResult.allViewsInMultipleRegions()) {
@@ -1435,10 +1446,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static interface ColumnMutator {
       MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
           List<Mutation> tableMetadata, HRegion region,
-          List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException,
+          List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp) throws IOException,
           SQLException;
     }
 
+    @SuppressWarnings("deprecation")
     private MetaDataMutationResult
     mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
@@ -1518,22 +1530,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 } else {
                     // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
-                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesPtr());
+                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                            new ImmutableBytesPtr());
                     // We said to drop a table, but found a view or vice versa
-                    if (type != expectedType) {
-                        return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
-                    }
-                    if (findChildViews(region, tenantId, table).hasViews()) {
-                        // Disallow any column mutations for parents of tenant tables
-                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    if (type != expectedType) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null); }
+                    if (table.getBaseColumnCount() == 0) {
+                        // If the base column count hasn't been set, then it means that the upgrade
+                        // to 4.5.0 is in progress. Have the client retry the mutation operation.
+                        return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
+                                EnvironmentEdgeManager.currentTimeMillis(), table);
                     }
                 }
                 result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
-                            invalidateList, locks);
+                            invalidateList, locks, clientTimeStamp);
                 if (result != null) {
                     return result;
                 }
-
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
@@ -1553,6 +1566,85 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
+    private void addRowsToChildViews(List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
+            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinderResult childViewsResult,
+            HRegion region, List<RowLock> locks) throws IOException, SQLException {
+        for (Result viewResult : childViewsResult.getResults()) {
+            byte[][] rowViewKeyMetaData = new byte[3][];
+            getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData);
+            byte[] viewTenantId = rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            byte[] viewSchemaName = rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            byte[] viewName = rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
+            PTable view = doGetTable(viewKey, clientTimeStamp);
+
+            if (view.getBaseColumnCount() == QueryConstants.DIVORCED_VIEW_BASE_COLUMN_COUNT) {
+                // if a view has divorced itself from the base table, we don't allow schema changes
+                // to be propagated to it.
+                return;
+            }
+            // lock the rows corresponding to views so that no other thread can modify the view meta-data
+            acquireLock(region, viewKey, locks);
+
+            int deltaNumberOfColumns = 0;
+            
+            for (Mutation m : tableMetadata) {
+                byte[][] rkmd = new byte[5][];
+                int pkCount = getVarChars(m.getRow(), rkmd);
+                if (m instanceof Put && pkCount > COLUMN_NAME_INDEX
+                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
+                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
+                    Put p = (Put)m;
+
+                    byte[] k = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[COLUMN_NAME_INDEX],
+                            QueryConstants.SEPARATOR_BYTE_ARRAY, rkmd[FAMILY_NAME_INDEX]);
+                    Put viewColumnDefinitionPut = new Put(k, clientTimeStamp);
+                    for (Cell cell : p.getFamilyCellMap().values().iterator().next()) {
+                        viewColumnDefinitionPut.add(CellUtil.createCell(k, CellUtil.cloneFamily(cell),
+                                CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
+                                CellUtil.cloneValue(cell)));
+                    }
+                    deltaNumberOfColumns++;
+                    mutationsForAddingColumnsToViews.add(viewColumnDefinitionPut);
+                }
+            }
+
+            int oldBaseColumnCount = view.getBaseColumnCount();
+
+            Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
+            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + deltaNumberOfColumns, baseColumnCountPtr, 0);
+            byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(view.getColumns().size() + deltaNumberOfColumns, columnCountPtr, 0);
+            byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
+            PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr);
+            viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
+            mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
+
+            // Update positions of view columns
+            for (PColumn column : view.getColumns()) {
+                if (column.getPosition() >= oldBaseColumnCount) {
+                    int newPosition = column.getPosition() + deltaNumberOfColumns + 1;
+
+                    byte[] k = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY, column.getName()
+                            .getBytes(), QueryConstants.SEPARATOR_BYTE_ARRAY, column.getFamilyName() != null ? column.getFamilyName().getBytes() : null);
+
+                    Put positionUpdatePut = new Put(k, clientTimeStamp);
+                    byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
+                    PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
+                    positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
+                    mutationsForAddingColumnsToViews.add(positionUpdatePut);
+                }
+            }
+            invalidateList.add(new ImmutableBytesPtr(viewKey));
+        }
+    }
 
     @Override
     public void addColumn(RpcController controller, AddColumnRequest request,
@@ -1562,11 +1654,29 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                 @Override
                 public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
-                        List<Mutation> tableMetaData, HRegion region,
-                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
+                        List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList,
+                        List<RowLock> locks, long clientTimeStamp) throws IOException, SQLException {
                     byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+                    PTableType type = table.getType();
+                    TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table,
+                            (type == PTableType.VIEW ? PARENT_TABLE_BYTES : PHYSICAL_TABLE_BYTES));
+                    List<Mutation> mutationsForAddingColumnsToViews = Collections.emptyList();
+                    if (childViewsResult.hasViews()) {
+                        /*
+                         * Adding a column is not allowed if: 1) the meta-data for the child view(s) spans more than
+                         * one region, or 2) the table being altered is itself a view that has child views.
+                         */
+                        if (!childViewsResult.allViewsInSingleRegion() || type == PTableType.VIEW) {
+                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+                                    EnvironmentEdgeManager.currentTimeMillis(), null);
+                        } else {
+                            mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getResults().size() * tableMetaData.size());
+                            addRowsToChildViews(tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
+                                    childViewsResult, region, locks);
+                        }
+                    }
                     for (Mutation m : tableMetaData) {
                         byte[] key = m.getRow();
                         boolean addingPKColumn = false;
@@ -1609,6 +1719,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             }
                         }
                     }
+                    tableMetaData.addAll(mutationsForAddingColumnsToViews);
                     return null;
                 }
             });
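
At the SQL level, the updateMutation logic above means an ADD on a base table now fans out to its views, while an ADD on a view that itself has child views is rejected with UNALLOWED_TABLE_MUTATION. A hedged end-to-end sketch with made-up table/view names, assuming an open Phoenix JDBC connection:

    // Illustration only: hypothetical names, assuming an open Phoenix JDBC connection.
    static void demoAddColumnPropagation(java.sql.Connection conn) throws java.sql.SQLException {
        try (java.sql.Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE base_t (pk VARCHAR PRIMARY KEY, v1 VARCHAR)");
            stmt.execute("CREATE VIEW view_a AS SELECT * FROM base_t");
            stmt.execute("CREATE VIEW view_b AS SELECT * FROM view_a");

            // Propagated by the coprocessor to view_a and view_b as well.
            stmt.execute("ALTER TABLE base_t ADD v2 INTEGER");

            // view_a has a child view, so this is expected to fail with
            // UNALLOWED_TABLE_MUTATION (surfaced as a SQLException).
            stmt.execute("ALTER VIEW view_a ADD v3 INTEGER");
        }
    }
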
@@ -1750,19 +1861,27 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
         try {
             tableMetaData = ProtobufUtil.getMutations(request);
-            final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
             final List<byte[]> tableNamesToDelete = Lists.newArrayList();
             MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                 @Override
                 public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
                         List<Mutation> tableMetaData, HRegion region,
-                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks)
+                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp)
                         throws IOException, SQLException {
                     byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                     boolean deletePKColumn = false;
                     List<Mutation> additionalTableMetaData = Lists.newArrayList();
+                    
+                    PTableType type = table.getType();
+                    TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table,
+                            (type == PTableType.VIEW ? PARENT_TABLE_BYTES : PHYSICAL_TABLE_BYTES));
+                    if (childViewsResult.hasViews()) {
+                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager
+                                .currentTimeMillis(), null);
+                    }
+                    
                     for (Mutation m : tableMetaData) {
                         if (m instanceof Delete) {
                             byte[] key = m.getRow();
@@ -1787,6 +1906,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     } else {
                                         continue;
                                     }
+                                    if (table.getType() == PTableType.VIEW) {
+                                        if (table.getBaseColumnCount() != DIVORCED_VIEW_BASE_COLUMN_COUNT
+                                                && columnToDelete.getPosition() < table.getBaseColumnCount()) {
+                                            /*
+                                             * If the column being dropped is inherited from the base table, then the
+                                             * view is about to divorce itself from the base table. Divorce here means
+                                             * that any further meta-data changes made to the base table will not be
+                                             * propagated to the hierarchy of views on the base table.
+                                             */
+                                            byte[] viewKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+                                            Put updateBaseColumnCountPut = new Put(viewKey);
+                                            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
+                                            PInteger.INSTANCE.getCodec().encodeInt(DIVORCED_VIEW_BASE_COLUMN_COUNT,
+                                                    baseColumnCountPtr, 0);
+                                            updateBaseColumnCountPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                                                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp,
+                                                    baseColumnCountPtr);
+                                            additionalTableMetaData.add(updateBaseColumnCountPut);
+                                        }
+                                    }
                                     if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view
                                         return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
                                     }

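The drop-column hunk above is what implements the "divorce": once a view drops a column it inherited from its base table, its BASE_COLUMN_COUNT is set to DIVORCED_VIEW_BASE_COLUMN_COUNT and later base-table changes stop propagating to that view. A hedged sketch with made-up names, assuming an open Phoenix JDBC connection:

    // Illustration only: hypothetical names, assuming an open Phoenix JDBC connection.
    static void demoViewDivorce(java.sql.Connection conn) throws java.sql.SQLException {
        try (java.sql.Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE base_t2 (pk VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
            stmt.execute("CREATE VIEW view_c AS SELECT * FROM base_t2");

            // Dropping an inherited column divorces view_c from base_t2
            // (its BASE_COLUMN_COUNT becomes DIVORCED_VIEW_BASE_COLUMN_COUNT).
            stmt.execute("ALTER VIEW view_c DROP COLUMN v2");

            // Columns added to base_t2 after this point are no longer propagated to view_c.
            stmt.execute("ALTER TABLE base_t2 ADD v3 INTEGER");
        }
    }
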
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index acbd9f8..2989bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -62,8 +62,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
 
     public static final long MIN_TABLE_TIMESTAMP = 0;
 
-    // Incremented from 5 to 7 with the addition of the STORE_NULLS table option in 4.3
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 7;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 8;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
     public static final int DEFAULT_MAX_STAT_DATA_VERSIONS = 3;
     public static final boolean DEFAULT_META_DATA_KEEP_DELETED_CELLS = true;
@@ -73,6 +72,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0 = MIN_TABLE_TIMESTAMP + 4;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1 = MIN_TABLE_TIMESTAMP + 5;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0 = MIN_TABLE_TIMESTAMP + 7;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8;
     
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
     // a different code for every type of error.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 7d389ac..dd6e303 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3108,6 +3108,16 @@ public final class PTableProtos {
      * <code>optional bool storeNulls = 24;</code>
      */
     boolean getStoreNulls();
+
+    // optional int32 baseColumnCount = 25;
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    boolean hasBaseColumnCount();
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    int getBaseColumnCount();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3298,6 +3308,11 @@ public final class PTableProtos {
               storeNulls_ = input.readBool();
               break;
             }
+            case 200: {
+              bitField0_ |= 0x00100000;
+              baseColumnCount_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3828,6 +3843,22 @@ public final class PTableProtos {
       return storeNulls_;
     }
 
+    // optional int32 baseColumnCount = 25;
+    public static final int BASECOLUMNCOUNT_FIELD_NUMBER = 25;
+    private int baseColumnCount_;
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    public boolean hasBaseColumnCount() {
+      return ((bitField0_ & 0x00100000) == 0x00100000);
+    }
+    /**
+     * <code>optional int32 baseColumnCount = 25;</code>
+     */
+    public int getBaseColumnCount() {
+      return baseColumnCount_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -3853,6 +3884,7 @@ public final class PTableProtos {
       indexType_ = com.google.protobuf.ByteString.EMPTY;
       statsTimeStamp_ = 0L;
       storeNulls_ = false;
+      baseColumnCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3992,6 +4024,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00080000) == 0x00080000)) {
         output.writeBool(24, storeNulls_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        output.writeInt32(25, baseColumnCount_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4102,6 +4137,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(24, storeNulls_);
       }
+      if (((bitField0_ & 0x00100000) == 0x00100000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(25, baseColumnCount_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4233,6 +4272,11 @@ public final class PTableProtos {
         result = result && (getStoreNulls()
             == other.getStoreNulls());
       }
+      result = result && (hasBaseColumnCount() == other.hasBaseColumnCount());
+      if (hasBaseColumnCount()) {
+        result = result && (getBaseColumnCount()
+            == other.getBaseColumnCount());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4342,6 +4386,10 @@ public final class PTableProtos {
         hash = (37 * hash) + STORENULLS_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getStoreNulls());
       }
+      if (hasBaseColumnCount()) {
+        hash = (37 * hash) + BASECOLUMNCOUNT_FIELD_NUMBER;
+        hash = (53 * hash) + getBaseColumnCount();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4514,6 +4562,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00400000);
         storeNulls_ = false;
         bitField0_ = (bitField0_ & ~0x00800000);
+        baseColumnCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x01000000);
         return this;
       }
 
@@ -4654,6 +4704,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00080000;
         }
         result.storeNulls_ = storeNulls_;
+        if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+          to_bitField0_ |= 0x00100000;
+        }
+        result.baseColumnCount_ = baseColumnCount_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4820,6 +4874,9 @@ public final class PTableProtos {
         if (other.hasStoreNulls()) {
           setStoreNulls(other.getStoreNulls());
         }
+        if (other.hasBaseColumnCount()) {
+          setBaseColumnCount(other.getBaseColumnCount());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6424,6 +6481,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 baseColumnCount = 25;
+      private int baseColumnCount_ ;
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public boolean hasBaseColumnCount() {
+        return ((bitField0_ & 0x01000000) == 0x01000000);
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public int getBaseColumnCount() {
+        return baseColumnCount_;
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public Builder setBaseColumnCount(int value) {
+        bitField0_ |= 0x01000000;
+        baseColumnCount_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 baseColumnCount = 25;</code>
+       */
+      public Builder clearBaseColumnCount() {
+        bitField0_ = (bitField0_ & ~0x01000000);
+        baseColumnCount_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -6470,7 +6560,7 @@ public final class PTableProtos {
       "values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(" +
       "\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCo",
       "unt\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuideP" +
-      "osts\"\266\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
+      "osts\"\317\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" +
       "\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 " +
       "\002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016" +
       "sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022" +
@@ -6484,10 +6574,11 @@ public final class PTableProtos {
       "nt\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010tenan" +
       "tId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tindexT" +
       "ype\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\nsto" +
-      "reNulls\030\030 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022" +
-      "\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004" +
-      "B@\n(org.apache.phoenix.coprocessor.gener" +
-      "atedB\014PTableProtosH\001\210\001\001\240\001\001"
+      "reNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(\005*A" +
+      "\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VI" +
+      "EW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache." +
+      "phoenix.coprocessor.generatedB\014PTablePro" +
+      "tosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6511,7 +6602,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 3a0b03b..d1b3b27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 
@@ -209,6 +208,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     public static final byte[] IS_VIEW_REFERENCED_BYTES = Bytes.toBytes(IS_VIEW_REFERENCED);
     public static final String VIEW_INDEX_ID = "VIEW_INDEX_ID";
     public static final byte[] VIEW_INDEX_ID_BYTES = Bytes.toBytes(VIEW_INDEX_ID);
+    public static final String BASE_COLUMN_COUNT = "BASE_COLUMN_COUNT";
+    public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
 
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e001c63f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 83f7157..f3be8f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
+import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -116,6 +117,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.EmptySequenceCacheException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
@@ -141,6 +143,7 @@ import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.ByteUtil;
@@ -1823,21 +1826,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     }
 
-    /** 
-     * Keeping this to use for further upgrades. This method closes the oldMetaConnection.
-     */
-    private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
-        String tableName, long timestamp, String columns) throws SQLException {
-
+    private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException {
         Properties props = new Properties(oldMetaConnection.getClientInfo());
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
         // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
         PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache());
         SQLException sqlE = null;
         try {
-            metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD IF NOT EXISTS " + columns );
+            metaConnection.createStatement().executeUpdate("ALTER TABLE " + tableName + " ADD " + (addIfNotExists ? " IF NOT EXISTS " : "") + columns );
         } catch (SQLException e) {
-            logger.warn("addColumnsIfNotExists failed due to:" + e);
+            logger.warn("Add column failed due to: " + e);
             sqlE = e;
         } finally {
             try {
@@ -1855,6 +1853,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return metaConnection;
     }
+    
+    /** 
+     * Keeping this to use for further upgrades. This method closes the oldMetaConnection.
+     */
+    private PhoenixConnection addColumnsIfNotExists(PhoenixConnection oldMetaConnection,
+            String tableName, long timestamp, String columns) throws SQLException {
+        return addColumn(oldMetaConnection, tableName, timestamp, columns, true);
+    }
 
     @Override
     public void init(final String url, final Properties props) throws SQLException {
@@ -1935,12 +1941,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     columnsToAdd += ", " + PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName()
                                             + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName();
                                 }
-                                
                                 // Ugh..need to assign to another local variable to keep eclipse happy.
                                 PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection,
                                         PhoenixDatabaseMetaData.SYSTEM_CATALOG,
                                         MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
                                 metaConnection = newMetaConnection;
+                                
+                                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) {
+                                    columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " "
+                                            + PInteger.INSTANCE.getSqlTypeName();
+                                    try {
+                                        addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                                                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd, false);
+                                        upgradeTo4_5_0(metaConnection);
+                                    } catch (ColumnAlreadyExistsException ignored) {
+                                        /* 
+                                         * Upgrade to 4.5 is a slightly special case. We use the fact that the column
+                                         * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that
+                                         * the server side upgrade has finished or is in progress.
+                                         */
+                                    }
+                                }
                             }
                             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
                                     QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
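
The 4.5.0 branch above deliberately calls addColumn(...) without IF NOT EXISTS so that ColumnAlreadyExistsException can serve as the signal that BASE_COLUMN_COUNT (and hence the server-side upgrade) is already in place or in flight. Boiled down, the pattern looks like this; ensureUpgradedTo4_5_0, addBaseColumnCountColumn and runUpgradeTo4_5_0 are hypothetical names, not the methods in this patch:

    // Sentinel-column pattern, sketched with hypothetical helper names.
    void ensureUpgradedTo4_5_0(PhoenixConnection metaConnection) throws SQLException {
        try {
            // Intentionally *not* "ADD IF NOT EXISTS": a failure here is the signal.
            addBaseColumnCountColumn(metaConnection); // hypothetical: adds BASE_COLUMN_COUNT to SYSTEM.CATALOG
            runUpgradeTo4_5_0(metaConnection);        // hypothetical: populates BASE_COLUMN_COUNT for existing rows
        } catch (ColumnAlreadyExistsException ignored) {
            // BASE_COLUMN_COUNT already exists: another client finished (or is running)
            // the upgrade, so this client skips it.
        }
    }
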
@@ -1983,6 +2004,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 } else { 
                                     nSequenceSaltBuckets = getSaltBuckets(e);
                                 }
+                                
                             }
                             try {
                                 metaConnection.createStatement().executeUpdate(
@@ -2002,6 +2024,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } catch (NewerTableAlreadyExistsException e) {
                             } catch (TableAlreadyExistsException e) {
                             }
+                            
 
                         } catch (Exception e) {
                             if (e instanceof SQLException) {