You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/11/20 01:42:04 UTC

[phoenix] branch 4.x updated: PHOENIX-6186 - Store last DDL timestamp in System.Catalog (#935)

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 55c41f7  PHOENIX-6186 - Store last DDL timestamp in System.Catalog (#935)
55c41f7 is described below

commit 55c41f7bb34d363a30d3704644aacd2580c0a926
Author: Geoffrey Jacoby <gj...@salesforce.com>
AuthorDate: Thu Nov 19 19:41:53 2020 -0600

    PHOENIX-6186 - Store last DDL timestamp in System.Catalog (#935)
    
    * PHOENIX-6186 - Store last DDL timestamp in System.Catalog
---
 .../org/apache/phoenix/end2end/AlterTableIT.java   |  65 +++-
 .../phoenix/end2end/AlterTableWithViewsIT.java     | 256 +++++++++++++++-
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  37 ++-
 .../java/org/apache/phoenix/end2end/UpgradeIT.java | 332 ++++++---------------
 .../apache/phoenix/end2end/UpgradeNamespaceIT.java | 300 +++++++++++++++++++
 .../it/java/org/apache/phoenix/end2end/ViewIT.java |  41 +++
 .../phoenix/coprocessor/AddColumnMutator.java      |  17 +-
 .../apache/phoenix/coprocessor/ColumnMutator.java  |   3 +-
 .../phoenix/coprocessor/DropColumnMutator.java     |  18 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  54 +++-
 .../phoenix/coprocessor/MetaDataProtocol.java      |  14 +-
 .../coprocessor/generated/MetaDataProtos.java      | 214 +++++++++----
 .../coprocessor/generated/PTableProtos.java        | 107 ++++++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |  19 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  78 ++---
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   5 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 181 +++++------
 .../java/org/apache/phoenix/schema/PTable.java     |   9 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  24 +-
 .../java/org/apache/phoenix/util/MetaDataUtil.java |  23 ++
 .../java/org/apache/phoenix/util/SchemaUtil.java   |   6 +
 .../java/org/apache/phoenix/util/UpgradeUtil.java  |  21 ++
 .../java/org/apache/phoenix/util/ViewUtil.java     |  10 +-
 .../org/apache/phoenix/util/MetaDataUtilTest.java  |  27 +-
 phoenix-protocol/src/main/MetaDataService.proto    |   1 +
 phoenix-protocol/src/main/PTable.proto             |   1 +
 27 files changed, 1384 insertions(+), 481 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 3dafc59..e7d12b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -65,7 +65,9 @@ import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -1329,7 +1331,68 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
             assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM + 1);
         }
     }
-	
+
+    @Test
+    public void testAddThenDropColumnTableDDLTimestamp() throws Exception {
+        String tableDDL = "CREATE TABLE IF NOT EXISTS " + dataTableFullName + " ("
+            + " ENTITY_ID integer NOT NULL,"
+            + " COL1 integer NOT NULL,"
+            + " COL2 bigint NOT NULL,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (ENTITY_ID, COL1, COL2)"
+            + " ) " + generateDDLOptions("");
+
+        String columnAddDDL = "ALTER TABLE " + dataTableFullName + " ADD COL3 varchar(50) NULL ";
+        String columnDropDDL = "ALTER TABLE " + dataTableFullName + " DROP COLUMN COL3 ";
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDDL);
+            //first get the original DDL timestamp when we created the table
+            long tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                dataTableFullName, startTS,
+                conn);
+            Thread.sleep(1);
+            //now add a column and make sure the timestamp updates
+            conn.createStatement().execute(columnAddDDL);
+            tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                dataTableFullName,
+                tableDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(columnDropDDL);
+            CreateTableIT.verifyLastDDLTimestamp(
+                dataTableFullName,
+                tableDDLTimestamp + 1 , conn);
+        }
+    }
+
+    @Test
+    public void testSetPropertyDoesntUpdateDDLTimestamp() throws Exception {
+        Properties props = new Properties();
+        String tableDDL = "CREATE TABLE IF NOT EXISTS " + dataTableFullName + " ("
+            + " ENTITY_ID integer NOT NULL,"
+            + " COL1 integer NOT NULL,"
+            + " COL2 bigint NOT NULL,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (ENTITY_ID, COL1, COL2)"
+            + " ) " + generateDDLOptions("");
+
+        String setPropertyDDL = "ALTER TABLE " + dataTableFullName +
+            " SET UPDATE_CACHE_FREQUENCY=300000 ";
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(tableDDL);
+            //first get the original DDL timestamp when we created the table
+            long tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                dataTableFullName, startTS,
+                conn);
+            Thread.sleep(1);
+            //now change a property and make sure the timestamp DOES NOT update
+            conn.createStatement().execute(setPropertyDDL);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+            assertNotNull(table);
+            assertNotNull(table.getLastDDLTimestamp());
+            assertEquals(tableDDLTimestamp, table.getLastDDLTimestamp().longValue());
+        }
+    }
+
 	private void assertEncodedCQValue(String columnFamily, String columnName, String schemaName, String tableName, int expectedValue) throws Exception {
         String query = "SELECT " + COLUMN_QUALIFIER + " FROM \"SYSTEM\".CATALOG WHERE " + TABLE_SCHEM + " = ? AND " + TABLE_NAME
                 + " = ? " + " AND " + COLUMN_FAMILY + " = ?" + " AND " + COLUMN_NAME  + " = ?" + " AND " + COLUMN_QUALIFIER  + " IS NOT NULL";
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index d634b44..909bcbd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -60,11 +60,13 @@ import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -73,8 +75,6 @@ import org.junit.runners.Parameterized.Parameters;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
-import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
-
 @RunWith(Parameterized.class)
 public class AlterTableWithViewsIT extends SplitSystemCatalogIT {
 
@@ -1216,5 +1216,255 @@ public class AlterTableWithViewsIT extends SplitSystemCatalogIT {
             assertNull(results.next());
         }
     }
-    
+
+    @Test
+    public void testAddThenDropColumnTableDDLTimestamp() throws Exception {
+        Properties props = new Properties();
+        String schemaName = SCHEMA1;
+        String dataTableName = "T_" + generateUniqueName();
+        String viewName = "V_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+
+        String tableDDL = generateDDL("CREATE TABLE IF NOT EXISTS " + dataTableFullName + " ("
+            + " %s ID char(1) NOT NULL,"
+            + " COL1 integer NOT NULL,"
+            + " COL2 bigint NOT NULL,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, COL2)"
+            + " ) %s");
+
+        String viewDDL = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName;
+
+        String columnAddDDL = "ALTER VIEW " + viewFullName + " ADD COL3 varchar(50) NULL ";
+        String columnDropDDL = "ALTER VIEW " + viewFullName + " DROP COLUMN COL3 ";
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(tableDDL);
+            //first get the original DDL timestamp when we created the table
+            long tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                dataTableFullName, startTS,
+                conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(viewDDL);
+            tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, tableDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            //now add a column and make sure the timestamp updates
+            conn.createStatement().execute(columnAddDDL);
+            tableDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName,
+                tableDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(columnDropDDL);
+            CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName,
+                tableDDLTimestamp + 1 , conn);
+        }
+    }
+
+    @Test
+    public void testLastDDLTimestampForDivergedViews() throws Exception {
+        //Phoenix allows users to "drop" columns from views that are inherited from their ancestor
+        // views or tables. These columns are then excluded from the view schema, and the view is
+        // considered "diverged" from its parents, and so no longer inherit any additional schema
+        // changes that are applied to their ancestors. This test make sure that this behavior
+        // extends to DDL timestamp
+        String schemaName = SCHEMA1;
+        String dataTableName = "T_" + generateUniqueName();
+        String viewName = "V_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+
+        String tableDDL = generateDDL("CREATE TABLE IF NOT EXISTS " + dataTableFullName + " ("
+            + " %s ID char(1) NOT NULL,"
+            + " COL1 integer NOT NULL,"
+            + " COL2 bigint,"
+            + " COL3 bigint,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1)"
+            + " ) %s");
+
+        String viewDDL = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName;
+
+        String divergeDDL = "ALTER VIEW " + viewFullName + " DROP COLUMN COL2";
+        String viewColumnAddDDL = "ALTER VIEW " + viewFullName + " ADD COL4 varchar(50) NULL ";
+        String viewColumnDropDDL = "ALTER VIEW " + viewFullName + " DROP COLUMN COL4 ";
+        String tableColumnAddDDL = "ALTER TABLE " + dataTableFullName + " ADD COL5 varchar" +
+            "(50) NULL";
+        String tableColumnDropDDL = "ALTER TABLE " + dataTableFullName + " DROP COLUMN COL3 ";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDDL);
+            long tableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName);
+            conn.createStatement().execute(viewDDL);
+            long viewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, viewFullName);
+            Thread.sleep(1);
+            conn.createStatement().execute(divergeDDL);
+            //verify table DDL timestamp DID NOT change
+            assertEquals(tableDDLTimestamp, CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName));
+            //verify view DDL timestamp changed
+            viewDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, viewDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(viewColumnAddDDL);
+            //verify DDL timestamp changed because we added a column to the view
+            viewDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, viewDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(viewColumnDropDDL);
+            //verify DDL timestamp changed because we dropped a column from the view
+            viewDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, viewDDLTimestamp + 1, conn);
+            Thread.sleep(1);
+            conn.createStatement().execute(tableColumnAddDDL);
+            //verify DDL timestamp DID change because we added a column from the base table
+            viewDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, viewDDLTimestamp + 1, conn);
+            //and that it did change because we dropped a column from the base table
+            conn.createStatement().execute(tableColumnDropDDL);
+            viewDDLTimestamp = CreateTableIT.verifyLastDDLTimestamp(
+                viewFullName, viewDDLTimestamp + 1, conn);
+        }
+    }
+
+    @Test
+    public void testLastDDLTimestampWithChildViews() throws Exception {
+        Assume.assumeTrue(isMultiTenant);
+        Properties props = new Properties();
+        String schemaName = SCHEMA1;
+        String dataTableName = "T_" + generateUniqueName();
+        String globalViewName = "V_" + generateUniqueName();
+        String tenantViewName = "V_" + generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String globalViewFullName = SchemaUtil.getTableName(schemaName, globalViewName);
+        String tenantViewFullName = SchemaUtil.getTableName(schemaName, tenantViewName);
+
+        String tableDDL = generateDDL("CREATE TABLE IF NOT EXISTS " + dataTableFullName + " ("
+            + " %s ID char(1) NOT NULL,"
+            + " COL1 integer NOT NULL,"
+            + " COL2 bigint NOT NULL,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (%s ID, COL1, COL2)"
+            + " ) %s");
+
+        //create a table with a child global view, who then has a child tenant view
+        String globalViewDDL =
+            "CREATE VIEW " + globalViewFullName + " AS SELECT * FROM " + dataTableFullName;
+
+        String tenantViewDDL =
+            "CREATE VIEW " + tenantViewFullName + " AS SELECT * FROM " + globalViewFullName;
+
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        long tableDDLTimestamp, globalViewDDLTimestamp;
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDDL);
+            conn.createStatement().execute(globalViewDDL);
+            tableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName);
+            globalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, globalViewFullName);
+        }
+        props.setProperty(TENANT_ID_ATTRIB, TENANT1);
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+            tenantConn.createStatement().execute(tenantViewDDL);
+        }
+        // First, check that adding a child view to the base table didn't change the base table's
+        // timestamp, and that adding a child tenant view to the global view didn't change the
+        // global view's timestamp
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            long newTableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName);
+            assertEquals(tableDDLTimestamp, newTableDDLTimestamp);
+
+            long newGlobalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, globalViewFullName);
+            assertEquals(globalViewDDLTimestamp, newGlobalViewDDLTimestamp);
+        }
+        Thread.sleep(1);
+        //now add / drop a column from the tenant view and make sure it doesn't change its
+        // ancestors' timestamps
+        String tenantViewColumnAddDDL = "ALTER VIEW " + tenantViewFullName + " ADD COL3 varchar" +
+            "(50) " + "NULL ";
+        String tenantViewColumnDropDDL = "ALTER VIEW " + tenantViewFullName + " DROP COLUMN COL3 ";
+
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+            tenantConn.createStatement().execute(tenantViewColumnAddDDL);
+            long newTableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn, dataTableFullName);
+            assertEquals(tableDDLTimestamp, newTableDDLTimestamp);
+
+            long afterTenantColumnAddViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                globalViewFullName);
+            assertEquals(globalViewDDLTimestamp, afterTenantColumnAddViewDDLTimestamp);
+
+            tenantConn.createStatement().execute(tenantViewColumnDropDDL);
+            //update the tenant view timestamp (we'll need it later)
+            long afterTenantColumnDropTableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                dataTableFullName);
+            assertEquals(tableDDLTimestamp, afterTenantColumnDropTableDDLTimestamp);
+
+            long afterTenantColumnDropViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                globalViewFullName);
+            assertEquals(globalViewDDLTimestamp, afterTenantColumnDropViewDDLTimestamp);
+        }
+        Thread.sleep(1);
+        //now add / drop a column from the base table and make sure it changes the timestamps for
+        // both the global view (its child) and the tenant view (its grandchild)
+        String tableColumnAddDDL = "ALTER TABLE " + dataTableFullName + " ADD COL4 varchar" +
+            "(50) " + "NULL ";
+        String tableColumnDropDDL = "ALTER TABLE " + dataTableFullName + " DROP COLUMN COL4 ";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableColumnAddDDL);
+            tableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName);
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+                long tenantViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                    tenantViewFullName);
+                assertEquals(tableDDLTimestamp, tenantViewDDLTimestamp);
+            }
+            globalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn,
+                globalViewFullName);
+            assertEquals(tableDDLTimestamp, globalViewDDLTimestamp);
+
+            conn.createStatement().execute(tableColumnDropDDL);
+            tableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, dataTableFullName);
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+                long tenantViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                    tenantViewFullName);
+                assertEquals(tableDDLTimestamp, tenantViewDDLTimestamp);
+            }
+            globalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn,
+                globalViewFullName);
+            assertEquals(tableDDLTimestamp, globalViewDDLTimestamp);
+        }
+
+        //now add / drop a column from the global view and make sure it doesn't change its
+        // parent (the base table) but does change the timestamp for its child (the tenant view)
+        String globalViewColumnAddDDL = "ALTER VIEW " + globalViewFullName + " ADD COL5 varchar" +
+            "(50) " + "NULL ";
+        String globalViewColumnDropDDL = "ALTER VIEW " + globalViewFullName + " DROP COLUMN COL5 ";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(globalViewColumnAddDDL);
+            globalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, globalViewFullName);
+            long newTableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn,
+                dataTableFullName);
+            //table DDL timestamp shouldn't have changed
+            assertEquals(tableDDLTimestamp, newTableDDLTimestamp);
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+                long tenantViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                    tenantViewFullName);
+                //but tenant timestamp should have changed
+                assertEquals(globalViewDDLTimestamp, tenantViewDDLTimestamp);
+            }
+
+            conn.createStatement().execute(globalViewColumnDropDDL);
+            globalViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn, globalViewFullName);
+            newTableDDLTimestamp = CreateTableIT.getLastDDLTimestamp(conn,
+                dataTableFullName);
+            //table DDL timestamp shouldn't have changed
+            assertEquals(tableDDLTimestamp, newTableDDLTimestamp);
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+                long tenantViewDDLTimestamp = CreateTableIT.getLastDDLTimestamp(tenantConn,
+                    tenantViewFullName);
+                //but tenant timestamp should have changed
+                assertEquals(globalViewDDLTimestamp, tenantViewDDLTimestamp);
+            }
+        }
+
+    }
+
+
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 4f6ccab..51d1c31 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -59,12 +59,13 @@ import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SchemaNotFoundException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -910,6 +911,40 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testCreateTableDDLTimestamp() throws Exception {
+        Properties props = new Properties();
+        final String schemaName = generateUniqueName();
+        final String tableName = generateUniqueName();
+        final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        String ddl =
+            "CREATE TABLE " + dataTableFullName + " (\n" + "ID1 VARCHAR(15) NOT NULL,\n"
+                + "ID2 VARCHAR(15) NOT NULL,\n" + "CREATED_DATE DATE,\n"
+                + "CREATION_TIME BIGINT,\n" + "LAST_USED DATE,\n"
+                + "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) ";
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(ddl);
+            verifyLastDDLTimestamp(dataTableFullName, startTS, conn);
+        }
+    }
+
+    public static long verifyLastDDLTimestamp(String dataTableFullName, long startTS, Connection conn) throws SQLException {
+        long endTS = EnvironmentEdgeManager.currentTimeMillis();
+        //Now try the PTable API
+        long ddlTimestamp = getLastDDLTimestamp(conn, dataTableFullName);
+        assertTrue("PTable DDL Timestamp not in the right range!",
+            ddlTimestamp >= startTS && ddlTimestamp <= endTS);
+        return ddlTimestamp;
+    }
+
+    public static long getLastDDLTimestamp(Connection conn, String dataTableFullName) throws SQLException {
+        PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+        assertNotNull("PTable is null!", table);
+        assertNotNull("DDL timestamp is null!", table.getLastDDLTimestamp());
+        return table.getLastDDLTimestamp();
+    }
+
     private int checkGuidePostWidth(String tableName) throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String query =
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index 0f22da6..8a6fac0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -18,13 +18,22 @@
 package org.apache.phoenix.end2end;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -90,251 +99,6 @@ import org.junit.experimental.categories.Category;
 
 @Category(NeedsOwnMiniClusterTest.class)
 public class UpgradeIT extends ParallelStatsDisabledIT {
-
-    @Test
-    public void testMapTableToNamespaceDuringUpgrade()
-            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
-        String[] strings = new String[] { "a", "b", "c", "d" };
-
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            String schemaName = "TEST";
-            String phoenixFullTableName = schemaName + "." + generateUniqueName();
-            String indexName = "IDX_" + generateUniqueName();
-            String localIndexName = "LIDX_" + generateUniqueName();
-
-            String viewName = "VIEW_" + generateUniqueName();
-            String viewIndexName = "VIDX_" + generateUniqueName();
-
-            String[] tableNames = new String[] { phoenixFullTableName, schemaName + "." + indexName,
-                    schemaName + "." + localIndexName, "diff." + viewName, "test." + viewName, viewName};
-            String[] viewIndexes = new String[] { "diff." + viewIndexName, "test." + viewIndexName };
-            conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
-                    + "(k VARCHAR PRIMARY KEY, v INTEGER, f INTEGER, g INTEGER NULL, h INTEGER NULL)");
-            PreparedStatement upsertStmt = conn
-                    .prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
-            int i = 1;
-            for (String str : strings) {
-                upsertStmt.setString(1, str);
-                upsertStmt.setInt(2, i++);
-                upsertStmt.execute();
-            }
-            conn.commit();
-            // creating local index
-            conn.createStatement()
-                    .execute("create local index " + localIndexName + " on " + phoenixFullTableName + "(K)");
-            // creating global index
-            conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(k)");
-            // creating view in schema 'diff'
-            conn.createStatement().execute("CREATE VIEW diff." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // creating view in schema 'test'
-            conn.createStatement().execute("CREATE VIEW test." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            conn.createStatement().execute("CREATE VIEW " + viewName + "(col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // Creating index on views
-            conn.createStatement().execute("create index " + viewIndexName + "  on diff." + viewName + "(col)");
-            conn.createStatement().execute("create index " + viewIndexName + " on test." + viewName + "(col)");
-
-            // validate data
-            for (String tableName : tableNames) {
-                ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
-                for (String str : strings) {
-                    assertTrue(rs.next());
-                    assertEquals(str, rs.getString(1));
-                }
-            }
-
-            // validate view Index data
-            for (String viewIndex : viewIndexes) {
-                ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
-                for (String str : strings) {
-                    assertTrue(rs.next());
-                    assertEquals(str, rs.getString(2));
-                }
-            }
-
-            HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            assertTrue(admin.tableExists(phoenixFullTableName));
-            assertTrue(admin.tableExists(schemaName + QueryConstants.NAME_SEPARATOR + indexName));
-            assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(phoenixFullTableName))));
-            Properties props = new Properties();
-            props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
-            props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
-            admin.close();
-            PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
-            UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
-            phxConn.close();
-            props = new Properties();
-            phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
-            // purge MetaDataCache except for system tables
-            phxConn.getMetaDataCache().pruneTables(new PMetaData.Pruner() {
-                @Override public boolean prune(PTable table) {
-                    return table.getType() != PTableType.SYSTEM;
-                }
-
-                @Override public boolean prune(PFunction function) {
-                    return false;
-                }
-            });
-            admin = phxConn.getQueryServices().getAdmin();
-            String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
-                    .getNameAsString();
-            assertTrue(admin.tableExists(hbaseTableName));
-            assertTrue(admin.tableExists(Bytes.toBytes(hbaseTableName)));
-            assertTrue(admin.tableExists(schemaName + QueryConstants.NAMESPACE_SEPARATOR + indexName));
-            assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName))));
-            i = 0;
-            // validate data
-            for (String tableName : tableNames) {
-                ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
-                for (String str : strings) {
-                    assertTrue(rs.next());
-                    assertEquals(str, rs.getString(1));
-                }
-            }
-            // validate view Index data
-            for (String viewIndex : viewIndexes) {
-                ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
-                for (String str : strings) {
-                    assertTrue(rs.next());
-                    assertEquals(str, rs.getString(2));
-                }
-            }
-            PName tenantId = phxConn.getTenantId();
-            PName physicalName = PNameFactory.newName(hbaseTableName);
-            String newSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
-            String newSequenceName = MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
-            verifySequenceValue(null, newSequenceName, newSchemaName, Short.MIN_VALUE + 3);
-            admin.close();
-        }
-    }
-
-    @Test
-    public void testMapMultiTenantTableToNamespaceDuringUpgrade() throws SQLException, SnapshotCreationException,
-            IllegalArgumentException, IOException, InterruptedException {
-        String[] strings = new String[] { "a", "b", "c", "d" };
-        String schemaName1 = "S_" +generateUniqueName(); // TEST
-        String schemaName2 = "S_" +generateUniqueName(); // DIFF
-        String phoenixFullTableName = schemaName1 + "." + generateUniqueName();
-        String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
-                .getNameAsString();
-        String indexName = "IDX_" + generateUniqueName();
-        String viewName = "V_" + generateUniqueName();
-        String viewName1 = "V1_" + generateUniqueName();
-        String viewIndexName = "V_IDX_" + generateUniqueName();
-        String tenantViewIndexName = "V1_IDX_" + generateUniqueName();
-
-        String[] tableNames = new String[] { phoenixFullTableName, schemaName2 + "." + viewName1, schemaName1 + "." + viewName1, viewName1 };
-        String[] viewIndexes = new String[] { schemaName1 + "." + viewIndexName, schemaName2 + "." + viewIndexName };
-        String[] tenantViewIndexes = new String[] { schemaName1 + "." + tenantViewIndexName, schemaName2 + "." + tenantViewIndexName };
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
-                    + "(k VARCHAR not null, v INTEGER not null, f INTEGER, g INTEGER NULL, h INTEGER NULL CONSTRAINT pk PRIMARY KEY(k,v)) MULTI_TENANT=true");
-            PreparedStatement upsertStmt = conn
-                    .prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
-            int i = 1;
-            for (String str : strings) {
-                upsertStmt.setString(1, str);
-                upsertStmt.setInt(2, i++);
-                upsertStmt.execute();
-            }
-            conn.commit();
-
-            // creating global index
-            conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(f)");
-            // creating view in schema 'diff'
-            conn.createStatement().execute("CREATE VIEW " + schemaName2 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // creating view in schema 'test'
-            conn.createStatement().execute("CREATE VIEW " + schemaName1 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            conn.createStatement().execute("CREATE VIEW " + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // Creating index on views
-            conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName2 + "." + viewName + "(col)");
-            conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName1 + "." + viewName + "(col)");
-        }
-        Properties props = new Properties();
-        String tenantId = strings[0];
-        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement upsertStmt = conn
-                    .prepareStatement("UPSERT INTO " + phoenixFullTableName + "(k,v,f,g,h)  VALUES(?, ?, 0, 0, 0)");
-            int i = 1;
-            for (String str : strings) {
-                upsertStmt.setString(1, str);
-                upsertStmt.setInt(2, i++);
-                upsertStmt.execute();
-            }
-            conn.commit();
-            // creating view in schema 'diff'
-            conn.createStatement()
-                    .execute("CREATE VIEW " + schemaName2 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // creating view in schema 'test'
-            conn.createStatement()
-                    .execute("CREATE VIEW " + schemaName1 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            conn.createStatement().execute("CREATE VIEW " + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
-            // Creating index on views
-            conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName2 + "." + viewName1 + "(col)");
-            conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName1 + "." + viewName1 + "(col)");
-        }
-
-        props = new Properties();
-        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
-        props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
-        PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
-        UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
-        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
-        // purge MetaDataCache except for system tables
-        phxConn.getMetaDataCache().pruneTables(new PMetaData.Pruner() {
-            @Override public boolean prune(PTable table) {
-                return table.getType() != PTableType.SYSTEM;
-            }
-
-            @Override public boolean prune(PFunction function) {
-                return false;
-            }
-        });
-        int i = 1;
-        String indexPhysicalTableName = Bytes
-                .toString(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName)));
-        // validate data with tenant
-        for (String tableName : tableNames) {
-            assertTableUsed(phxConn, tableName, hbaseTableName);
-            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
-            assertTrue(rs.next());
-            do {
-                assertEquals(i++, rs.getInt(1));
-            } while (rs.next());
-            i = 1;
-        }
-        // validate view Index data
-        for (String viewIndex : tenantViewIndexes) {
-            assertTableUsed(phxConn, viewIndex, indexPhysicalTableName);
-            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
-            assertTrue(rs.next());
-            do {
-                assertEquals(i++, rs.getInt(2));
-            } while (rs.next());
-            i = 1;
-        }
-        phxConn.close();
-        props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
-        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
-
-        // validate view Index data
-        for (String viewIndex : viewIndexes) {
-            assertTableUsed(phxConn, viewIndex, hbaseTableName);
-            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
-            for (String str : strings) {
-                assertTrue(rs.next());
-                assertEquals(str, rs.getString(1));
-            }
-        }
-        phxConn.close();
-    }
-
-    public void assertTableUsed(Connection conn, String phoenixTableName, String hbaseTableName) throws SQLException {
-        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + phoenixTableName);
-        assertTrue(rs.next());
-        assertTrue(rs.getString(1).contains(hbaseTableName));
-    }
         
     @Test
     public void testUpgradeRequiredPreventsSQL() throws SQLException {
@@ -747,4 +511,82 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
             assertArrayEquals(expectedDateTypeBytes, CellUtil.cloneValue(cell));
         }
     }
+
+    @Test
+    public void testLastDDLTimestampBootstrap() throws Exception {
+        //Create a table, view, and index
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "T_" + generateUniqueName();
+        String viewName = "V_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        try (Connection conn = getConnection(false, null)) {
+            conn.createStatement().execute(
+                "CREATE TABLE " + fullTableName
+                    + " (PK1 VARCHAR NOT NULL, PK2 VARCHAR, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT " +
+                    "PK PRIMARY KEY(PK1, PK2)) ");
+            conn.createStatement().execute(
+                "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName);
+
+            //Now we null out any existing last ddl timestamps
+            nullDDLTimestamps(conn);
+
+            //now get the row timestamps for each header row
+            long tableTS = getRowTimestampForMetadata(conn, schemaName, tableName,
+                PTableType.TABLE);
+            long viewTS = getRowTimestampForMetadata(conn, schemaName, viewName, PTableType.VIEW);
+
+            UpgradeUtil.bootstrapLastDDLTimestamp(conn.unwrap(PhoenixConnection.class));
+            long actualTableTS = getLastTimestampForMetadata(conn, schemaName, tableName,
+                PTableType.TABLE);
+            long actualViewTS = getLastTimestampForMetadata(conn, schemaName, viewName,
+                PTableType.VIEW);
+            assertEquals(tableTS, actualTableTS);
+            assertEquals(viewTS, actualViewTS);
+
+        }
+    }
+
+    private void nullDDLTimestamps(Connection conn) throws SQLException {
+        String pkCols = TENANT_ID + ", " + TABLE_SCHEM +
+            ", " + TABLE_NAME + ", " + COLUMN_NAME + ", " + COLUMN_FAMILY;
+        String upsertSql =
+            "UPSERT INTO " + SYSTEM_CATALOG_NAME + " (" + pkCols + ", " +
+                LAST_DDL_TIMESTAMP + ")" + " " +
+                "SELECT " + pkCols + ", NULL FROM " + SYSTEM_CATALOG_NAME + " " +
+                "WHERE " + TABLE_TYPE + " IS NOT NULL";
+        conn.createStatement().execute(upsertSql);
+        conn.commit();
+    }
+
+    private long getRowTimestampForMetadata(Connection conn, String schemaName, String objectName,
+                                            PTableType type) throws SQLException {
+        String sql = "SELECT PHOENIX_ROW_TIMESTAMP() FROM " + SYSTEM_CATALOG_NAME + " WHERE " +
+            " TENANT_ID IS NULL AND TABLE_SCHEM = ? AND TABLE_NAME = ? and TABLE_TYPE = ?";
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        stmt.setString(1, schemaName);
+        stmt.setString(2, objectName);
+        stmt.setString(3, type.getSerializedValue());
+
+        ResultSet rs = stmt.executeQuery();
+        assertNotNull(rs);
+        assertTrue("Result set was empty!", rs.next());
+        return rs.getLong(1);
+    }
+
+    private long getLastTimestampForMetadata(Connection conn, String schemaName, String objectName,
+                                            PTableType type) throws SQLException {
+        String sql = "SELECT LAST_DDL_TIMESTAMP FROM " + SYSTEM_CATALOG_NAME + " WHERE " +
+            " TENANT_ID IS NULL AND TABLE_SCHEM = ? AND TABLE_NAME = ? and TABLE_TYPE = ?";
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        stmt.setString(1, schemaName);
+        stmt.setString(2, objectName);
+        stmt.setString(3, type.getSerializedValue());
+
+        ResultSet rs = stmt.executeQuery();
+        assertNotNull(rs);
+        assertTrue("Result set was empty!", rs.next());
+        return rs.getLong(1);
+    }
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeNamespaceIT.java
new file mode 100644
index 0000000..dca9ca4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeNamespaceIT.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.UpgradeUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class UpgradeNamespaceIT extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testMapMultiTenantTableToNamespaceDuringUpgrade() throws SQLException, SnapshotCreationException,
+        IllegalArgumentException, IOException, InterruptedException {
+        String[] strings = new String[] { "a", "b", "c", "d" };
+        String schemaName1 = "S_" +generateUniqueName(); // TEST
+        String schemaName2 = "S_" +generateUniqueName(); // DIFF
+        String phoenixFullTableName = schemaName1 + "." + generateUniqueName();
+        String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
+            .getNameAsString();
+        String indexName = "IDX_" + generateUniqueName();
+        String viewName = "V_" + generateUniqueName();
+        String viewName1 = "V1_" + generateUniqueName();
+        String viewIndexName = "V_IDX_" + generateUniqueName();
+        String tenantViewIndexName = "V1_IDX_" + generateUniqueName();
+
+        String[] tableNames = new String[] { phoenixFullTableName, schemaName2 + "." + viewName1, schemaName1 + "." + viewName1, viewName1 };
+        String[] viewIndexes = new String[] { schemaName1 + "." + viewIndexName, schemaName2 + "." + viewIndexName };
+        String[] tenantViewIndexes = new String[] { schemaName1 + "." + tenantViewIndexName, schemaName2 + "." + tenantViewIndexName };
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
+                + "(k VARCHAR not null, v INTEGER not null, f INTEGER, g INTEGER NULL, h INTEGER NULL CONSTRAINT pk PRIMARY KEY(k,v)) MULTI_TENANT=true");
+            PreparedStatement upsertStmt = conn
+                .prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
+            int i = 1;
+            for (String str : strings) {
+                upsertStmt.setString(1, str);
+                upsertStmt.setInt(2, i++);
+                upsertStmt.execute();
+            }
+            conn.commit();
+
+            // creating global index
+            conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(f)");
+            // creating view in schema 'diff'
+            conn.createStatement().execute("CREATE VIEW " + schemaName2 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // creating view in schema 'test'
+            conn.createStatement().execute("CREATE VIEW " + schemaName1 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            conn.createStatement().execute("CREATE VIEW " + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // Creating index on views
+            conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName2 + "." + viewName + "(col)");
+            conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName1 + "." + viewName + "(col)");
+        }
+        Properties props = new Properties();
+        String tenantId = strings[0];
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            PreparedStatement upsertStmt = conn
+                .prepareStatement("UPSERT INTO " + phoenixFullTableName + "(k,v,f,g,h)  VALUES(?, ?, 0, 0, 0)");
+            int i = 1;
+            for (String str : strings) {
+                upsertStmt.setString(1, str);
+                upsertStmt.setInt(2, i++);
+                upsertStmt.execute();
+            }
+            conn.commit();
+            // creating view in schema 'diff'
+            conn.createStatement()
+                .execute("CREATE VIEW " + schemaName2 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // creating view in schema 'test'
+            conn.createStatement()
+                .execute("CREATE VIEW " + schemaName1 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            conn.createStatement().execute("CREATE VIEW " + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // Creating index on views
+            conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName2 + "." + viewName1 + "(col)");
+            conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName1 + "." + viewName1 + "(col)");
+        }
+
+        props = new Properties();
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+        props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
+        PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+        UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+        // purge MetaDataCache except for system tables
+        phxConn.getMetaDataCache().pruneTables(new PMetaData.Pruner() {
+            @Override public boolean prune(PTable table) {
+                return table.getType() != PTableType.SYSTEM;
+            }
+
+            @Override public boolean prune(PFunction function) {
+                return false;
+            }
+        });
+        int i = 1;
+        String indexPhysicalTableName = Bytes
+            .toString(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName)));
+        // validate data with tenant
+        for (String tableName : tableNames) {
+            assertTableUsed(phxConn, tableName, hbaseTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            do {
+                assertEquals(i++, rs.getInt(1));
+            } while (rs.next());
+            i = 1;
+        }
+        // validate view Index data
+        for (String viewIndex : tenantViewIndexes) {
+            assertTableUsed(phxConn, viewIndex, indexPhysicalTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
+            assertTrue(rs.next());
+            do {
+                assertEquals(i++, rs.getInt(2));
+            } while (rs.next());
+            i = 1;
+        }
+        phxConn.close();
+        props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+
+        // validate view Index data
+        for (String viewIndex : viewIndexes) {
+            assertTableUsed(phxConn, viewIndex, hbaseTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
+            for (String str : strings) {
+                assertTrue(rs.next());
+                assertEquals(str, rs.getString(1));
+            }
+        }
+        phxConn.close();
+    }
+
+    public void assertTableUsed(Connection conn, String phoenixTableName, String hbaseTableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + phoenixTableName);
+        assertTrue(rs.next());
+        assertTrue(rs.getString(1).contains(hbaseTableName));
+    }
+
+    @Test
+    public void testMapTableToNamespaceDuringUpgrade()
+        throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+        //This test needs to run in its own test class because other tests creating views or view
+        // indexes can affect the value of the sequence it checks.
+        String[] strings = new String[] { "a", "b", "c", "d" };
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String schemaName = "TEST";
+            String phoenixFullTableName = schemaName + "." + generateUniqueName();
+            String indexName = "IDX_" + generateUniqueName();
+            String localIndexName = "LIDX_" + generateUniqueName();
+
+            String viewName = "VIEW_" + generateUniqueName();
+            String viewIndexName = "VIDX_" + generateUniqueName();
+
+            String[] tableNames = new String[] { phoenixFullTableName, schemaName + "." + indexName,
+                schemaName + "." + localIndexName, "diff." + viewName, "test." + viewName, viewName};
+            String[] viewIndexes = new String[] { "diff." + viewIndexName, "test." + viewIndexName };
+            conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
+                + "(k VARCHAR PRIMARY KEY, v INTEGER, f INTEGER, g INTEGER NULL, h INTEGER NULL)");
+            PreparedStatement upsertStmt = conn
+                .prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
+            int i = 1;
+            for (String str : strings) {
+                upsertStmt.setString(1, str);
+                upsertStmt.setInt(2, i++);
+                upsertStmt.execute();
+            }
+            conn.commit();
+            // creating local index
+            conn.createStatement()
+                .execute("create local index " + localIndexName + " on " + phoenixFullTableName + "(K)");
+            // creating global index
+            conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(k)");
+            // creating view in schema 'diff'
+            conn.createStatement().execute("CREATE VIEW diff." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // creating view in schema 'test'
+            conn.createStatement().execute("CREATE VIEW test." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            conn.createStatement().execute("CREATE VIEW " + viewName + "(col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // Creating index on views
+            conn.createStatement().execute("create index " + viewIndexName + "  on diff." + viewName + "(col)");
+            conn.createStatement().execute("create index " + viewIndexName + " on test." + viewName + "(col)");
+
+            // validate data
+            for (String tableName : tableNames) {
+                ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
+                for (String str : strings) {
+                    assertTrue(rs.next());
+                    assertEquals(str, rs.getString(1));
+                }
+            }
+
+            // validate view Index data
+            for (String viewIndex : viewIndexes) {
+                ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
+                for (String str : strings) {
+                    assertTrue(rs.next());
+                    assertEquals(str, rs.getString(2));
+                }
+            }
+
+            HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            assertTrue(admin.tableExists(phoenixFullTableName));
+            assertTrue(admin.tableExists(schemaName + QueryConstants.NAME_SEPARATOR + indexName));
+            assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(phoenixFullTableName))));
+            Properties props = new Properties();
+            props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+            props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
+            admin.close();
+            PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+            // long oldSeqValue =
+            UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
+            phxConn.close();
+            props = new Properties();
+            phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+            // purge MetaDataCache except for system tables
+            phxConn.getMetaDataCache().pruneTables(new PMetaData.Pruner() {
+                @Override public boolean prune(PTable table) {
+                    return table.getType() != PTableType.SYSTEM;
+                }
+
+                @Override public boolean prune(PFunction function) {
+                    return false;
+                }
+            });
+            admin = phxConn.getQueryServices().getAdmin();
+            String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
+                .getNameAsString();
+            assertTrue(admin.tableExists(hbaseTableName));
+            assertTrue(admin.tableExists(Bytes.toBytes(hbaseTableName)));
+            assertTrue(admin.tableExists(schemaName + QueryConstants.NAMESPACE_SEPARATOR + indexName));
+            assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName))));
+            i = 0;
+            // validate data
+            for (String tableName : tableNames) {
+                ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
+                for (String str : strings) {
+                    assertTrue(rs.next());
+                    assertEquals(str, rs.getString(1));
+                }
+            }
+            // validate view Index data
+            for (String viewIndex : viewIndexes) {
+                ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
+                for (String str : strings) {
+                    assertTrue(rs.next());
+                    assertEquals(str, rs.getString(2));
+                }
+            }
+            PName tenantId = phxConn.getTenantId();
+            PName physicalName = PNameFactory.newName(hbaseTableName);
+            String newSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
+            String newSequenceName = MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
+            verifySequenceValue(null, newSequenceName, newSchemaName,Short.MIN_VALUE + 3);
+            admin.close();
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 95e78df..49ba84b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -319,6 +320,46 @@ public class ViewIT extends SplitSystemCatalogIT {
         }
     }
 
+    @Test
+    public void testCreateViewTimestamp() throws Exception {
+        String tenantId = null;
+        createViewTimestampHelper(tenantId);
+    }
+
+    @Test
+    public void testCreateTenantViewTimestamp() throws Exception {
+        createViewTimestampHelper(TENANT1);
+    }
+
+    private void createViewTimestampHelper(String tenantId) throws SQLException {
+        Properties props = new Properties();
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        final String schemaName = "S_" + generateUniqueName();
+        final String tableName = "T_" + generateUniqueName();
+        final String viewName = "V_" + generateUniqueName();
+        final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName);
+        final String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+        String tableDDL =
+            "CREATE TABLE " + dataTableFullName + " (\n" + "ID1 VARCHAR(15) NOT NULL,\n"
+                + "ID2 VARCHAR(15) NOT NULL,\n" + "CREATED_DATE DATE,\n"
+                + "CREATION_TIME BIGINT,\n" + "LAST_USED DATE,\n"
+                + "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) ";
+        String viewDDL = "CREATE VIEW " + viewFullName  + " AS SELECT * " +
+            "FROM " + dataTableFullName;
+        long startTS = EnvironmentEdgeManager.currentTimeMillis();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(tableDDL);
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(viewDDL);
+            CreateTableIT.verifyLastDDLTimestamp(viewFullName,
+                startTS,
+                conn);
+        }
+    }
+
     private void testViewUsesTableIndex(boolean localIndex) throws Exception {
         ResultSet rs;
         // Use unique name for table with local index as otherwise we run
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/AddColumnMutator.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/AddColumnMutator.java
index 4504666..7e74c3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/AddColumnMutator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/AddColumnMutator.java
@@ -293,7 +293,8 @@ public class AddColumnMutator implements ColumnMutator {
                                                          List<ImmutableBytesPtr> invalidateList,
                                                          List<Region.RowLock> locks,
                                                          long clientTimeStamp,
-                                                         long clientVersion) {
+                                                         final long clientVersion,
+                                                         final boolean isAddingColumns) {
         byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
         byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
         byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
@@ -337,7 +338,7 @@ public class AddColumnMutator implements ColumnMutator {
                         family.getPColumnForColumnNameBytes(colName);
                     } else if (colName!=null && colName.length > 0) {
                         addingPKColumn = true;
-                        table.getPKColumn(new String(colName));
+                        table.getPKColumn(Bytes.toString(colName));
                     } else {
                         continue;
                     }
@@ -399,6 +400,18 @@ public class AddColumnMutator implements ColumnMutator {
                                 rowKeyMetaData[TABLE_NAME_INDEX])));
             }
         }
+        if (isAddingColumns) {
+            //We're changing the application-facing schema by adding a column, so update the DDL
+            // timestamp
+            long serverTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+            if (MetaDataUtil.isTableTypeDirectlyQueried(table.getType())) {
+                additionalTableMetadataMutations.add(MetaDataUtil.getLastDDLTimestampUpdate(tableHeaderRowKey,
+                    clientTimeStamp, serverTimestamp));
+            }
+            //we don't need to update the DDL timestamp for child views, because when we look up
+            // a PTable, we'll take the max timestamp of a view and all its ancestors. This is true
+            // whether the view is diverged or not.
+        }
         tableMetaData.addAll(additionalTableMetadataMutations);
         if (type == PTableType.VIEW) {
             if ( EncodedColumnsUtil.usesEncodedColumnNames(table) && addingCol &&
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ColumnMutator.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ColumnMutator.java
index 9ef0515..d1413d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ColumnMutator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ColumnMutator.java
@@ -52,7 +52,8 @@ public interface ColumnMutator {
                                                                    List<ImmutableBytesPtr> invalidateList,
                                                                    List<Region.RowLock> locks,
                                                                    long clientTimeStamp,
-                                                                   long clientVersion)
+                                                                   long clientVersion,
+                                                                   boolean isAddingOrDroppingColumns)
             throws IOException, SQLException;
 
     /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DropColumnMutator.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DropColumnMutator.java
index 735a40b..1ca631a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DropColumnMutator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DropColumnMutator.java
@@ -172,13 +172,17 @@ public class DropColumnMutator implements ColumnMutator {
                                                          List<ImmutableBytesPtr> invalidateList,
                                                          List<Region.RowLock> locks,
                                                          long clientTimeStamp,
-                                                         long clientVersion) throws SQLException {
+                                                         long clientVersion,
+                                                         final boolean isDroppingColumns)
+        throws SQLException {
         byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
         byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
         byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
         boolean isView = table.getType() == PTableType.VIEW;
         boolean deletePKColumn = false;
 
+        byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
+            schemaName, tableName);
         List<Mutation> additionalTableMetaData = Lists.newArrayList();
         ListIterator<Mutation> iterator = tableMetaData.listIterator();
         while (iterator.hasNext()) {
@@ -192,7 +196,7 @@ public class DropColumnMutator implements ColumnMutator {
                         && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                     column = MetaDataUtil.getColumn(pkCount, rowKeyMetaData, table);
                 } else {
-                    for(int i = 0; i < table.getIndexes().size(); i++) {
+                    for (int i = 0; i < table.getIndexes().size(); i++) {
                         PTableImpl indexTable = (PTableImpl) table.getIndexes().get(i);
                         byte[] indexTableName = indexTable.getTableName().getBytes();
                         byte[] indexSchema = indexTable.getSchemaName().getBytes();
@@ -268,6 +272,16 @@ public class DropColumnMutator implements ColumnMutator {
             }
 
         }
+        //We're changing the application-facing schema by dropping a column, so update the DDL
+        // timestamp to current _server_ timestamp
+        if (MetaDataUtil.isTableTypeDirectlyQueried(table.getType())) {
+            long serverTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+            additionalTableMetaData.add(MetaDataUtil.getLastDDLTimestampUpdate(tableHeaderRowKey,
+                clientTimeStamp, serverTimestamp));
+        }
+        //we don't need to update the DDL timestamp for any child views we may have, because
+        // when we look up a PTable for any of those child views, we'll take the max timestamp
+        // of the view and all its ancestors. This is true whether the view is diverged or not.
         tableMetaData.addAll(additionalTableMetaData);
         if (deletePKColumn) {
             if (table.getPKColumns().size() == 1) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index fc3ddb2..0cf62d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -45,6 +45,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
@@ -327,6 +328,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue USE_STATS_FOR_PARALLELIZATION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, USE_STATS_FOR_PARALLELIZATION_BYTES);
     private static final KeyValue PHOENIX_TTL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_BYTES);
     private static final KeyValue PHOENIX_TTL_HWM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_HWM_BYTES);
+    private static final KeyValue LAST_DDL_TIMESTAMP_KV =
+        createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, LAST_DDL_TIMESTAMP_BYTES);
 
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -360,7 +363,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             ENCODING_SCHEME_KV,
             USE_STATS_FOR_PARALLELIZATION_KV,
             PHOENIX_TTL_KV,
-            PHOENIX_TTL_HWM_KV
+            PHOENIX_TTL_HWM_KV,
+            LAST_DDL_TIMESTAMP_KV
     );
 
     static {
@@ -398,7 +402,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int USE_STATS_FOR_PARALLELIZATION_INDEX = TABLE_KV_COLUMNS.indexOf(USE_STATS_FOR_PARALLELIZATION_KV);
     private static final int PHOENIX_TTL_INDEX = TABLE_KV_COLUMNS.indexOf(PHOENIX_TTL_KV);
     private static final int PHOENIX_TTL_HWM_INDEX = TABLE_KV_COLUMNS.indexOf(PHOENIX_TTL_HWM_KV);
-
+    private static final int LAST_DDL_TIMESTAMP_INDEX =
+        TABLE_KV_COLUMNS.indexOf(LAST_DDL_TIMESTAMP_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -1150,6 +1155,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean viewModifiedPhoenixTTL = (PTableType.VIEW.equals(tableType)) &&
                 Bytes.contains(tagPhoenixTTL, VIEW_MODIFIED_PROPERTY_BYTES);
 
+        Cell lastDDLTimestampKv = tableKeyValues[LAST_DDL_TIMESTAMP_INDEX];
+        Long lastDDLTimestamp = lastDDLTimestampKv == null ?
+           null : PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
+                lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
+
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(useStatsForParallelizationKv);
@@ -1243,6 +1253,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
                 .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
                 .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
+                .setLastDDLTimestamp(lastDDLTimestamp)
                 .setColumns(columns)
                 .build();
     }
@@ -2038,7 +2049,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     // view's property in case they are different from the parent
                     ViewUtil.addTagsToPutsForViewAlteredProperties(tableMetadata, parentTable);
                 }
-
+                //set the last DDL timestamp to the current server time since we're creating the
+                // table. We only need to do this for tables and views because indexes and system
+                // tables aren't relevant to external systems that may be tracking our schema
+                // changes.
+                if (MetaDataUtil.isTableTypeDirectlyQueried(tableType)) {
+                    tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(tableKey,
+                        clientTimeStamp, EnvironmentEdgeManager.currentTimeMillis()));
+                }
                 // When we drop a view we first drop the view metadata and then drop the parent->child linking row
                 List<Mutation> localMutations =
                         Lists.newArrayListWithExpectedSize(tableMetadata.size());
@@ -2100,6 +2118,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     builder.setViewIndexIdType(PLong.INSTANCE.getSqlType());
                 }
                 builder.setMutationTime(currentTimeStamp);
+                //send the newly built table back because we generated the DDL timestamp server
+                // side and the client doesn't have it.
+                PTable newTable = buildTable(tableKey, cacheKey, region,
+                    clientTimeStamp, clientVersion);
+                if (newTable != null) {
+                    builder.setTable(PTableImpl.toProto(newTable));
+                }
                 done.run(builder.build());
             } finally {
                 releaseRowLocks(region, locks);
@@ -2673,7 +2698,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private MetaDataMutationResult mutateColumn(
             final List<Mutation> tableMetadata,
             final ColumnMutator mutator, final int clientVersion,
-            final PTable parentTable) throws IOException {
+            final PTable parentTable, boolean isAddingOrDroppingColumns) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -2810,7 +2835,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         getParentPhysicalTableName(table), table.getType());
 
                 result = mutator.validateAndAddMetadata(table, rowKeyMetaData, tableMetadata,
-                        region, invalidateList, locks, clientTimeStamp, clientVersion);
+                        region, invalidateList, locks, clientTimeStamp, clientVersion,
+                        isAddingOrDroppingColumns);
                 // if the update mutation caused tables to be deleted, the mutation code returned
                 // will be MutationCode.TABLE_ALREADY_EXISTS
                 if (result != null
@@ -2857,18 +2883,21 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     // there should only be remote mutations if we are adding a column to a view
                     // that uses encoded column qualifiers (the remote mutations are to update the
                     // encoded column qualifier counter on the parent table)
-                    if (mutator.getMutateColumnType() == ColumnMutator.MutateColumnType.ADD_COLUMN
+                    if (( mutator.getMutateColumnType() == ColumnMutator.MutateColumnType.ADD_COLUMN
                             && type == PTableType.VIEW
                             && table.getEncodingScheme() !=
-                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)) {
                         processRemoteRegionMutations(
                                 PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
                                 MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
-                        clearRemoteTableFromCache(clientTimeStamp,
+                        //if we're a view or index, clear the cache for our parent
+                        if ((type == PTableType.VIEW || type == INDEX) && table.getParentTableName() != null) {
+                            clearRemoteTableFromCache(clientTimeStamp,
                                 table.getParentSchemaName() != null
-                                        ? table.getParentSchemaName().getBytes()
-                                        : ByteUtil.EMPTY_BYTE_ARRAY,
+                                    ? table.getParentSchemaName().getBytes()
+                                    : ByteUtil.EMPTY_BYTE_ARRAY,
                                 table.getParentTableName().getBytes());
+                        }
                     } else {
                         String msg = "Found unexpected mutations while adding or dropping column "
                                 + "to " + fullTableName;
@@ -3028,8 +3057,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         try {
             List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
             PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
+            boolean addingColumns = request.getAddingColumns();
             MetaDataMutationResult result = mutateColumn(tableMetaData, new AddColumnMutator(),
-                    request.getClientVersion(), parentTable);
+                    request.getClientVersion(), parentTable, addingColumns);
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3167,7 +3197,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             tableMetaData = ProtobufUtil.getMutations(request);
             PTable parentTable = request.hasParentTable() ? PTableImpl.createFromProto(request.getParentTable()) : null;
             MetaDataMutationResult result = mutateColumn(tableMetaData, new DropColumnMutator(env.getConfiguration()),
-                    request.getClientVersion(), parentTable);
+                    request.getClientVersion(), parentTable, true);
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index e6f0bb5..7a0c5fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -94,7 +94,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 31;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
     // Version below which we should disallow usage of mutable secondary indexing.
@@ -388,7 +388,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
           }
           if (proto.getFunctionCount() > 0) {
               result.wasUpdated = true;
-              for(PFunctionProtos.PFunction function: proto.getFunctionList())
+              for (PFunctionProtos.PFunction function: proto.getFunctionList())
               result.functions.add(PFunction.createFromProto(function));
           }
           if (proto.getTablesToDeleteCount() > 0) {
@@ -399,13 +399,13 @@ public abstract class MetaDataProtocol extends MetaDataService {
             }
           }
           result.columnName = ByteUtil.EMPTY_BYTE_ARRAY;
-          if(proto.hasColumnName()){
+          if (proto.hasColumnName()){
             result.columnName = proto.getColumnName().toByteArray();
           }
-          if(proto.hasFamilyName()){
+          if (proto.hasFamilyName()){
             result.familyName = proto.getFamilyName().toByteArray();
           }
-          if(proto.getSharedTablesToDeleteCount() > 0) {
+          if (proto.getSharedTablesToDeleteCount() > 0) {
               result.sharedTablesToDelete = 
                  Lists.newArrayListWithExpectedSize(proto.getSharedTablesToDeleteCount());
               for (org.apache.phoenix.coprocessor.generated.MetaDataProtos.SharedTableState sharedTable : 
@@ -444,10 +444,10 @@ public abstract class MetaDataProtocol extends MetaDataService {
                 builder.addTablesToDelete(ByteStringer.wrap(tableName));
               }
             }
-            if(result.getColumnName() != null){
+            if (result.getColumnName() != null){
               builder.setColumnName(ByteStringer.wrap(result.getColumnName()));
             }
-            if(result.getFamilyName() != null){
+            if (result.getFamilyName() != null){
               builder.setFamilyName(ByteStringer.wrap(result.getFamilyName()));
             }
             if (result.getSharedTablesToDelete() !=null){
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
index 38283a5..619a92e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
@@ -10822,6 +10822,16 @@ public final class MetaDataProtos {
      * <code>optional .PTable parentTable = 3;</code>
      */
     org.apache.phoenix.coprocessor.generated.PTableProtos.PTableOrBuilder getParentTableOrBuilder();
+
+    // optional bool addingColumns = 4;
+    /**
+     * <code>optional bool addingColumns = 4;</code>
+     */
+    boolean hasAddingColumns();
+    /**
+     * <code>optional bool addingColumns = 4;</code>
+     */
+    boolean getAddingColumns();
   }
   /**
    * Protobuf type {@code AddColumnRequest}
@@ -10900,6 +10910,11 @@ public final class MetaDataProtos {
               bitField0_ |= 0x00000002;
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000004;
+              addingColumns_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -11004,10 +11019,27 @@ public final class MetaDataProtos {
       return parentTable_;
     }
 
+    // optional bool addingColumns = 4;
+    public static final int ADDINGCOLUMNS_FIELD_NUMBER = 4;
+    private boolean addingColumns_;
+    /**
+     * <code>optional bool addingColumns = 4;</code>
+     */
+    public boolean hasAddingColumns() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional bool addingColumns = 4;</code>
+     */
+    public boolean getAddingColumns() {
+      return addingColumns_;
+    }
+
     private void initFields() {
       tableMetadataMutations_ = java.util.Collections.emptyList();
       clientVersion_ = 0;
       parentTable_ = org.apache.phoenix.coprocessor.generated.PTableProtos.PTable.getDefaultInstance();
+      addingColumns_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -11036,6 +11068,9 @@ public final class MetaDataProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(3, parentTable_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(4, addingColumns_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -11062,6 +11097,10 @@ public final class MetaDataProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(3, parentTable_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, addingColumns_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -11097,6 +11136,11 @@ public final class MetaDataProtos {
         result = result && getParentTable()
             .equals(other.getParentTable());
       }
+      result = result && (hasAddingColumns() == other.hasAddingColumns());
+      if (hasAddingColumns()) {
+        result = result && (getAddingColumns()
+            == other.getAddingColumns());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -11122,6 +11166,10 @@ public final class MetaDataProtos {
         hash = (37 * hash) + PARENTTABLE_FIELD_NUMBER;
         hash = (53 * hash) + getParentTable().hashCode();
       }
+      if (hasAddingColumns()) {
+        hash = (37 * hash) + ADDINGCOLUMNS_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getAddingColumns());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -11242,6 +11290,8 @@ public final class MetaDataProtos {
           parentTableBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000004);
+        addingColumns_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -11287,6 +11337,10 @@ public final class MetaDataProtos {
         } else {
           result.parentTable_ = parentTableBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.addingColumns_ = addingColumns_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11319,6 +11373,9 @@ public final class MetaDataProtos {
         if (other.hasParentTable()) {
           mergeParentTable(other.getParentTable());
         }
+        if (other.hasAddingColumns()) {
+          setAddingColumns(other.getAddingColumns());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11574,6 +11631,39 @@ public final class MetaDataProtos {
         return parentTableBuilder_;
       }
 
+      // optional bool addingColumns = 4;
+      private boolean addingColumns_ ;
+      /**
+       * <code>optional bool addingColumns = 4;</code>
+       */
+      public boolean hasAddingColumns() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional bool addingColumns = 4;</code>
+       */
+      public boolean getAddingColumns() {
+        return addingColumns_;
+      }
+      /**
+       * <code>optional bool addingColumns = 4;</code>
+       */
+      public Builder setAddingColumns(boolean value) {
+        bitField0_ |= 0x00000008;
+        addingColumns_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool addingColumns = 4;</code>
+       */
+      public Builder clearAddingColumns() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        addingColumns_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AddColumnRequest)
     }
 
@@ -18038,69 +18128,69 @@ public final class MetaDataProtos {
       "Type\030\002 \002(\t\022\017\n\007cascade\030\003 \001(\010\022\025\n\rclientVer" +
       "sion\030\004 \001(\005\"_\n\021DropSchemaRequest\022\037\n\027schem" +
       "aMetadataMutations\030\001 \003(\014\022\022\n\nschemaName\030\002" +
-      " \002(\t\022\025\n\rclientVersion\030\003 \002(\005\"g\n\020AddColumn" +
+      " \002(\t\022\025\n\rclientVersion\030\003 \002(\005\"~\n\020AddColumn" +
       "Request\022\036\n\026tableMetadataMutations\030\001 \003(\014\022",
       "\025\n\rclientVersion\030\002 \001(\005\022\034\n\013parentTable\030\003 " +
-      "\001(\0132\007.PTable\"h\n\021DropColumnRequest\022\036\n\026tab" +
-      "leMetadataMutations\030\001 \003(\014\022\025\n\rclientVersi" +
-      "on\030\002 \001(\005\022\034\n\013parentTable\030\003 \001(\0132\007.PTable\"^" +
-      "\n\023DropFunctionRequest\022\036\n\026tableMetadataMu" +
-      "tations\030\001 \003(\014\022\020\n\010ifExists\030\002 \001(\010\022\025\n\rclien" +
-      "tVersion\030\003 \001(\005\"P\n\027UpdateIndexStateReques" +
-      "t\022\036\n\026tableMetadataMutations\030\001 \003(\014\022\025\n\rcli" +
-      "entVersion\030\002 \001(\005\"*\n\021ClearCacheRequest\022\025\n" +
-      "\rclientVersion\030\001 \001(\005\"*\n\022ClearCacheRespon",
-      "se\022\024\n\014unfreedBytes\030\001 \001(\003\"*\n\021GetVersionRe" +
-      "quest\022\025\n\rclientVersion\030\001 \001(\005\"E\n\022GetVersi" +
-      "onResponse\022\017\n\007version\030\001 \002(\003\022\036\n\026systemCat" +
-      "alogTimestamp\030\002 \001(\003\"\205\001\n\032ClearTableFromCa" +
-      "cheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nschemaNa" +
-      "me\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clientTim" +
-      "estamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005\"\035\n\033C" +
-      "learTableFromCacheResponse*\365\005\n\014MutationC" +
-      "ode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TABLE_N" +
-      "OT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLU",
-      "MN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_TABLE_" +
-      "MUTATION\020\004\022\027\n\023TABLE_NOT_IN_REGION\020\005\022\025\n\021N" +
-      "EWER_TABLE_FOUND\020\006\022\034\n\030UNALLOWED_TABLE_MU" +
-      "TATION\020\007\022\021\n\rNO_PK_COLUMNS\020\010\022\032\n\026PARENT_TA" +
-      "BLE_NOT_FOUND\020\t\022\033\n\027FUNCTION_ALREADY_EXIS" +
-      "TS\020\n\022\026\n\022FUNCTION_NOT_FOUND\020\013\022\030\n\024NEWER_FU" +
-      "NCTION_FOUND\020\014\022\032\n\026FUNCTION_NOT_IN_REGION" +
-      "\020\r\022\031\n\025SCHEMA_ALREADY_EXISTS\020\016\022\026\n\022NEWER_S" +
-      "CHEMA_FOUND\020\017\022\024\n\020SCHEMA_NOT_FOUND\020\020\022\030\n\024S" +
-      "CHEMA_NOT_IN_REGION\020\021\022\032\n\026TABLES_EXIST_ON",
-      "_SCHEMA\020\022\022\035\n\031UNALLOWED_SCHEMA_MUTATION\020\023" +
-      "\022%\n!AUTO_PARTITION_SEQUENCE_NOT_FOUND\020\024\022" +
-      "#\n\037CANNOT_COERCE_AUTO_PARTITION_ID\020\025\022\024\n\020" +
-      "TOO_MANY_INDEXES\020\026\022\037\n\033UNABLE_TO_CREATE_C" +
-      "HILD_LINK\020\027\022!\n\035UNABLE_TO_UPDATE_PARENT_T" +
-      "ABLE\020\030\022\037\n\033UNABLE_TO_DELETE_CHILD_LINK\020\031\022" +
-      "\031\n\025UNABLE_TO_UPSERT_TASK\020\0322\345\006\n\017MetaDataS" +
-      "ervice\022/\n\010getTable\022\020.GetTableRequest\032\021.M" +
-      "etaDataResponse\0227\n\014getFunctions\022\024.GetFun" +
-      "ctionsRequest\032\021.MetaDataResponse\0221\n\tgetS",
-      "chema\022\021.GetSchemaRequest\032\021.MetaDataRespo" +
-      "nse\0225\n\013createTable\022\023.CreateTableRequest\032" +
-      "\021.MetaDataResponse\022;\n\016createFunction\022\026.C" +
-      "reateFunctionRequest\032\021.MetaDataResponse\022" +
-      "7\n\014createSchema\022\024.CreateSchemaRequest\032\021." +
-      "MetaDataResponse\0221\n\tdropTable\022\021.DropTabl" +
-      "eRequest\032\021.MetaDataResponse\0223\n\ndropSchem" +
-      "a\022\022.DropSchemaRequest\032\021.MetaDataResponse" +
-      "\0227\n\014dropFunction\022\024.DropFunctionRequest\032\021" +
-      ".MetaDataResponse\0221\n\taddColumn\022\021.AddColu",
-      "mnRequest\032\021.MetaDataResponse\0223\n\ndropColu" +
-      "mn\022\022.DropColumnRequest\032\021.MetaDataRespons" +
-      "e\022?\n\020updateIndexState\022\030.UpdateIndexState" +
-      "Request\032\021.MetaDataResponse\0225\n\nclearCache" +
-      "\022\022.ClearCacheRequest\032\023.ClearCacheRespons" +
-      "e\0225\n\ngetVersion\022\022.GetVersionRequest\032\023.Ge" +
-      "tVersionResponse\022P\n\023clearTableFromCache\022" +
-      "\033.ClearTableFromCacheRequest\032\034.ClearTabl" +
-      "eFromCacheResponseBB\n(org.apache.phoenix" +
-      ".coprocessor.generatedB\016MetaDataProtosH\001",
-      "\210\001\001\240\001\001"
+      "\001(\0132\007.PTable\022\025\n\raddingColumns\030\004 \001(\010\"h\n\021D" +
+      "ropColumnRequest\022\036\n\026tableMetadataMutatio" +
+      "ns\030\001 \003(\014\022\025\n\rclientVersion\030\002 \001(\005\022\034\n\013paren" +
+      "tTable\030\003 \001(\0132\007.PTable\"^\n\023DropFunctionReq" +
+      "uest\022\036\n\026tableMetadataMutations\030\001 \003(\014\022\020\n\010" +
+      "ifExists\030\002 \001(\010\022\025\n\rclientVersion\030\003 \001(\005\"P\n" +
+      "\027UpdateIndexStateRequest\022\036\n\026tableMetadat" +
+      "aMutations\030\001 \003(\014\022\025\n\rclientVersion\030\002 \001(\005\"" +
+      "*\n\021ClearCacheRequest\022\025\n\rclientVersion\030\001 ",
+      "\001(\005\"*\n\022ClearCacheResponse\022\024\n\014unfreedByte" +
+      "s\030\001 \001(\003\"*\n\021GetVersionRequest\022\025\n\rclientVe" +
+      "rsion\030\001 \001(\005\"E\n\022GetVersionResponse\022\017\n\007ver" +
+      "sion\030\001 \002(\003\022\036\n\026systemCatalogTimestamp\030\002 \001" +
+      "(\003\"\205\001\n\032ClearTableFromCacheRequest\022\020\n\010ten" +
+      "antId\030\001 \002(\014\022\022\n\nschemaName\030\002 \002(\014\022\021\n\ttable" +
+      "Name\030\003 \002(\014\022\027\n\017clientTimestamp\030\004 \002(\003\022\025\n\rc" +
+      "lientVersion\030\005 \001(\005\"\035\n\033ClearTableFromCach" +
+      "eResponse*\365\005\n\014MutationCode\022\030\n\024TABLE_ALRE" +
+      "ADY_EXISTS\020\000\022\023\n\017TABLE_NOT_FOUND\020\001\022\024\n\020COL",
+      "UMN_NOT_FOUND\020\002\022\031\n\025COLUMN_ALREADY_EXISTS" +
+      "\020\003\022\035\n\031CONCURRENT_TABLE_MUTATION\020\004\022\027\n\023TAB" +
+      "LE_NOT_IN_REGION\020\005\022\025\n\021NEWER_TABLE_FOUND\020" +
+      "\006\022\034\n\030UNALLOWED_TABLE_MUTATION\020\007\022\021\n\rNO_PK" +
+      "_COLUMNS\020\010\022\032\n\026PARENT_TABLE_NOT_FOUND\020\t\022\033" +
+      "\n\027FUNCTION_ALREADY_EXISTS\020\n\022\026\n\022FUNCTION_" +
+      "NOT_FOUND\020\013\022\030\n\024NEWER_FUNCTION_FOUND\020\014\022\032\n" +
+      "\026FUNCTION_NOT_IN_REGION\020\r\022\031\n\025SCHEMA_ALRE" +
+      "ADY_EXISTS\020\016\022\026\n\022NEWER_SCHEMA_FOUND\020\017\022\024\n\020" +
+      "SCHEMA_NOT_FOUND\020\020\022\030\n\024SCHEMA_NOT_IN_REGI",
+      "ON\020\021\022\032\n\026TABLES_EXIST_ON_SCHEMA\020\022\022\035\n\031UNAL" +
+      "LOWED_SCHEMA_MUTATION\020\023\022%\n!AUTO_PARTITIO" +
+      "N_SEQUENCE_NOT_FOUND\020\024\022#\n\037CANNOT_COERCE_" +
+      "AUTO_PARTITION_ID\020\025\022\024\n\020TOO_MANY_INDEXES\020" +
+      "\026\022\037\n\033UNABLE_TO_CREATE_CHILD_LINK\020\027\022!\n\035UN" +
+      "ABLE_TO_UPDATE_PARENT_TABLE\020\030\022\037\n\033UNABLE_" +
+      "TO_DELETE_CHILD_LINK\020\031\022\031\n\025UNABLE_TO_UPSE" +
+      "RT_TASK\020\0322\345\006\n\017MetaDataService\022/\n\010getTabl" +
+      "e\022\020.GetTableRequest\032\021.MetaDataResponse\0227" +
+      "\n\014getFunctions\022\024.GetFunctionsRequest\032\021.M",
+      "etaDataResponse\0221\n\tgetSchema\022\021.GetSchema" +
+      "Request\032\021.MetaDataResponse\0225\n\013createTabl" +
+      "e\022\023.CreateTableRequest\032\021.MetaDataRespons" +
+      "e\022;\n\016createFunction\022\026.CreateFunctionRequ" +
+      "est\032\021.MetaDataResponse\0227\n\014createSchema\022\024" +
+      ".CreateSchemaRequest\032\021.MetaDataResponse\022" +
+      "1\n\tdropTable\022\021.DropTableRequest\032\021.MetaDa" +
+      "taResponse\0223\n\ndropSchema\022\022.DropSchemaReq" +
+      "uest\032\021.MetaDataResponse\0227\n\014dropFunction\022" +
+      "\024.DropFunctionRequest\032\021.MetaDataResponse",
+      "\0221\n\taddColumn\022\021.AddColumnRequest\032\021.MetaD" +
+      "ataResponse\0223\n\ndropColumn\022\022.DropColumnRe" +
+      "quest\032\021.MetaDataResponse\022?\n\020updateIndexS" +
+      "tate\022\030.UpdateIndexStateRequest\032\021.MetaDat" +
+      "aResponse\0225\n\nclearCache\022\022.ClearCacheRequ" +
+      "est\032\023.ClearCacheResponse\0225\n\ngetVersion\022\022" +
+      ".GetVersionRequest\032\023.GetVersionResponse\022" +
+      "P\n\023clearTableFromCache\022\033.ClearTableFromC" +
+      "acheRequest\032\034.ClearTableFromCacheRespons" +
+      "eBB\n(org.apache.phoenix.coprocessor.gene",
+      "ratedB\016MetaDataProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -18172,7 +18262,7 @@ public final class MetaDataProtos {
           internal_static_AddColumnRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_AddColumnRequest_descriptor,
-              new java.lang.String[] { "TableMetadataMutations", "ClientVersion", "ParentTable", });
+              new java.lang.String[] { "TableMetadataMutations", "ClientVersion", "ParentTable", "AddingColumns", });
           internal_static_DropColumnRequest_descriptor =
             getDescriptor().getMessageTypes().get(11);
           internal_static_DropColumnRequest_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 9a935e9..122d893 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3754,6 +3754,16 @@ public final class PTableProtos {
      * <code>optional bool viewModifiedPhoenixTTL = 44;</code>
      */
     boolean getViewModifiedPhoenixTTL();
+
+    // optional int64 lastDDLTimestamp = 45;
+    /**
+     * <code>optional int64 lastDDLTimestamp = 45;</code>
+     */
+    boolean hasLastDDLTimestamp();
+    /**
+     * <code>optional int64 lastDDLTimestamp = 45;</code>
+     */
+    long getLastDDLTimestamp();
   }
   /**
    * Protobuf type {@code PTable}
@@ -4040,6 +4050,11 @@ public final class PTableProtos {
               viewModifiedPhoenixTTL_ = input.readBool();
               break;
             }
+            case 360: {
+              bitField1_ |= 0x00000080;
+              lastDDLTimestamp_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4916,6 +4931,22 @@ public final class PTableProtos {
       return viewModifiedPhoenixTTL_;
     }
 
+    // optional int64 lastDDLTimestamp = 45;
+    public static final int LASTDDLTIMESTAMP_FIELD_NUMBER = 45;
+    private long lastDDLTimestamp_;
+    /**
+     * <code>optional int64 lastDDLTimestamp = 45;</code>
+     */
+    public boolean hasLastDDLTimestamp() {
+      return ((bitField1_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional int64 lastDDLTimestamp = 45;</code>
+     */
+    public long getLastDDLTimestamp() {
+      return lastDDLTimestamp_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4960,6 +4991,7 @@ public final class PTableProtos {
       phoenixTTL_ = 0L;
       phoenixTTLHighWaterMark_ = 0L;
       viewModifiedPhoenixTTL_ = false;
+      lastDDLTimestamp_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5156,6 +5188,9 @@ public final class PTableProtos {
       if (((bitField1_ & 0x00000040) == 0x00000040)) {
         output.writeBool(44, viewModifiedPhoenixTTL_);
       }
+      if (((bitField1_ & 0x00000080) == 0x00000080)) {
+        output.writeInt64(45, lastDDLTimestamp_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5342,6 +5377,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(44, viewModifiedPhoenixTTL_);
       }
+      if (((bitField1_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(45, lastDDLTimestamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5568,6 +5607,11 @@ public final class PTableProtos {
         result = result && (getViewModifiedPhoenixTTL()
             == other.getViewModifiedPhoenixTTL());
       }
+      result = result && (hasLastDDLTimestamp() == other.hasLastDDLTimestamp());
+      if (hasLastDDLTimestamp()) {
+        result = result && (getLastDDLTimestamp()
+            == other.getLastDDLTimestamp());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5753,6 +5797,10 @@ public final class PTableProtos {
         hash = (37 * hash) + VIEWMODIFIEDPHOENIXTTL_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getViewModifiedPhoenixTTL());
       }
+      if (hasLastDDLTimestamp()) {
+        hash = (37 * hash) + LASTDDLTIMESTAMP_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getLastDDLTimestamp());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5963,6 +6011,8 @@ public final class PTableProtos {
         bitField1_ = (bitField1_ & ~0x00000200);
         viewModifiedPhoenixTTL_ = false;
         bitField1_ = (bitField1_ & ~0x00000400);
+        lastDDLTimestamp_ = 0L;
+        bitField1_ = (bitField1_ & ~0x00000800);
         return this;
       }
 
@@ -6181,6 +6231,10 @@ public final class PTableProtos {
           to_bitField1_ |= 0x00000040;
         }
         result.viewModifiedPhoenixTTL_ = viewModifiedPhoenixTTL_;
+        if (((from_bitField1_ & 0x00000800) == 0x00000800)) {
+          to_bitField1_ |= 0x00000080;
+        }
+        result.lastDDLTimestamp_ = lastDDLTimestamp_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         onBuilt();
@@ -6407,6 +6461,9 @@ public final class PTableProtos {
         if (other.hasViewModifiedPhoenixTTL()) {
           setViewModifiedPhoenixTTL(other.getViewModifiedPhoenixTTL());
         }
+        if (other.hasLastDDLTimestamp()) {
+          setLastDDLTimestamp(other.getLastDDLTimestamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8717,6 +8774,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int64 lastDDLTimestamp = 45;
+      private long lastDDLTimestamp_ ;
+      /**
+       * <code>optional int64 lastDDLTimestamp = 45;</code>
+       */
+      public boolean hasLastDDLTimestamp() {
+        return ((bitField1_ & 0x00000800) == 0x00000800);
+      }
+      /**
+       * <code>optional int64 lastDDLTimestamp = 45;</code>
+       */
+      public long getLastDDLTimestamp() {
+        return lastDDLTimestamp_;
+      }
+      /**
+       * <code>optional int64 lastDDLTimestamp = 45;</code>
+       */
+      public Builder setLastDDLTimestamp(long value) {
+        bitField1_ |= 0x00000800;
+        lastDDLTimestamp_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 lastDDLTimestamp = 45;</code>
+       */
+      public Builder clearLastDDLTimestamp() {
+        bitField1_ = (bitField1_ & ~0x00000800);
+        lastDDLTimestamp_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -9386,7 +9476,7 @@ public final class PTableProtos {
       "es\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n" +
       "\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030" +
       "\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts" +
-      "\"\370\010\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
+      "\"\222\t\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
       "tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162" +
       "\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequ" +
       "enceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013p" +
@@ -9414,12 +9504,13 @@ public final class PTableProtos {
       "requency\030( \001(\010\022.\n&viewModifiedUseStatsFo",
       "rParallelization\030) \001(\010\022\022\n\nphoenixTTL\030* \001" +
       "(\003\022\037\n\027phoenixTTLHighWaterMark\030+ \001(\003\022\036\n\026v" +
-      "iewModifiedPhoenixTTL\030, \001(\010\"6\n\020EncodedCQ" +
-      "Counter\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 " +
-      "\002(\005*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022" +
-      "\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.ap" +
-      "ache.phoenix.coprocessor.generatedB\014PTab" +
-      "leProtosH\001\210\001\001\240\001\001"
+      "iewModifiedPhoenixTTL\030, \001(\010\022\030\n\020lastDDLTi" +
+      "mestamp\030- \001(\003\"6\n\020EncodedCQCounter\022\021\n\tcol" +
+      "Family\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n\nPTableT" +
+      "ype\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005I" +
+      "NDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix.c" +
+      "oprocessor.generatedB\014PTableProtosH\001\210\001\001\240" +
+      "\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9443,7 +9534,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index cac4798..48a367c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -378,6 +378,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String PHOENIX_TTL_HWM = "PHOENIX_TTL_HWM";
     public static final byte[] PHOENIX_TTL_HWM_BYTES = Bytes.toBytes(PHOENIX_TTL_HWM);
 
+    public static final String LAST_DDL_TIMESTAMP = "LAST_DDL_TIMESTAMP";
+    public static final byte[] LAST_DDL_TIMESTAMP_BYTES = Bytes.toBytes(LAST_DDL_TIMESTAMP);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
@@ -1043,7 +1046,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                 "\nfrom " + SYSTEM_CATALOG +
                 "\nwhere ");
         buf.append(TABLE_SCHEM + (schema == null || schema.length() == 0 ? " is null" : " = ?" ));
-        if(schema != null && schema.length() > 0) {
+        if (schema != null && schema.length() > 0) {
             parameterValues.add(schema);
         }
         buf.append("\nand " + DATA_TABLE_NAME + " = ?" );
@@ -1360,7 +1363,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
         addTenantIdFilter(buf, catalog, parameterValues);
         if (schemaPattern != null) {
             buf.append(" and " + TABLE_SCHEM + (schemaPattern.length() == 0 ? " is null" : " like ?" ));
-            if(schemaPattern.length() > 0) {
+            if (schemaPattern.length() > 0) {
                 parameterValues.add(schemaPattern);
             }
         }
@@ -1493,7 +1496,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                     TRANSACTION_PROVIDER + " IS NOT NULL AS " + TRANSACTIONAL + "," +
                     IS_NAMESPACE_MAPPED + "," +
                     GUIDE_POSTS_WIDTH + "," +
-                    TransactionProviderNameFunction.NAME + "(" + TRANSACTION_PROVIDER + ") AS TRANSACTION_PROVIDER" +
+                    TransactionProviderNameFunction.NAME + "(" + TRANSACTION_PROVIDER + ") AS " +
+                        "TRANSACTION_PROVIDER" +
                     " from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS +
                     " where " + COLUMN_NAME + " is null" +
                     " and " + COLUMN_FAMILY + " is null" +
@@ -1501,7 +1505,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             addTenantIdFilter(buf, catalog, parameterValues);
             if (schemaPattern != null) {
                 buf.append(" and " + TABLE_SCHEM + (schemaPattern.length() == 0 ? " is null" : " like ?" ));
-                if(schemaPattern.length() > 0) {
+                if (schemaPattern.length() > 0) {
                     parameterValues.add(schemaPattern);
                 }
             }
@@ -1538,7 +1542,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                     "CAST(null AS BOOLEAN) " + TRANSACTIONAL + "," +
                     "CAST(null AS BOOLEAN) " + IS_NAMESPACE_MAPPED + "," +
                     "CAST(null AS BIGINT) " + GUIDE_POSTS_WIDTH + "," +
-                    "CAST(null AS VARCHAR) " + TRANSACTION_PROVIDER + "\n");
+                    "CAST(null AS VARCHAR) " + TRANSACTION_PROVIDER + "\n"
+            );
             buf.append(
                     " from " + SYSTEM_SEQUENCE + "\n");
             StringBuilder whereClause = new StringBuilder();
@@ -1546,7 +1551,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             if (schemaPattern != null) {
                 appendConjunction(whereClause);
                 whereClause.append(SEQUENCE_SCHEMA + (schemaPattern.length() == 0 ? " is null" : " like ?\n" ));
-                if(schemaPattern.length() > 0) {
+                if (schemaPattern.length() > 0) {
                     parameterValues.add(schemaPattern);
                 }
             }
@@ -2086,7 +2091,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
 
     private void setParameters(PreparedStatement stmt, List<String> parameterValues)
             throws SQLException {
-        for(int i = 0; i < parameterValues.size(); i++) {
+        for (int i = 0; i < parameterValues.size(); i++) {
             stmt.setString(i+1, parameterValues.get(i));
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7d93abd..3a6268e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -515,7 +515,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         try {
             return htable.getTableDescriptor();
         } catch (IOException e) {
-            if(e instanceof org.apache.hadoop.hbase.TableNotFoundException
+            if (e instanceof org.apache.hadoop.hbase.TableNotFoundException
                     || e.getCause() instanceof org.apache.hadoop.hbase.TableNotFoundException) {
                 byte[][] schemaAndTableName = new byte[2][];
                 SchemaUtil.getVarChars(tableName, schemaAndTableName);
@@ -1066,8 +1066,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
 
             Set<byte[]> familiesKeys = descriptor.getFamiliesKeys();
-            for(byte[] family: familiesKeys) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+            for (byte[] family: familiesKeys) {
+                if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
                     if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) {
                         descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
                                 null, priority, null);
@@ -1082,7 +1082,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
                     descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null);
                 }
-                if(SchemaUtil.isMetaTable(tableName) ) {
+                if (SchemaUtil.isMetaTable(tableName) ) {
                     if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
                         descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null);
                     }
@@ -1092,7 +1092,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null);
                 }
             } else if (SchemaUtil.isTaskTable(tableName)) {
-                if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) {
+                if (!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) {
                     descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null);
                 }
                 if (!descriptor.hasCoprocessor(
@@ -1298,7 +1298,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             LOGGER.debug("Found quorum: " + quorum + ":" + znode);
 
             if (isMetaTable) {
-                if(SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) {
+                if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.getProps())) {
                     try {
                         // SYSTEM namespace needs to be created via HBase APIs because "CREATE SCHEMA" statement tries to write
                         // its metadata in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created
@@ -1419,7 +1419,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (isMetaTable && !isUpgradeRequired()) {
                     checkClientServerCompatibility(SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
                 } else {
-                    for(Pair<byte[],Map<String,Object>> family: families) {
+                    for (Pair<byte[],Map<String,Object>> family: families) {
                         if ((newDesc.getValue(HTableDescriptor.SPLIT_POLICY)==null || !newDesc.getValue(HTableDescriptor.SPLIT_POLICY).equals(
                                 IndexRegionSplitPolicy.class.getName()))
                                 && Bytes.toString(family.getFirst()).startsWith(
@@ -1833,12 +1833,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
                 if (dropMetadata) {
                     List<String> columnFamiles = new ArrayList<String>();
-                    for(HColumnDescriptor cf : desc.getColumnFamilies()) {
-                        if(cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                    for (HColumnDescriptor cf : desc.getColumnFamilies()) {
+                        if (cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
                             columnFamiles.add(cf.getNameAsString());
                         }
                     }
-                    for(String cf: columnFamiles) {
+                    for (String cf: columnFamiles) {
                         admin.deleteColumn(physicalTableName, cf);
                     }
                     clearTableRegionCache(physicalTableName);
@@ -1871,8 +1871,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         byte[] physicalTableNameBytes = physicalTableName != null ? physicalTableName :
             SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes, isNamespaceMapped).getBytes();
         boolean localIndexTable = false;
-        for(Pair<byte[], Map<String, Object>> family: families) {
-            if(Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+        for (Pair<byte[], Map<String, Object>> family: families) {
+            if (Bytes.toString(family.getFirst()).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
                 localIndexTable = true;
                 break;
             }
@@ -1962,7 +1962,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 CreateTableRequest build = builder.build();
                 instance.createTable(controller, build, rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -1990,7 +1990,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setClientTimestamp(clientTimestamp);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.getTable(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -2023,7 +2023,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setCascade(cascade);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.dropTable(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -2109,7 +2109,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setIfExists(ifExists);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.dropFunction(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -2300,6 +2300,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
 
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            final boolean addingColumns = columns != null && columns.size() > 0;
             result =  metaDataCoprocessorExec(tableKey,
                     new Batch.Call<MetaDataService, MetaDataResponse>() {
                 @Override
@@ -2315,8 +2316,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                     if (parentTable!=null)
                         builder.setParentTable(PTableImpl.toProto(parentTable));
+                    builder.setAddingColumns(addingColumns);
                     instance.addColumn(controller, builder.build(), rpcCallback);
-                    if(controller.getFailedOn() != null) {
+                    if (controller.getFailedOn() != null) {
                         throw controller.getFailedOn();
                     }
                     return rpcCallback.get();
@@ -3011,7 +3013,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (parentTable!=null)
                     builder.setParentTable(PTableImpl.toProto(parentTable));
                 instance.dropColumn(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -3737,15 +3739,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             metaConnection = addColumnsIfNotExists(
                 metaConnection,
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 1,
+                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 2,
                 PhoenixDatabaseMetaData.PHOENIX_TTL + " "
                         + PInteger.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(
                 metaConnection,
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0,
+                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 1,
                 PhoenixDatabaseMetaData.PHOENIX_TTL_HWM + " "
                         + PInteger.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0,
+                PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP + " "
+                    + PLong.INSTANCE.getSqlTypeName());
+            UpgradeUtil.bootstrapLastDDLTimestamp(metaConnection);
 
             boolean isNamespaceMapping =
                     SchemaUtil.isNamespaceMappingEnabled(null,  getConfiguration());
@@ -4568,7 +4575,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     /**
      * Set IMMUTABLE_ROWS to true for all index tables over immutable tables.
-     * @param metaConnection connection over which to run the upgrade
+     * @param oldMetaConnection connection over which to run the upgrade
+     * @param timestamp SCN at which to run the update
      * @throws SQLException
      */
     private PhoenixConnection setImmutableTableIndexesImmutable(PhoenixConnection oldMetaConnection, long timestamp) throws SQLException {
@@ -4830,7 +4838,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.updateIndexState(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -4840,7 +4848,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     @Override
     public MetaDataMutationResult updateIndexState(final List<Mutation> tableMetaData, String parentTableName, Map<String, List<Pair<String,Object>>> stmtProperties,  PTable table) throws SQLException {
-        if(stmtProperties == null) {
+        if (stmtProperties == null) {
             return updateIndexState(tableMetaData,parentTableName);
         }
         Map<HTableDescriptor, HTableDescriptor> oldToNewTableDescriptors =
@@ -4949,7 +4957,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * Increment any of the set of sequences that need more values. These are the sequences
      * that are asking for the next value within a given statement. The returned sequences
      * are the ones that were not found because they were deleted by another client.
-     * @param sequenceKeys sorted list of sequence kyes
+     * @param sequenceAllocations sorted list of sequence kyes
      * @param timestamp
      * @throws SQLException if any of the sequences cannot be found
      *
@@ -5210,12 +5218,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                  * If we are throttling connections internal connections and client created connections
                  *   are counted separately against each respective quota.
                  */
-                if(shouldThrottleNumConnections) {
+                if (shouldThrottleNumConnections) {
                     int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount);
                     int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed;
-                    if(allowedConnections != 0 && futureConnections > allowedConnections) {
+                    if (allowedConnections != 0 && futureConnections > allowedConnections) {
                         GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment();
-                        if(connection.isInternalConnection()) {
+                        if (connection.isInternalConnection()) {
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED).
                                     build().buildException();
                         }
@@ -5224,7 +5232,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                 }
 
-                if(!connection.isInternalConnection()) {
+                if (!connection.isInternalConnection()) {
                     connectionCount++;
                 } else {
                     internalConnectionCount++;
@@ -5243,7 +5251,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (returnSequenceValues) {
             ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null;
             synchronized (connectionCountLock) {
-                if(!connection.isInternalConnection()) {
+                if (!connection.isInternalConnection()) {
                     if (connectionCount + internalConnectionCount - 1 <= 0) {
                         if (!this.sequenceMap.isEmpty()) {
                             formerSequenceMap = this.sequenceMap;
@@ -5261,7 +5269,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count
             synchronized (connectionCountLock) {
-                if(connection.isInternalConnection() && internalConnectionCount > 0) {
+                if (connection.isInternalConnection() && internalConnectionCount > 0) {
                     --internalConnectionCount;
                 } else if (connectionCount > 0) {
                     --connectionCount;
@@ -5368,14 +5376,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         new BlockingRpcCallback<MetaDataResponse>();
                 GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder();
                 builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
-                for(Pair<byte[], Long> function: functions) {
+                for (Pair<byte[], Long> function: functions) {
                     builder.addFunctionNames(ByteStringer.wrap(function.getFirst()));
                     builder.addFunctionTimestamps(function.getSecond().longValue());
                 }
                 builder.setClientTimestamp(clientTimestamp);
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.getFunctions(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -5432,7 +5440,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 builder.setReplace(function.isReplace());
                 builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
                 instance.createFunction(controller, builder.build(), rpcCallback);
-                if(controller.getFailedOn() != null) {
+                if (controller.getFailedOn() != null) {
                     throw controller.getFailedOn();
                 }
                 return rpcCallback.get();
@@ -5701,8 +5709,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * Manually adds {@link GuidePostsInfo} for a table to the client-side cache. Not a
      * {@link ConnectionQueryServices} method. Exposed for testing purposes.
      *
-     * @param tableName Table name
-     * @param stats Stats instance
+     * @param key Table name
+     * @param info Stats instance
      */
     public void addTableStats(GuidePostsKey key, GuidePostsInfo info) {
         this.tableStatsCache.put(Objects.requireNonNull(key), Objects.requireNonNull(info));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 4f50337..d4e0fa2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -87,6 +87,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
@@ -308,6 +309,7 @@ public interface QueryConstants {
             VIEW_INDEX_ID_DATA_TYPE + " INTEGER,\n" +
             PHOENIX_TTL + " BIGINT,\n" +
             PHOENIX_TTL_HWM + " BIGINT,\n" +
+            LAST_DDL_TIMESTAMP + " BIGINT, " +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 13e7bf1..2fde0fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -350,6 +350,11 @@ public class DelegateTable implements PTable {
         return delegate.hasViewModifiedPhoenixTTL();
     }
 
+    @Override
+    public Long getLastDDLTimestamp() {
+        return delegate.getLastDDLTimestamp();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 68b0891..e920049 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -837,7 +837,7 @@ public class MetaDataClient {
 
         do {
             List<Pair<byte[], Long>> functionsToFecth = new ArrayList<Pair<byte[], Long>>(functionNames.size());
-            for(int i = 0; i< functionNames.size(); i++) {
+            for (int i = 0; i< functionNames.size(); i++) {
                 functionsToFecth.add(new Pair<byte[], Long>(PVarchar.INSTANCE.toBytes(functionNames.get(i)), functionTimeStamps.get(i)));
             }
             result = connection.getQueryServices().getFunctions(tenantId, functionsToFecth, clientTimeStamp);
@@ -1757,7 +1757,7 @@ public class MetaDataClient {
                 QueryServices.INDEX_ASYNC_BUILD_ENABLED,
                 QueryServicesOptions.DEFAULT_INDEX_ASYNC_BUILD_ENABLED);
         // In async process, we return immediately as the MR job needs to be triggered .
-        if(statement.isAsync() && asyncIndexBuildEnabled) {
+        if (statement.isAsync() && asyncIndexBuildEnabled) {
             return new MutationState(0, 0, connection);
         }
 
@@ -1876,7 +1876,7 @@ public class MetaDataClient {
                 List<PFunction> functions = new ArrayList<PFunction>(1);
                 functions.add(function);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), functions, true);
-                if(function.isReplace()) {
+                if (function.isReplace()) {
                     connection.removeFunction(function.getTenantId(), function.getFunctionName(),
                             result.getMutationTime());
                 }
@@ -3046,7 +3046,7 @@ public class MetaDataClient {
             if (code != MutationCode.TABLE_NOT_FOUND) {
                 boolean tableAlreadyExists = handleCreateTableMutationCode(result, code, statement,
                         schemaName, tableName, parent);
-                if(tableAlreadyExists) {
+                if (tableAlreadyExists) {
                     return null;
                 }
             }
@@ -3088,60 +3088,62 @@ public class MetaDataClient {
              */
             EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER : cqCounter;
             PTable table = new PTableImpl.Builder()
-                    .setType(tableType)
-                    .setState(indexState)
-                    .setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
-                    .setIndexDisableTimestamp(0L)
-                    .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
-                    .setImmutableRows(isImmutableRows)
-                    .setViewStatement(viewStatement)
-                    .setDisableWAL(Boolean.TRUE.equals(disableWAL))
-                    .setMultiTenant(multiTenant)
-                    .setStoreNulls(storeNulls)
-                    .setViewType(viewType)
-                    .setViewIndexIdType(viewIndexIdType)
-                    .setViewIndexId(result.getViewIndexId())
-                    .setIndexType(indexType)
-                    .setTransactionProvider(transactionProvider)
-                    .setUpdateCacheFrequency(updateCacheFrequency)
-                    .setNamespaceMapped(isNamespaceMapped)
-                    .setAutoPartitionSeqName(autoPartitionSeq)
-                    .setAppendOnlySchema(isAppendOnlySchema)
-                    .setImmutableStorageScheme(immutableStorageScheme == null ?
-                            ImmutableStorageScheme.ONE_CELL_PER_COLUMN : immutableStorageScheme)
-                    .setQualifierEncodingScheme(encodingScheme == null ?
-                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
-                    .setBaseColumnCount(baseTableColumnCount)
-                    .setEncodedCQCounter(cqCounterToBe)
-                    .setUseStatsForParallelization(useStatsForParallelizationProp)
-                    .setExcludedColumns(ImmutableList.<PColumn>of())
-                    .setTenantId(tenantId)
-                    .setSchemaName(newSchemaName)
-                    .setTableName(PNameFactory.newName(tableName))
-                    .setPkName(pkName == null ? null : PNameFactory.newName(pkName))
-                    .setDefaultFamilyName(defaultFamilyName == null ?
-                            null : PNameFactory.newName(defaultFamilyName))
-                    .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
-                    .setBucketNum(saltBucketNum)
-                    .setIndexes(Collections.<PTable>emptyList())
-                    .setParentSchemaName((parent == null) ? null : parent.getSchemaName())
-                    .setParentTableName((parent == null) ? null : parent.getTableName())
-                    .setPhysicalNames(physicalNames == null ?
-                            ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames))
-                    .setColumns(columns.values())
-                    .setPhoenixTTL(phoenixTTL == null ? PHOENIX_TTL_NOT_DEFINED : phoenixTTL)
-                    .setPhoenixTTLHighWaterMark(phoenixTTLHighWaterMark == null ? MIN_PHOENIX_TTL_HWM : phoenixTTLHighWaterMark)
-                    .setViewModifiedUpdateCacheFrequency(tableType ==  PTableType.VIEW &&
-                            parent != null &&
-                            parent.getUpdateCacheFrequency() != updateCacheFrequency)
-                    .setViewModifiedUseStatsForParallelization(tableType ==  PTableType.VIEW &&
-                            parent != null &&
-                            parent.useStatsForParallelization()
-                                    != useStatsForParallelizationProp)
-                    .setViewModifiedPhoenixTTL(tableType ==  PTableType.VIEW &&
-                            parent != null && phoenixTTL != null &&
-                            parent.getPhoenixTTL() != phoenixTTL)
-                    .build();
+                .setType(tableType)
+                .setState(indexState)
+                .setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
+                .setIndexDisableTimestamp(0L)
+                .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
+                .setImmutableRows(isImmutableRows)
+                .setViewStatement(viewStatement)
+                .setDisableWAL(Boolean.TRUE.equals(disableWAL))
+                .setMultiTenant(multiTenant)
+                .setStoreNulls(storeNulls)
+                .setViewType(viewType)
+                .setViewIndexIdType(viewIndexIdType)
+                .setViewIndexId(result.getViewIndexId())
+                .setIndexType(indexType)
+                .setTransactionProvider(transactionProvider)
+                .setUpdateCacheFrequency(updateCacheFrequency)
+                .setNamespaceMapped(isNamespaceMapped)
+                .setAutoPartitionSeqName(autoPartitionSeq)
+                .setAppendOnlySchema(isAppendOnlySchema)
+                .setImmutableStorageScheme(immutableStorageScheme == null ?
+                    ImmutableStorageScheme.ONE_CELL_PER_COLUMN : immutableStorageScheme)
+                .setQualifierEncodingScheme(encodingScheme == null ?
+                    QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
+                .setBaseColumnCount(baseTableColumnCount)
+                .setEncodedCQCounter(cqCounterToBe)
+                .setUseStatsForParallelization(useStatsForParallelizationProp)
+                .setExcludedColumns(ImmutableList.<PColumn>of())
+                .setTenantId(tenantId)
+                .setSchemaName(newSchemaName)
+                .setTableName(PNameFactory.newName(tableName))
+                .setPkName(pkName == null ? null : PNameFactory.newName(pkName))
+                .setDefaultFamilyName(defaultFamilyName == null ?
+                    null : PNameFactory.newName(defaultFamilyName))
+                .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
+                .setBucketNum(saltBucketNum)
+                .setIndexes(Collections.<PTable>emptyList())
+                .setParentSchemaName((parent == null) ? null : parent.getSchemaName())
+                .setParentTableName((parent == null) ? null : parent.getTableName())
+                .setPhysicalNames(physicalNames == null ?
+                    ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames))
+                .setColumns(columns.values())
+                .setPhoenixTTL(phoenixTTL == null ? PHOENIX_TTL_NOT_DEFINED : phoenixTTL)
+                .setPhoenixTTLHighWaterMark(phoenixTTLHighWaterMark == null ? MIN_PHOENIX_TTL_HWM : phoenixTTLHighWaterMark)
+                .setViewModifiedUpdateCacheFrequency(tableType == PTableType.VIEW &&
+                    parent != null &&
+                    parent.getUpdateCacheFrequency() != updateCacheFrequency)
+                .setViewModifiedUseStatsForParallelization(tableType == PTableType.VIEW &&
+                    parent != null &&
+                    parent.useStatsForParallelization()
+                        != useStatsForParallelizationProp)
+                .setViewModifiedPhoenixTTL(tableType == PTableType.VIEW &&
+                    parent != null && phoenixTTL != null &&
+                    parent.getPhoenixTTL() != phoenixTTL)
+                .setLastDDLTimestamp(result.getTable() != null ?
+                    result.getTable().getLastDDLTimestamp() : null)
+                .build();
             result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
             addTableToCache(result);
             return table;
@@ -3167,10 +3169,10 @@ public class MetaDataClient {
                  PTable parent) throws SQLException {
         switch(code) {
             case TABLE_ALREADY_EXISTS:
-                if(result.getTable() != null) {
+                if (result.getTable() != null) {
                     addTableToCache(result);
                 }
-                if(!statement.ifNotExists()) {
+                if (!statement.ifNotExists()) {
                     throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
                 }
                 return true;
@@ -3684,7 +3686,7 @@ public class MetaDataClient {
         // if cascade keyword is passed and indexes are provided either implicitly or explicitly
         if (cascade && (indexes == null || !indexes.isEmpty())) {
             indexesPTable = getIndexesPTableForCascade(indexes, table);
-            for(PTable index : indexesPTable) {
+            for (PTable index : indexesPTable) {
                 indexToColumnSizeMap.put(index, index.getColumns().size());
             }
         }
@@ -3800,9 +3802,9 @@ public class MetaDataClient {
                     String addColumnSqlToUse = INSERT_COLUMN_CREATE_TABLE;
                     try (PreparedStatement colUpsert = connection.prepareStatement(addColumnSqlToUse)) {
                         short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
-                        for( ColumnDef colDef : columnDefs) {
+                        for ( ColumnDef colDef : columnDefs) {
                             if (colDef != null && !colDef.isNull()) {
-                                if(colDef.isPK()) {
+                                if (colDef.isPK()) {
                                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY)
                                     .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                                 } else if (!willBeImmutableRows) {
@@ -4137,7 +4139,7 @@ public class MetaDataClient {
 
         // when indexes is null, that means ALL keyword is passed and
         // we ll collect all global indexes for cascading
-        if(indexes == null) {
+        if (indexes == null) {
             indexesPTable.addAll(table.getIndexes());
             for (PTable index : table.getIndexes()) {
                 // a child view has access to its parents indexes,
@@ -4157,7 +4159,7 @@ public class MetaDataClient {
             // if all the names in parameter list are correct, indexesParam list should be empty
             // by end of the loop
             for (PTable index : table.getIndexes()) {
-                if(index.getIndexType().equals(IndexType.LOCAL)) {
+                if (index.getIndexType().equals(IndexType.LOCAL)) {
                     throw new SQLExceptionInfo
                             .Builder(SQLExceptionCode.NOT_SUPPORTED_CASCADE_FEATURE_LOCAL_INDEX)
                             .setTableName(index.getName().getString())
@@ -4231,7 +4233,7 @@ public class MetaDataClient {
         buf.append("(" +
                 TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," +
                 COLUMN_NAME + ", " + COLUMN_FAMILY + ") IN (");
-        for(PColumn columnToDrop : columnsToDrop) {
+        for (PColumn columnToDrop : columnsToDrop) {
             buf.append("('" + tenantId + "'");
             buf.append(",'" + schemaName + "'");
             buf.append(",'" + tableName + "'");
@@ -4257,7 +4259,7 @@ public class MetaDataClient {
         colUpdate.setString(3, tableName);
         for (int i = columnsToDrop.get(columnsToDropIndex).getPosition() + 1; i < table.getColumns().size(); i++) {
             PColumn column = table.getColumns().get(i);
-            if(columnsToDrop.contains(column)) {
+            if (columnsToDrop.contains(column)) {
                 columnsToDropIndex++;
                 continue;
             }
@@ -4327,7 +4329,7 @@ public class MetaDataClient {
                 physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName.getString());
 
                 List<ColumnName> columnRefs = statement.getColumnRefs();
-                if(columnRefs == null) {
+                if (columnRefs == null) {
                     columnRefs = Lists.newArrayListWithCapacity(0);
                 }
                 List<ColumnRef> columnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size() + table.getIndexes().size());
@@ -4335,7 +4337,7 @@ public class MetaDataClient {
                 List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + table.getColumns().size() - columnRefs.size()));
                 List<PColumn>  tableColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size());
 
-                for(ColumnName column : columnRefs) {
+                for (ColumnName column : columnRefs) {
                     ColumnRef columnRef = null;
                     try {
                         columnRef = resolver.resolveColumn(null, column.getFamilyName(), column.getColumnName());
@@ -4377,7 +4379,7 @@ public class MetaDataClient {
                     List<PColumn> indexColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size());
                     Set<Pair<String, String>> indexedColsInfo = indexMaintainer.getIndexedColumnInfo();
                     Set<ColumnReference> coveredCols = indexMaintainer.getCoveredColumns();
-                    for(PColumn columnToDrop : tableColumnsToDrop) {
+                    for (PColumn columnToDrop : tableColumnsToDrop) {
                         Pair<String, String> columnToDropInfo = new Pair<>(columnToDrop.getFamilyName().getString(), columnToDrop.getName().getString());
                         ColumnReference colDropRef = new ColumnReference(columnToDrop.getFamilyName() == null ? null
                                 : columnToDrop.getFamilyName().getBytes(), columnToDrop.getColumnQualifierBytes());
@@ -4397,7 +4399,7 @@ public class MetaDataClient {
                             removedIndexTableOrColumn = true;
                         }
                     }
-                    if(!indexColumnsToDrop.isEmpty()) {
+                    if (!indexColumnsToDrop.isEmpty()) {
                         long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null, null);
                         dropColumnMutations(index, indexColumnsToDrop);
                         long clientTimestamp = MutationState.getTableTimestamp(timeStamp, connection.getSCN());
@@ -4463,10 +4465,9 @@ public class MetaDataClient {
                     // client-side cache as it would be too painful. Just let it pull it over from
                     // the server when needed.
                     if (tableColumnsToDrop.size() > 0) {
-                        if (removedIndexTableOrColumn)
-                            connection.removeTable(tenantId, tableName, table.getParentName() == null ? null : table.getParentName().getString(), table.getTimeStamp());
-                        else
-                            connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result));
+                        //need to remove the cached table because the DDL timestamp changed. We
+                        // also need to remove it if we dropped an indexed column
+                        connection.removeTable(tenantId, tableName, table.getParentName() == null ? null : table.getParentName().getString(), table.getTimeStamp());
                     }
                     // If we have a VIEW, then only delete the metadata, and leave the table data alone
                     if (table.getType() != PTableType.VIEW) {
@@ -4636,7 +4637,7 @@ public class MetaDataClient {
             TableRef indexRef = FromCompiler.getResolver(statement, connection).getTables().get(0);
             PreparedStatement tableUpsert = null;
             try {
-                if(newIndexState == PIndexState.ACTIVE){
+                if (newIndexState == PIndexState.ACTIVE){
                     tableUpsert = connection.prepareStatement(UPDATE_INDEX_STATE_TO_ACTIVE);
                 }else{
                     tableUpsert = connection.prepareStatement(UPDATE_INDEX_STATE);
@@ -4646,12 +4647,12 @@ public class MetaDataClient {
                 tableUpsert.setString(3, indexName);
                 tableUpsert.setString(4, newIndexState.getSerializedValue());
                 tableUpsert.setLong(5, 0);
-                if(newIndexState == PIndexState.ACTIVE){
+                if (newIndexState == PIndexState.ACTIVE){
                     tableUpsert.setLong(6, 0);
                 }
                 tableUpsert.execute();
             } finally {
-                if(tableUpsert != null) {
+                if (tableUpsert != null) {
                     tableUpsert.close();
                 }
             }
@@ -4804,7 +4805,7 @@ public class MetaDataClient {
     }
 
     private void addFunctionToCache(MetaDataMutationResult result) throws SQLException {
-        for(PFunction function: result.getFunctions()) {
+        for (PFunction function: result.getFunctions()) {
             connection.addFunction(function);
         }
     }
@@ -4950,7 +4951,7 @@ public class MetaDataClient {
 
             if (changePermsStatement.getSchemaName() != null) {
                 // SYSTEM.CATALOG doesn't have any entry for "default" HBase namespace, hence we will bypass the check
-                if(!changePermsStatement.getSchemaName().equals(SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE)) {
+                if (!changePermsStatement.getSchemaName().equals(SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE)) {
                     FromCompiler.getResolverForSchema(changePermsStatement.getSchemaName(), connection);
                 }
 
@@ -4984,7 +4985,7 @@ public class MetaDataClient {
     }
 
     private void changePermsOnSchema(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement) throws Throwable {
-        if(changePermsStatement.isGrantStatement()) {
+        if (changePermsStatement.isGrantStatement()) {
             AccessControlClient.grant(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), changePermsStatement.getPermsList());
         } else {
             AccessControlClient.revoke(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), Permission.Action.values());
@@ -5001,14 +5002,14 @@ public class MetaDataClient {
         boolean schemaInconsistency = false;
         List<PTable> inconsistentTables = null;
 
-        for(PTable indexTable : inputTable.getIndexes()) {
+        for (PTable indexTable : inputTable.getIndexes()) {
             // Local Indexes don't correspond to new physical table, they are just stored in separate CF of base table.
-            if(indexTable.getIndexType().equals(IndexType.LOCAL)) {
+            if (indexTable.getIndexType().equals(IndexType.LOCAL)) {
                 continue;
             }
             if (inputTable.isNamespaceMapped() != indexTable.isNamespaceMapped()) {
                 schemaInconsistency = true;
-                if(inconsistentTables == null) {
+                if (inconsistentTables == null) {
                     inconsistentTables = new ArrayList<>();
                 }
                 inconsistentTables.add(indexTable);
@@ -5020,8 +5021,8 @@ public class MetaDataClient {
             changePermsOnTable(clusterConnection, changePermsStatement, tableName);
         }
 
-        if(schemaInconsistency) {
-            for(PTable table : inconsistentTables) {
+        if (schemaInconsistency) {
+            for (PTable table : inconsistentTables) {
                 LOGGER.error("Fail to propagate permissions to Index Table: " + table.getName());
             }
             throw new TablesNotInSyncException(inputTable.getTableName().getString(),
@@ -5032,12 +5033,12 @@ public class MetaDataClient {
         byte[] viewIndexTableBytes = MetaDataUtil.getViewIndexPhysicalName(inputTable.getPhysicalName().getBytes());
         tableName = org.apache.hadoop.hbase.TableName.valueOf(viewIndexTableBytes);
         boolean viewIndexTableExists = admin.tableExists(tableName);
-        if(viewIndexTableExists) {
+        if (viewIndexTableExists) {
             LOGGER.info("Updating permissions for View Index Table: " +
                     Bytes.toString(viewIndexTableBytes) + " Base Table: " + inputTable.getName());
             changePermsOnTable(clusterConnection, changePermsStatement, tableName);
         } else {
-            if(inputTable.isMultiTenant()) {
+            if (inputTable.isMultiTenant()) {
                 LOGGER.error("View Index Table not found for MultiTenant Table: " + inputTable.getName());
                 LOGGER.error("Fail to propagate permissions to view Index Table: " + tableName.getNameAsString());
                 throw new TablesNotInSyncException(inputTable.getTableName().getString(),
@@ -5048,7 +5049,7 @@ public class MetaDataClient {
 
     private void changePermsOnTable(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement, org.apache.hadoop.hbase.TableName tableName)
             throws Throwable {
-        if(changePermsStatement.isGrantStatement()) {
+        if (changePermsStatement.isGrantStatement()) {
             AccessControlClient.grant(clusterConnection, tableName, changePermsStatement.getName(),
                     null, null, changePermsStatement.getPermsList());
         } else {
@@ -5059,7 +5060,7 @@ public class MetaDataClient {
 
     private void changePermsOnUser(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement)
             throws Throwable {
-        if(changePermsStatement.isGrantStatement()) {
+        if (changePermsStatement.isGrantStatement()) {
             AccessControlClient.grant(clusterConnection, changePermsStatement.getName(), changePermsStatement.getPermsList());
         } else {
             AccessControlClient.revoke(clusterConnection, changePermsStatement.getName(), Permission.Action.values());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index d15fe2c..c8b3e36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -638,7 +638,8 @@ public interface PTable extends PMetaDataEntity {
     
     /**
      * Get the column with the given column qualifier.
-     * @param column qualifier bytes
+     * @param cf column family bytes
+     * @param cq qualifier bytes
      * @return the PColumn with the given column qualifier
      * @throws ColumnNotFoundException if no column with the given column qualifier can be found
      * @throws AmbiguousColumnException if multiple columns are found with the given column qualifier
@@ -820,6 +821,12 @@ public interface PTable extends PMetaDataEntity {
     boolean hasViewModifiedPhoenixTTL();
 
     /**
+     * @return the last timestamp at which this entity had its data shape created or modified (e
+     * .g, create entity, adding or dropping a column. Not affected by changing table properties
+     */
+    Long getLastDDLTimestamp();
+
+    /**
      * Class to help track encoded column qualifier counters per column family.
      */
     public class EncodedCQCounter {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 139f084..a67a218 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -198,6 +198,7 @@ public class PTableImpl implements PTable {
     private final long phoenixTTL;
     private final long phoenixTTLHighWaterMark;
     private final BitSet viewModifiedPropSet;
+    private final Long lastDDLTimestamp;
     private Map<String, String> propertyValues;
 
     public static class Builder {
@@ -255,6 +256,7 @@ public class PTableImpl implements PTable {
         private Boolean useStatsForParallelization;
         private long phoenixTTL;
         private long phoenixTTLHighWaterMark;
+        private Long lastDDLTimestamp;
         private Map<String, String> propertyValues = new HashMap<>();
 
         // Used to denote which properties a view has explicitly modified
@@ -592,6 +594,11 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setLastDDLTimestamp(Long lastDDLTimestamp) {
+            this.lastDDLTimestamp = lastDDLTimestamp;
+            return this;
+        }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -855,6 +862,7 @@ public class PTableImpl implements PTable {
         this.phoenixTTLHighWaterMark = builder.phoenixTTLHighWaterMark;
         this.viewModifiedPropSet = builder.viewModifiedPropSet;
         this.propertyValues = builder.propertyValues;
+        this.lastDDLTimestamp = builder.lastDDLTimestamp;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -926,7 +934,8 @@ public class PTableImpl implements PTable {
                 .setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency())
                 .setViewModifiedPhoenixTTL(table.hasViewModifiedPhoenixTTL())
                 .setPhoenixTTL(table.getPhoenixTTL())
-                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark());
+                .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
+                .setLastDDLTimestamp(table.getLastDDLTimestamp());
     }
 
     @Override
@@ -1766,6 +1775,10 @@ public class PTableImpl implements PTable {
         if (table.hasViewModifiedPhoenixTTL()) {
             viewModifiedPhoenixTTL = table.getViewModifiedPhoenixTTL();
         }
+        Long lastDDLTimestamp = null;
+        if (table.hasLastDDLTimestamp()) {
+            lastDDLTimestamp = table.getLastDDLTimestamp();
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -1815,6 +1828,7 @@ public class PTableImpl implements PTable {
                     .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
                     .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
                     .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
+                    .setLastDDLTimestamp(lastDDLTimestamp)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -1923,6 +1937,9 @@ public class PTableImpl implements PTable {
       builder.setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency());
       builder.setViewModifiedUseStatsForParallelization(table.hasViewModifiedUseStatsForParallelization());
       builder.setViewModifiedPhoenixTTL(table.hasViewModifiedPhoenixTTL());
+      if (table.getLastDDLTimestamp() != null) {
+          builder.setLastDDLTimestamp(table.getLastDDLTimestamp());
+      }
       return builder.build();
     }
 
@@ -2040,6 +2057,11 @@ public class PTableImpl implements PTable {
         return viewModifiedPropSet.get(VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS);
     }
 
+    @Override
+    public Long getLastDDLTimestamp() {
+        return lastDDLTimestamp;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 268d72b..637d2f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -107,6 +107,29 @@ public class MetaDataUtil {
             HColumnDescriptor.KEEP_DELETED_CELLS,
             HColumnDescriptor.REPLICATION_SCOPE);
 
+    public static Put getLastDDLTimestampUpdate(byte[] tableHeaderRowKey,
+                                                     long clientTimestamp,
+                                                     long lastDDLTimestamp) {
+        //use client timestamp as the timestamp of the Cell, to match the other Cells that might
+        // be created by this DDL. But the actual value will be a _server_ timestamp
+        Put p = new Put(tableHeaderRowKey, clientTimestamp);
+        byte[] lastDDLTimestampBytes = PLong.INSTANCE.toBytes(lastDDLTimestamp);
+        p.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+            PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP_BYTES, lastDDLTimestampBytes);
+        return p;
+    }
+
+    /**
+     * Checks if a table is meant to be queried directly (and hence is relevant to external
+     * systems tracking Phoenix schema)
+     * @param tableType
+     * @return True if a table or view, false otherwise (such as for an index, system table, or
+     * subquery)
+     */
+    public static boolean isTableTypeDirectlyQueried(PTableType tableType) {
+        return tableType.equals(PTableType.TABLE) || tableType.equals(PTableType.VIEW);
+    }
+
     public static class ClientServerCompatibility {
 
         private int errorCode;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index d17ef96..c453dd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -277,6 +277,12 @@ public class SchemaUtil {
         return l3;
     }
 
+    public static byte[] getTableKey(PTable dataTable) {
+        PName tenantId = dataTable.getTenantId();
+        PName schemaName = dataTable.getSchemaName();
+        return getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName.getBytes(), dataTable.getTableName().getBytes());
+    }
+
     /**
      * Get the key used in the Phoenix metadata row for a table definition
      * @param schemaName
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 207d472..58c2757 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
@@ -45,6 +46,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCH
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_CAT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
@@ -2586,4 +2588,23 @@ public class UpgradeUtil {
     public static void doNotUpgradeOnFirstConnection(Properties props) {
         props.setProperty(DO_NOT_UPGRADE, String.valueOf(true));
     }
+
+    //When upgrading to Phoenix 4.16, make each existing table's DDL timestamp equal to its last
+    // updated row timestamp.
+    public static void bootstrapLastDDLTimestamp(Connection metaConnection) throws SQLException  {
+        String pkCols = TENANT_ID + ", " + TABLE_SCHEM +
+            ", " + TABLE_NAME + ", " + COLUMN_NAME + ", " + COLUMN_FAMILY;
+        final String upsertSql =
+            "UPSERT INTO " + SYSTEM_CATALOG_NAME + " (" + pkCols + ", " +
+        LAST_DDL_TIMESTAMP + ")" + " " +
+            "SELECT " + pkCols + ", PHOENIX_ROW_TIMESTAMP() FROM " + SYSTEM_CATALOG_NAME + " " +
+                "WHERE " + TABLE_TYPE + " " + " in " + "('" + PTableType.TABLE.getSerializedValue()
+                + "', '" + PTableType.VIEW.getSerializedValue() + "')";
+        LOGGER.info("Setting DDL timestamps for tables and views to row timestamps");
+        try (PreparedStatement stmt = metaConnection.prepareStatement(upsertSql)) {
+            stmt.execute();
+            metaConnection.commit();
+        }
+        LOGGER.info("Setting DDL timestamps for tables and views is complete");
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 0915da7..5ce60e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -654,10 +654,15 @@ public class ViewUtil {
         }
 
         long maxTableTimestamp = view.getTimeStamp();
+        long maxDDLTimestamp = view.getLastDDLTimestamp() != null ? view.getLastDDLTimestamp() : 0L;
         int numPKCols = view.getPKColumns().size();
-        // set the final table timestamp as the max timestamp of the view/view index or its
-        // ancestors
+        // set the final table timestamp and DDL timestamp as the respective max timestamps of the
+        // view/view index or its ancestors
         maxTableTimestamp = Math.max(maxTableTimestamp, parentTable.getTimeStamp());
+
+        maxDDLTimestamp = Math.max(maxDDLTimestamp,
+                parentTable.getLastDDLTimestamp() != null ? parentTable.getLastDDLTimestamp() : 0L);
+
         if (hasIndexId) {
             // add all pk columns of parent tables to indexes
             // skip salted column as it will be added from the base table columns
@@ -762,6 +767,7 @@ public class ViewUtil {
                 .setExcludedColumns(ImmutableList.copyOf(excludedColumns))
                 .setUpdateCacheFrequency(updateCacheFreq)
                 .setUseStatsForParallelization(useStatsForParallelization)
+                .setLastDDLTimestamp(maxDDLTimestamp)
                 .build();
         pTable = WhereConstantParser.addViewInfoToPColumnsIfNeeded(pTable);
         
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
index 57f4bf1..f0189fd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
@@ -18,9 +18,12 @@
 package org.apache.phoenix.util;
 
 import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -30,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
@@ -39,8 +41,10 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.junit.Test;
@@ -284,6 +288,27 @@ public class MetaDataUtilTest {
         assertFalse(tagIterator.hasNext());
     }
 
+    @Test
+    public void testGetLastDDLTimestampUpdate() throws Exception {
+      byte[] tableHeaderRowKey = SchemaUtil.getTableKey("TenantId", "schema", "table");
+      long serverTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+        long clientTimestamp = serverTimestamp - 1000L;
+      Put p = MetaDataUtil.getLastDDLTimestampUpdate(tableHeaderRowKey, clientTimestamp,
+          serverTimestamp);
+      assertNotNull(p);
+      assertFalse("Mutation is empty!", p.isEmpty());
+      assertArrayEquals(tableHeaderRowKey, p.getRow());
+      assertEquals(clientTimestamp, p.getTimeStamp());
+      assertTrue(p.cellScanner().advance());
+      List<Cell> cells = p.get(TABLE_FAMILY_BYTES, LAST_DDL_TIMESTAMP_BYTES);
+      assertNotNull(cells);
+      assertTrue(cells.size() > 0);
+      Cell c = cells.get(0);
+      assertNotNull("Cell is null!", c);
+      assertEquals(serverTimestamp, PLong.INSTANCE.getCodec().decodeLong(CellUtil.cloneValue(c),
+          0, SortOrder.ASC));
+    }
+
     private static byte[] concatTags(byte[] tags, Cell cell) {
         int cellTagsLen = cell.getTagsLength();
         if (cellTagsLen == 0) {
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index cbc53d6..80644e0 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -144,6 +144,7 @@ message AddColumnRequest {
   repeated bytes tableMetadataMutations = 1;
   optional int32 clientVersion = 2;
   optional PTable parentTable = 3;
+  optional bool addingColumns = 4;
 }
 
 message DropColumnRequest {
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 37fdd1d..95e90cb 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -109,6 +109,7 @@ message PTable {
   optional int64 phoenixTTL = 42;
   optional int64 phoenixTTLHighWaterMark = 43;
   optional bool viewModifiedPhoenixTTL = 44;
+  optional int64 lastDDLTimestamp=45;
 }
 
 message EncodedCQCounter {