You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2021/11/10 13:17:01 UTC

[phoenix] branch 4.x updated: PHOENIX-6227 - Option for DDL changes to export to external schema repository (#1341)

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new bfbe8d9  PHOENIX-6227 - Option for DDL changes to export to external schema repository (#1341)
bfbe8d9 is described below

commit bfbe8d9a3cffca518d73462b15356a4e353eb686
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Wed Nov 10 08:16:53 2021 -0500

    PHOENIX-6227 - Option for DDL changes to export to external schema repository (#1341)
---
 .../end2end/AlterMultiTenantTableWithViewsIT.java  |  63 +++-
 .../org/apache/phoenix/end2end/AlterTableIT.java   |  96 +++++
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  21 ++
 .../apache/phoenix/end2end/LogicalTableNameIT.java |  32 +-
 .../phoenix/end2end/SchemaRegistryFailureIT.java   | 135 +++++++
 .../it/java/org/apache/phoenix/end2end/ViewIT.java |  94 ++++-
 .../apache/phoenix/end2end/WALAnnotationIT.java    |  85 ++---
 .../coprocessor/IndexToolVerificationResult.java   |   1 -
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 416 ++++++++++++++++-----
 .../phoenix/coprocessor/MetaDataProtocol.java      |   7 +-
 .../UngroupedAggregateRegionScanner.java           |  15 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +-
 .../org/apache/phoenix/execute/MutationState.java  |  13 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   3 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |   7 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   5 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  11 +-
 .../java/org/apache/phoenix/schema/PTable.java     |   6 +
 .../java/org/apache/phoenix/schema/PTableImpl.java |  58 ++-
 .../export/DefaultSchemaRegistryRepository.java    |  91 +++++
 .../phoenix/schema/export/DefaultSchemaWriter.java |  42 +++
 .../phoenix/schema/export/SchemaImporter.java      |  39 ++
 .../schema/export/SchemaRegistryRepository.java    |  70 ++++
 .../export/SchemaRegistryRepositoryFactory.java    |  67 ++++
 .../apache/phoenix/schema/export/SchemaWriter.java |  51 +++
 .../phoenix/schema/export/SchemaWriterFactory.java |  41 ++
 .../java/org/apache/phoenix/util/MetaDataUtil.java | 146 +++++++-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  14 +-
 .../java/org/apache/phoenix/util/ViewUtil.java     |  65 +++-
 .../org/apache/phoenix/util/WALAnnotationUtil.java |  13 +-
 .../src/main/protobuf/MetaDataService.proto        |   1 +
 phoenix-core/src/main/protobuf/PTable.proto        |   1 +
 33 files changed, 1477 insertions(+), 238 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
index 92dbaaf..7abec5a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterMultiTenantTableWithViewsIT.java
@@ -45,11 +45,13 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
@@ -82,7 +84,66 @@ public class AlterMultiTenantTableWithViewsIT extends SplitSystemCatalogIT {
         assertFalse(rs.next());
         assertEquals(values.length, i - 1);
     }
-    
+
+    @Test
+    public void testCreateAndAlterViewsWithChangeDetectionEnabled() throws Exception {
+        String tenantId = "TE_" + generateUniqueName();
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "T_" + generateUniqueName();
+        String globalViewName = "GV_" + generateUniqueName();
+        String tenantViewName = "TV_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullGlobalViewName = SchemaUtil.getTableName(schemaName, globalViewName);
+        String fullTenantViewName = SchemaUtil.getTableName(schemaName, tenantViewName);
+
+        PTable globalView = null;
+        PTable alteredGlobalView = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+                " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+                "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(ddl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertTrue(table.isChangeDetectionEnabled());
+            AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+            String globalViewDdl = "CREATE VIEW " + fullGlobalViewName +
+                " (id2 CHAR(12) NOT NULL PRIMARY KEY, col3 BIGINT NULL) " +
+                " AS SELECT * FROM " + fullTableName + " CHANGE_DETECTION_ENABLED=true";
+
+            conn.createStatement().execute(globalViewDdl);
+            globalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+            assertTrue(globalView.isChangeDetectionEnabled());
+            //   base column count doesn't get set properly
+            PTableImpl.Builder builder = PTableImpl.builderFromExisting(globalView);
+            builder.setBaseColumnCount(table.getColumns().size());
+            globalView = builder.setColumns(globalView.getColumns()).build();
+            AlterTableIT.verifySchemaExport(globalView, getUtility().getConfiguration());
+
+            String alterViewDdl = "ALTER VIEW " + fullGlobalViewName + " ADD id3 VARCHAR(12) NULL "
+                + "PRIMARY KEY, " + " col4 BIGINT NULL";
+            conn.createStatement().execute(alterViewDdl);
+            alteredGlobalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+
+            assertTrue(alteredGlobalView.isChangeDetectionEnabled());
+            AlterTableIT.verifySchemaExport(alteredGlobalView, getUtility().getConfiguration());
+        }
+
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+            String tenantViewDdl = "CREATE VIEW " + fullTenantViewName +
+                " (col5 VARCHAR NULL) " +
+                " AS SELECT * FROM " + fullGlobalViewName + " CHANGE_DETECTION_ENABLED=true";
+            tenantConn.createStatement().execute(tenantViewDdl);
+            PTable tenantView = PhoenixRuntime.getTableNoCache(tenantConn, fullTenantViewName);
+            assertTrue(tenantView.isChangeDetectionEnabled());
+            PTable tenantViewWithParents = ViewUtil.addDerivedColumnsFromParent(tenantView, alteredGlobalView);
+            AlterTableIT.verifySchemaExport(tenantViewWithParents, getUtility().getConfiguration());
+        }
+    }
+
     @Test
     public void testAddDropColumnToBaseTablePropagatesToEntireViewHierarchy() throws Exception {
         String baseTable = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 3c63e6d..85d92cb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -36,6 +36,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -46,6 +47,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -64,6 +66,9 @@ import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
+import org.apache.phoenix.schema.export.DefaultSchemaWriter;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
@@ -71,6 +76,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -230,7 +236,97 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testAlterTableUpdatesSchemaRegistry() throws Exception {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String createDdl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+                " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+                "CHANGE_DETECTION_ENABLED=true, SCHEMA_VERSION='OLD'";
+            conn.createStatement().execute(createDdl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertEquals("OLD", table.getSchemaVersion());
+            String expectedSchemaId = String.format("global*%s*%s*OLD", schemaName, tableName);
+            assertEquals(expectedSchemaId, table.getExternalSchemaId());
+
+            String alterVersionDdl = "ALTER TABLE " + fullTableName + " SET SCHEMA_VERSION='NEW'";
+            conn.createStatement().execute(alterVersionDdl);
+
+            String alterDdl = "ALTER TABLE " + fullTableName +
+                " ADD col3 VARCHAR NULL";
+
+            conn.createStatement().execute(alterDdl);
+            PTable newTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            verifySchemaExport(newTable, getUtility().getConfiguration());
+        }
+    }
+
+    @Test
+    public void testAlterChangeDetectionActivatesSchemaRegistryExport() throws Exception {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String createDdl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+                + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+                + " SCHEMA_VERSION='OLD'";
+            conn.createStatement().execute(createDdl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            Assert.assertNull(table.getExternalSchemaId());
+            String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(alterDdl);
+            PTable alteredTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertTrue(alteredTable.isChangeDetectionEnabled());
+            verifySchemaExport(alteredTable, getUtility().getConfiguration());
+        }
+    }
 
+    @Test
+    public void testChangeDetectionFalseDoesntExportToSchemaRegistry() throws Exception {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String createDdl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+                + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+                + "CHANGE_DETECTION_ENABLED=false, SCHEMA_VERSION='OLD'";
+            conn.createStatement().execute(createDdl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertFalse(table.isChangeDetectionEnabled());
+            assertNull(table.getExternalSchemaId());
+        }
+    }
+
+    public static void verifySchemaExport(PTable newTable, Configuration conf) throws IOException {
+        assertEquals(DefaultSchemaRegistryRepository.getSchemaId(newTable),
+            newTable.getExternalSchemaId());
+        String expectedSchemaText = new DefaultSchemaWriter().exportSchema(newTable);
+        String actualSchemaText = SchemaRegistryRepositoryFactory.getSchemaRegistryRepository(
+            conf).getSchemaById(newTable.getExternalSchemaId());
+
+        //filter out table and column timestamp fields, which can vary by a few ms because
+        //HBase assigns the real server timestamp after we update the schema registry
+        String pattern = "(?i)\\s*timestamp:\\s\\d*";
+        expectedSchemaText = expectedSchemaText.replaceAll(pattern, "");
+        actualSchemaText = actualSchemaText.replaceAll(pattern, "");
+
+        //external schema id can be different because it's assigned at the registry AFTER
+        //we save it
+        String externalSchemaPattern = "(?i)\\s*externalSchemaId:\\s\".*\"";
+        expectedSchemaText = expectedSchemaText.replaceAll(externalSchemaPattern, "");
+        actualSchemaText = actualSchemaText.replaceAll(externalSchemaPattern, "");
+
+        //reconstructing the complete view sometimes messes up the base column count. It's not
+        //needed in an external schema registry. TODO: fix the base column count anyway
+        String baseColumnCountPattern = "(?i)\\s*baseColumnCount:\\s\".*\"";
+        expectedSchemaText = expectedSchemaText.replaceAll(baseColumnCountPattern, "");
+        actualSchemaText = expectedSchemaText.replaceAll(baseColumnCountPattern, "");
+
+        assertEquals(expectedSchemaText, actualSchemaText);
+    }
 
     @Test
     public void testSetPropertyAndAddColumnForNewColumnFamily() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 8d6a372..e41b6ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SchemaNotFoundException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -758,6 +759,26 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    public void testCreateChangeDetectionEnabledTable() throws Exception {
+        //create a table with CHANGE_DETECTION_ENABLED and verify both that it's set properly
+        //on the PTable, and that it gets persisted to the external schema registry
+
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+                " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+                "CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(ddl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertTrue(table.isChangeDetectionEnabled());
+            AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+        }
+    }
+
+    @Test
     public void testCreateIndexWithDifferentStorageAndEncoding() throws Exception {
         verifyIndexSchemeChange(false, false);
         verifyIndexSchemeChange(false, true);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
index c4f3c2e..729a8d7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LogicalTableNameIT.java
@@ -26,6 +26,8 @@ import org.apache.phoenix.end2end.join.HashJoinGlobalIndexIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -51,9 +53,7 @@ import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVA
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
 import static org.apache.phoenix.util.MetaDataUtil.VIEW_INDEX_TABLE_PREFIX;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import static org.junit.Assert.*;
 
 @RunWith(Parameterized.class)
 @Category(NeedsOwnMiniClusterTest.class)
@@ -430,6 +430,32 @@ public class LogicalTableNameIT extends LogicalTableNameBaseIT {
     }
 
     @Test
+    public void testChangeDetectionAfterTableNameChange() throws Exception {
+        try(Connection conn = getConnection(props)) {
+            String schemaName = "S_" + generateUniqueName();
+            String tableName = "T_" + generateUniqueName();
+            String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+            createTable(conn, fullTableName);
+            String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(alterDdl);
+
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertTrue(table.isChangeDetectionEnabled());
+            assertNotNull(table.getExternalSchemaId());
+            AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+            String newTableName = "T_" + generateUniqueName();
+            String fullNewTableName = SchemaUtil.getTableName(schemaName, newTableName);
+            LogicalTableNameIT.createAndPointToNewPhysicalTable(conn, fullTableName, fullNewTableName, false);
+
+            //logical table name should still be the same for PTable lookup
+            PTable newTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            //but the schema in the registry should match the new PTable
+            AlterTableIT.verifySchemaExport(newTable, getUtility().getConfiguration());
+        }
+    }
+
+    @Test
     public void testHashJoin() throws Exception {
         if (immutable || createChildAfterRename) {
             return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java
new file mode 100644
index 0000000..980def2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SchemaRegistryFailureIT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaWriter;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class SchemaRegistryFailureIT extends ParallelStatsDisabledIT{
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(SchemaRegistryRepository.SCHEMA_REGISTRY_IMPL_KEY,
+            ExplodingSchemaRegistryRepository.class.getName());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testFailedCreateRollback() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "T_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL," +
+                " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+                + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) "
+                + "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+            try {
+                conn.createStatement().execute(ddl);
+                Assert.fail("Should have thrown SQLException");
+            } catch (SQLException e) {
+                Assert.assertEquals(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY.getErrorCode(),
+                    e.getErrorCode());
+            }
+
+            try {
+                PTable table = PhoenixRuntime.getTable(conn, fullTableName);
+                Assert.fail("Shouldn't have found the table because it shouldn't have been created");
+            } catch (TableNotFoundException tnfe) {
+                //eat the exception, which is what we expect
+            }
+        }
+    }
+
+    @Test
+    public void testFailedAlterRollback() throws Exception {
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "T_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName + " (id char(1) NOT NULL,"
+                + " col1 integer NOT NULL," + " col2 bigint NOT NULL,"
+                + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " + "MULTI_TENANT=true";
+
+            conn.createStatement().execute(ddl);
+
+            String alterDdl = "ALTER TABLE " + fullTableName + " SET CHANGE_DETECTION_ENABLED=true";
+            try {
+                conn.createStatement().execute(alterDdl);
+                Assert.fail("Should have failed because of schema registry exception");
+            } catch (SQLException se) {
+                Assert.assertEquals(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY.getErrorCode(),
+                    se.getErrorCode());
+            }
+            PTable table = PhoenixRuntime.getTable(conn, fullTableName);
+            Assert.assertFalse(table.isChangeDetectionEnabled());
+        }
+    }
+
+    public static class ExplodingSchemaRegistryRepository implements SchemaRegistryRepository {
+
+        //need a real default constructor for reflection
+        public ExplodingSchemaRegistryRepository() {
+
+        }
+
+        @Override public void init(Configuration conf) throws IOException {
+
+        }
+
+        @Override public String exportSchema(SchemaWriter writer, PTable table) throws IOException {
+            throw new IOException("I always explode!");
+        }
+
+        @Override public String getSchemaById(String schemaId) {
+            return null;
+        }
+
+        @Override public String getSchemaByTable(PTable table) {
+            return null;
+        }
+
+        @Override public void close() throws IOException {
+
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 594aa99..8b1a93b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -55,15 +56,15 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.ReadOnlyTableException;
+import org.apache.phoenix.schema.export.DefaultSchemaRegistryRepository;
+import org.apache.phoenix.schema.export.DefaultSchemaWriter;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.*;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -345,6 +346,56 @@ public class ViewIT extends SplitSystemCatalogIT {
     }
 
     @Test
+    public void testCreateViewsWithChangeDetectionEnabled() throws Exception {
+        String tenantId = "T_" + generateUniqueName();
+        String schemaName = "S_" + generateUniqueName();
+        String tableName = "T_" + generateUniqueName();
+        String globalViewName = "GV_" + generateUniqueName();
+        String tenantViewName = "TV_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullGlobalViewName = SchemaUtil.getTableName(schemaName, globalViewName);
+        String fullTenantViewName = SchemaUtil.getTableName(schemaName, tenantViewName);
+
+        PTable globalView = null;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+                " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+                "MULTI_TENANT=true, CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(ddl);
+            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+            assertTrue(table.isChangeDetectionEnabled());
+            AlterTableIT.verifySchemaExport(table, getUtility().getConfiguration());
+
+            String globalViewDdl = "CREATE VIEW " + fullGlobalViewName +
+                " (id2 CHAR(12) PRIMARY KEY, col3 BIGINT NULL) " +
+                " AS SELECT * FROM " + fullTableName + " CHANGE_DETECTION_ENABLED=true";
+
+            conn.createStatement().execute(globalViewDdl);
+            globalView = PhoenixRuntime.getTableNoCache(conn, fullGlobalViewName);
+            assertTrue(globalView.isChangeDetectionEnabled());
+            PTable globalViewWithParents = ViewUtil.addDerivedColumnsFromParent(globalView, table);
+            //   base column count doesn't get set properly
+            PTableImpl.Builder builder = PTableImpl.builderFromExisting(globalViewWithParents);
+            builder.setBaseColumnCount(table.getColumns().size());
+            globalViewWithParents = builder.setColumns(globalViewWithParents.getColumns()).build();
+            AlterTableIT.verifySchemaExport(globalViewWithParents, getUtility().getConfiguration());
+        }
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+            String tenantViewDdl = "CREATE VIEW " + fullTenantViewName +
+                " (id3 VARCHAR PRIMARY KEY, col4 VARCHAR NULL) " +
+                " AS SELECT * FROM " + fullGlobalViewName + " CHANGE_DETECTION_ENABLED=true";
+            tenantConn.createStatement().execute(tenantViewDdl);
+            PTable tenantView = PhoenixRuntime.getTableNoCache(tenantConn, fullTenantViewName);
+            assertTrue(tenantView.isChangeDetectionEnabled());
+            PTable tenantViewWithParents = ViewUtil.addDerivedColumnsFromParent(tenantView, globalView);
+            AlterTableIT.verifySchemaExport(tenantViewWithParents, getUtility().getConfiguration());
+        }
+    }
+
+    @Test
     public void testCreateViewTimestamp() throws Exception {
         String tenantId = null;
         createViewTimestampHelper(tenantId);
@@ -384,6 +435,37 @@ public class ViewIT extends SplitSystemCatalogIT {
         }
     }
 
+    @Test
+    public void testCreateChangeDetectionEnabledTable() throws Exception {
+        //create a view with CHANGE_DETECTION_ENABLED and verify both that it's set properly
+        //on the PTable, and that it gets persisted to the external schema registry
+
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String viewName = generateUniqueName();
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL," + " col1 integer NOT NULL," + " col2 bigint NOT NULL," +
+                " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2)) " +
+                "CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(ddl);
+
+            String viewDdl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName
+                + " CHANGE_DETECTION_ENABLED=true";
+            conn.createStatement().execute(viewDdl);
+            PTable view = PhoenixRuntime.getTableNoCache(conn, fullViewName);
+            assertTrue(view.isChangeDetectionEnabled());
+            assertEquals(DefaultSchemaRegistryRepository.getSchemaId(view),
+                view.getExternalSchemaId());
+            String schemaText = new DefaultSchemaWriter().exportSchema(view);
+            SchemaRegistryRepository repo = SchemaRegistryRepositoryFactory.getSchemaRegistryRepository(
+                getUtility().getConfiguration());
+            //assertEquals(schemaText, repo.getSchemaById(view.getExternalSchemaId()));
+        }
+    }
+
     private void testViewUsesTableIndex(boolean localIndex) throws Exception {
         ResultSet rs;
         // Use unique name for table with local index as otherwise we run
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
index d27bc71..f30a357 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.end2end;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
@@ -39,13 +38,10 @@ import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -106,10 +102,8 @@ public class WALAnnotationIT extends BaseTest {
         Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
         SchemaBuilder builder = new SchemaBuilder(getUrl());
         boolean createGlobalIndex = false;
-        long ddlTimestamp = upsertAndDeleteHelper(builder, createGlobalIndex);
-        assertAnnotation(2, builder.getPhysicalTableName(false), null,
-            builder.getTableOptions().getSchemaName(),
-            builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+        String externalSchemaId = upsertAndDeleteHelper(builder, createGlobalIndex);
+        assertAnnotation(2, builder.getPhysicalTableName(false), externalSchemaId);
     }
 
     @Test
@@ -180,18 +174,16 @@ public class WALAnnotationIT extends BaseTest {
         Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
         SchemaBuilder builder = new SchemaBuilder(getUrl());
         boolean createGlobalIndex = true;
-        long ddlTimestamp = upsertAndDeleteHelper(builder, createGlobalIndex);
-        assertAnnotation(2, builder.getPhysicalTableName(false), null,
-            builder.getTableOptions().getSchemaName(),
-            builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+        String externalSchemaId = upsertAndDeleteHelper(builder, createGlobalIndex);
+        assertAnnotation(2, builder.getPhysicalTableName(false), externalSchemaId);
         assertAnnotation(0, builder.getPhysicalTableIndexName(false),
-            null, null, null, null, ddlTimestamp);
+            externalSchemaId);
     }
 
     // Note that local secondary indexes aren't supported because they go in the same WALEdit as the
     // "base" table data they index.
 
-    private long upsertAndDeleteHelper(SchemaBuilder builder, boolean createGlobalIndex) throws Exception {
+    private String upsertAndDeleteHelper(SchemaBuilder builder, boolean createGlobalIndex) throws Exception {
         try (Connection conn = getConnection()) {
             SchemaBuilder.TableOptions tableOptions = getTableOptions();
 
@@ -206,7 +198,7 @@ public class WALAnnotationIT extends BaseTest {
             conn.createStatement().execute(upsertSql);
             conn.commit();
             PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
-            assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+            assertTrue("Change Detection Enabled is false!", table.isChangeDetectionEnabled());
             // Deleting by entire PK gets executed as more like an UPSERT VALUES than an UPSERT
             // SELECT (i.e, it generates the Mutations and then pushes them to server, rather than
             // running a select query and deleting the mutations returned)
@@ -218,7 +210,7 @@ public class WALAnnotationIT extends BaseTest {
             // last had columns added or removed. It is NOT the timestamp of a particular mutation
             // We need it in the annotation to match up with schema object in an external schema
             // repo.
-            return table.getLastDDLTimestamp();
+            return table.getExternalSchemaId();
         }
     }
 
@@ -258,16 +250,10 @@ public class WALAnnotationIT extends BaseTest {
                                                 int expectedAnnotations) throws SQLException, IOException {
         PTable baseTable = PhoenixRuntime.getTableNoCache(conn,
             baseBuilder.getEntityTableName());
-        assertAnnotation(expectedAnnotations, baseBuilder.getPhysicalTableName(false), null,
-            baseBuilder.getTableOptions().getSchemaName(),
-            baseBuilder.getDataOptions().getTableName(),
-            PTableType.TABLE,
-            baseTable.getLastDDLTimestamp());
+        assertAnnotation(expectedAnnotations, baseBuilder.getPhysicalTableName(false), baseTable.getExternalSchemaId());
         PTable targetTable = PhoenixRuntime.getTableNoCache(conn,
             targetBuilder.getEntityTableName());
-        assertAnnotation(expectedAnnotations, targetBuilder.getPhysicalTableName(false), null,
-            targetBuilder.getTableOptions().getSchemaName(), targetBuilder.getDataOptions().getTableName(),
-            PTableType.TABLE, targetTable.getLastDDLTimestamp());
+        assertAnnotation(expectedAnnotations, targetBuilder.getPhysicalTableName(false), targetTable.getExternalSchemaId());
     }
 
     @Test
@@ -287,10 +273,8 @@ public class WALAnnotationIT extends BaseTest {
                 " (OID, KP, COL1, COL2, COL3) SELECT * FROM " + targetBuilder.getEntityTableName();
             conn.createStatement().execute(sql);
             PTable table = PhoenixRuntime.getTableNoCache(conn, targetBuilder.getEntityTableName());
-            assertAnnotation(1, targetBuilder.getPhysicalTableName(false), null,
-                targetBuilder.getTableOptions().getSchemaName(),
-                targetBuilder.getDataOptions().getTableName(),
-                PTableType.TABLE, table.getLastDDLTimestamp());
+            assertAnnotation(1, targetBuilder.getPhysicalTableName(false),
+                table.getExternalSchemaId());
         }
 
     }
@@ -344,9 +328,7 @@ public class WALAnnotationIT extends BaseTest {
             conn.createStatement().execute(sql);
             conn.commit();
             PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
-            assertAnnotation(2, table.getPhysicalName().getString(), null,
-                table.getSchemaName().getString(),
-                table.getTableName().getString(), PTableType.TABLE, table.getLastDDLTimestamp());
+            assertAnnotation(2, table.getPhysicalName().getString(), table.getExternalSchemaId());
         }
 
     }
@@ -373,9 +355,7 @@ public class WALAnnotationIT extends BaseTest {
             conn.createStatement().execute(deleteSql);
             conn.commit();
             PTable view = PhoenixRuntime.getTableNoCache(conn, builder.getEntityGlobalViewName());
-            assertAnnotation(2, view.getPhysicalName().getString(), null,
-                view.getSchemaName().getString(),
-                view.getTableName().getString(), PTableType.VIEW, view.getLastDDLTimestamp());
+            assertAnnotation(2, view.getPhysicalName().getString(), view.getExternalSchemaId());
         }
 
     }
@@ -432,13 +412,11 @@ public class WALAnnotationIT extends BaseTest {
             conn.createStatement().execute(deleteSql);
             conn.commit();
             PTable view = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTenantViewName());
-            assertAnnotation(2, view.getPhysicalName().getString(), tenant,
-                view.getSchemaName().getString(),
-                view.getTableName().getString(), PTableType.VIEW, view.getLastDDLTimestamp());
+            assertAnnotation(2, view.getPhysicalName().getString(), view.getExternalSchemaId());
             if (createIndex) {
                 assertAnnotation(0,
                     MetaDataUtil.getViewIndexPhysicalName(builder.getEntityTableName()),
-                    tenant, null, null, null, view.getLastDDLTimestamp());
+                    view.getExternalSchemaId());
             }
         }
 
@@ -466,7 +444,7 @@ public class WALAnnotationIT extends BaseTest {
             SchemaBuilder.TableOptions tableOptions = getTableOptions();
             builder.withTableOptions(tableOptions).withTableIndexDefaults().build();
             PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
-            assertEquals("Change Detection Enabled is false!", true, table.isChangeDetectionEnabled());
+            assertTrue("Change Detection Enabled is false!", table.isChangeDetectionEnabled());
             Long ddlTimestamp = table.getLastDDLTimestamp();
             String upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
                 " ('a', 'b', 'c', 'd')";
@@ -483,11 +461,9 @@ public class WALAnnotationIT extends BaseTest {
                 " ('a', 'b', 'c', 'd') ON DUPLICATE KEY UPDATE " + onDupClause;
             conn.createStatement().execute(upsertSql);
             conn.commit();
-            assertAnnotation(2, builder.getPhysicalTableName(false), null,
-                builder.getTableOptions().getSchemaName(),
-                builder.getDataOptions().getTableName(), PTableType.TABLE, ddlTimestamp);
+            assertAnnotation(2, builder.getPhysicalTableName(false), table.getExternalSchemaId());
             assertAnnotation(0, builder.getPhysicalTableIndexName(false),
-                null, null, null, null, ddlTimestamp);
+                table.getExternalSchemaId());
         }
     }
 
@@ -509,29 +485,16 @@ public class WALAnnotationIT extends BaseTest {
         observer.clearAnnotations();
     }
 
-    private void assertAnnotation(int numOccurrences, String physicalTableName, String tenant,
-                                  String schemaName,
-                                  String logicalTableName,
-                                  PTableType tableType, long ddlTimestamp) throws IOException {
+    private void assertAnnotation(int numOccurrences, String physicalTableName,
+        String externalSchemaId) throws IOException {
         int foundCount = 0;
         int notFoundCount = 0;
         List<Map<String, byte[]>> entries =
             getEntriesForTable(TableName.valueOf(physicalTableName));
         for (Map<String, byte[]> m : entries) {
-            byte[] tenantBytes = m.get(MutationState.MutationMetadataType.TENANT_ID.toString());
-            byte[] schemaBytes = m.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
-            byte[] logicalTableBytes =
-                m.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
-            byte[] tableTypeBytes = m.get(MutationState.MutationMetadataType.TABLE_TYPE.toString());
-            byte[] timestampBytes = m.get(MutationState.MutationMetadataType.TIMESTAMP.toString());
-            assertNotNull(timestampBytes);
-            long timestamp = Bytes.toLong(timestampBytes);
-            if (Objects.equals(tenant, Bytes.toString(tenantBytes)) &&
-                Objects.equals(schemaName, Bytes.toString(schemaBytes)) &&
-                Objects.equals(logicalTableName, Bytes.toString(logicalTableBytes)) &&
-                Objects.equals(tableType.toString(), Bytes.toString(tableTypeBytes)) &&
-                Objects.equals(ddlTimestamp, timestamp)
-                && timestamp < HConstants.LATEST_TIMESTAMP) {
+            byte[] externalSchemaIdBytes = m.get(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString());
+            assertNotNull(externalSchemaIdBytes);
+            if (Objects.equals(externalSchemaId, Bytes.toString(externalSchemaIdBytes))) {
                 foundCount++;
             } else {
                 notFoundCount++;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index acdedab..8c8fb3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import com.sun.org.apache.xpath.internal.operations.Bool;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Scan;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 49d5fbb..dae26ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -36,6 +36,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
@@ -82,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYT
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
 import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.schema.PTableType.VIEW;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -91,6 +93,7 @@ import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -127,9 +130,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
@@ -182,6 +183,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.metrics.Metrics;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.PFunction;
@@ -217,6 +219,10 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.export.SchemaRegistryRepository;
+import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
+import org.apache.phoenix.schema.export.SchemaWriter;
+import org.apache.phoenix.schema.export.SchemaWriterFactory;
 import org.apache.phoenix.schema.task.SystemTaskParams;
 import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.schema.types.PBinary;
@@ -344,6 +350,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             CHANGE_DETECTION_ENABLED_BYTES);
     private static final Cell SCHEMA_VERSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, SCHEMA_VERSION_BYTES);
+    private static final Cell EXTERNAL_SCHEMA_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+        TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
 
     private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
@@ -381,7 +389,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             PHOENIX_TTL_HWM_KV,
             LAST_DDL_TIMESTAMP_KV,
             CHANGE_DETECTION_ENABLED_KV,
-            SCHEMA_VERSION_KV
+            SCHEMA_VERSION_KV,
+            EXTERNAL_SCHEMA_ID_KV
     );
 
     static {
@@ -425,6 +434,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private static final int CHANGE_DETECTION_ENABLED_INDEX =
         TABLE_KV_COLUMNS.indexOf(CHANGE_DETECTION_ENABLED_KV);
     private static final int SCHEMA_VERSION_INDEX = TABLE_KV_COLUMNS.indexOf(SCHEMA_VERSION_KV);
+    private static final int EXTERNAL_SCHEMA_ID_INDEX =
+        TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -600,7 +611,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
     @Override
     public void stop(CoprocessorEnvironment env) throws IOException {
-        // nothing to do
+        SchemaRegistryRepositoryFactory.close();
     }
 
     @Override
@@ -806,7 +817,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
     private void addColumnToTable(List<Cell> results, PName colName, PName famName,
                                   Cell[] colKeyValues, List<PColumn> columns, boolean isSalted, int baseColumnCount,
-                                  boolean isRegularView) {
+                                  boolean isRegularView, long timestamp) {
         int i = 0;
         int j = 0;
         while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
@@ -911,7 +922,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
                         position - 1, sortOrder, arraySize, viewConstant, isViewReferenced,
                         expressionStr, isRowTimestamp, false, columnQualifierBytes,
-                        results.get(0).getTimestamp());
+                        timestamp);
         columns.add(column);
     }
 
@@ -1008,11 +1019,51 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         if (results.isEmpty()) {
             return null;
         }
+        List<Cell> tableCellList = results;
+        results = Lists.newArrayList();
+        List<List<Cell>> allColumnCellList = Lists.newArrayList();
+
+        do {
+            if (results.size() > 0) {
+                allColumnCellList.add(results);
+                results = Lists.newArrayList();
+            }
+        } while (scanner.next(results));
+        if (results != null && results.size() > 0) {
+            allColumnCellList.add(results);
+        }
+
+        return getTableFromCells(tableCellList, allColumnCellList, clientTimeStamp, clientVersion);
+    }
+
+    private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allColumnCellList,
+                                     long clientTimeStamp, int clientVersion)
+        throws IOException, SQLException {
+        return getTableFromCells(tableCellList, allColumnCellList, clientTimeStamp, clientVersion, null);
+    }
+
+    /**
+     * Utility method to get a PTable from the HBase Cells either read from SYSTEM.CATALOG or
+     * generated by a DDL statement. Optionally, an existing PTable can be provided so that its
+     * properties can be merged with the "new" PTable created from the Cell. This is useful when
+     * generating an updated PTable following an ALTER DDL statement
+     * @param tableCellList Cells from the header row containing table level properties
+     * @param allColumnCellList Cells from column or link rows
+     * @param clientTimeStamp client-provided timestamp
+     * @param clientVersion client-provided version
+     * @param oldTable Optional parameters containing properties for an existing PTable
+     * @return
+     * @throws IOException
+     * @throws SQLException
+     */
+    private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allColumnCellList,
+                                     long clientTimeStamp, int clientVersion, PTable oldTable)
+        throws IOException, SQLException {
         Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
         Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
 
         // Create PTable based on KeyValues from scan
-        Cell keyValue = results.get(0);
+        Cell keyValue = tableCellList.get(0);
         byte[] keyBuffer = keyValue.getRowArray();
         int keyLength = keyValue.getRowLength();
         int keyOffset = keyValue.getRowOffset();
@@ -1033,20 +1084,24 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         // This will prevent the client from continually looking for the current
         // table when we know that there will never be one since we disallow updates
         // unless the table is the latest
-        // If we already have a table newer than the one we just found and
-        // the client timestamp is less that the existing table time stamp,
-        // bump up the timeStamp to right before the client time stamp, since
-        // we know it can't possibly change.
+
         long timeStamp = keyValue.getTimestamp();
-        // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
-        // clientTimeStamp < tableTimeStamp
-        // ? clientTimeStamp-1
-        // : keyValue.getTimestamp();
+
+        PTableImpl.Builder builder = null;
+        if (oldTable != null) {
+            builder = PTableImpl.builderFromExisting(oldTable);
+            builder.setColumns(oldTable.getColumns());
+        } else {
+            builder = new PTableImpl.Builder();
+        }
+        builder.setTenantId(tenantId);
+        builder.setSchemaName(schemaName);
+        builder.setTableName(tableName);
 
         int i = 0;
         int j = 0;
-        while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
-            Cell kv = results.get(i);
+        while (i < tableCellList.size() && j < TABLE_KV_COLUMNS.size()) {
+            Cell kv = tableCellList.get(i);
             Cell searchKv = TABLE_KV_COLUMNS.get(j);
             int cmp =
                     Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
@@ -1069,7 +1124,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 || tableKeyValues[COLUMN_COUNT_INDEX] == null) {
             // since we allow SYSTEM.CATALOG to split in certain cases there might be child links or
             // other metadata rows that are invalid and should be ignored
-            Cell cell = results.get(0);
+            Cell cell = tableCellList.get(0);
             LOGGER.error("Found invalid metadata rows for rowkey " +
                     Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
             return null;
@@ -1079,18 +1134,25 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         PTableType tableType =
                 PTableType
                         .fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
+        builder.setType(tableType);
+
         Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
         long tableSeqNum =
                 PLong.INSTANCE.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
                         tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
+        builder.setSequenceNumber(tableSeqNum);
+
         Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
         int columnCount =
                 PInteger.INSTANCE.getCodec().decodeInt(columnCountKv.getValueArray(),
                         columnCountKv.getValueOffset(), SortOrder.getDefault());
+
         Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
         PName pkName =
                 pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
                         pkNameKv.getValueLength()) : null;
+        builder.setPkName(pkName != null ? pkName : oldTable != null ? oldTable.getPKName() : null);
+
         Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
         Integer saltBucketNum =
                 saltBucketNumKv != null ? (Integer) PInteger.INSTANCE.getCodec().decodeInt(
@@ -1098,37 +1160,62 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
             saltBucketNum = null; // Zero salt buckets means not salted
         }
+        builder.setBucketNum(saltBucketNum != null ? saltBucketNum : oldTable != null ? oldTable.getBucketNum() : null);
+
+        //data table name is used to find the parent table for indexes later
         Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
         PName dataTableName =
                 dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
                         dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
+
         Cell physicalTableNameKv = tableKeyValues[PHYSICAL_TABLE_NAME_INDEX];
         PName physicalTableName =
                 physicalTableNameKv != null ? newPName(physicalTableNameKv.getValueArray(),
                         physicalTableNameKv.getValueOffset(), physicalTableNameKv.getValueLength()) : null;
+        builder.setPhysicalTableName(physicalTableName != null ? physicalTableName : oldTable != null ? oldTable.getPhysicalName(true) : null);
 
         Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
         PIndexState indexState =
                 indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
                         .getValueArray()[indexStateKv.getValueOffset()]);
+        builder.setState(indexState != null ? indexState : oldTable != null ? oldTable.getIndexState() : null);
 
         Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
-        boolean isImmutableRows =
-                immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
-                        immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
-                        immutableRowsKv.getValueLength());
+        boolean isImmutableRows = immutableRowsKv != null && (Boolean) PBoolean.INSTANCE.toObject(
+            immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
+            immutableRowsKv.getValueLength());
+        builder.setImmutableRows(immutableRowsKv != null ? isImmutableRows :
+            oldTable != null && oldTable.isImmutableRows());
+
         Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
         PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
+        builder.setDefaultFamilyName(defaultFamilyName != null ? defaultFamilyName : oldTable != null ? oldTable.getDefaultFamilyName() : null);
+
         Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
         String viewStatement = viewStatementKv != null ? (String) PVarchar.INSTANCE.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(),
                 viewStatementKv.getValueLength()) : null;
+        builder.setViewStatement(viewStatement != null ? viewStatement : oldTable != null ? oldTable.getViewStatement() : null);
+
         Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
         boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(
                 PBoolean.INSTANCE.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
+        builder.setDisableWAL(disableWALKv != null ? disableWAL :
+            oldTable != null && oldTable.isWALDisabled());
+
         Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
-        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+        boolean multiTenant = multiTenantKv != null && Boolean.TRUE.equals(
+            PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(),
+                multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+        builder.setMultiTenant(multiTenantKv != null ? multiTenant :
+            oldTable != null && oldTable.isMultiTenant());
+
         Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
-        boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
+        boolean storeNulls = storeNullsKv != null && Boolean.TRUE.equals(
+            PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(),
+                storeNullsKv.getValueLength()));
+        builder.setStoreNulls(storeNullsKv != null ? storeNulls :
+            oldTable != null && oldTable.getStoreNulls());
+
         Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
         Cell transactionProviderKv = tableKeyValues[TRANSACTION_PROVIDER_INDEX];
         TransactionFactory.Provider transactionProvider = null;
@@ -1148,115 +1235,199 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             transactionProviderKv.getValueOffset(),
                             SortOrder.getDefault()));
         }
+        builder.setTransactionProvider(transactionProviderKv != null || transactionalKv != null
+            ? transactionProvider : oldTable != null ? oldTable.getTransactionProvider() : null);
+
         Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
         ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
-        PDataType viewIndexIdType = getViewIndexIdType(tableKeyValues);
+        builder.setViewType(viewType != null ? viewType : oldTable != null ? oldTable.getViewType() : null);
+
+        PDataType viewIndexIdType = oldTable != null ? oldTable.getviewIndexIdType() :
+            getViewIndexIdType(tableKeyValues);
+        builder.setViewIndexIdType(viewIndexIdType);
+
         Long viewIndexId = getViewIndexId(tableKeyValues, viewIndexIdType);
+        builder.setViewIndexId(viewIndexId != null ? viewIndexId : oldTable != null ? oldTable.getViewIndexId() : null);
+
         Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
         IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
+        builder.setIndexType(indexType != null ? indexType : oldTable != null ? oldTable.getIndexType() : null);
+
         Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
         int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
                 baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
+        builder.setBaseColumnCount(baseColumnCountKv != null ? baseColumnCount : oldTable != null ? oldTable.getBaseColumnCount() : 0);
+
         Cell rowKeyOrderOptimizableKv = tableKeyValues[ROW_KEY_ORDER_OPTIMIZABLE_INDEX];
-        boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(), rowKeyOrderOptimizableKv.getValueOffset(), rowKeyOrderOptimizableKv.getValueLength()));
+        boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv != null && Boolean.TRUE.equals(
+            PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(),
+                rowKeyOrderOptimizableKv.getValueOffset(),
+                rowKeyOrderOptimizableKv.getValueLength()));
+        builder.setRowKeyOrderOptimizable(rowKeyOrderOptimizableKv != null ? rowKeyOrderOptimizable :
+            oldTable != null && oldTable.rowKeyOrderOptimizable());
+
         Cell updateCacheFrequencyKv = tableKeyValues[UPDATE_CACHE_FREQUENCY_INDEX];
         long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
                 PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
                         updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
+        builder.setUpdateCacheFrequency(updateCacheFrequencyKv != null ? updateCacheFrequency : oldTable != null ? oldTable.getUpdateCacheFrequency() : 0);
 
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUpdateCacheFreq = (updateCacheFrequencyKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(updateCacheFrequencyKv);
         boolean viewModifiedUpdateCacheFrequency = (PTableType.VIEW.equals(tableType)) &&
                 Bytes.contains(tagUpdateCacheFreq, VIEW_MODIFIED_PROPERTY_BYTES);
+        builder.setViewModifiedUpdateCacheFrequency(!Bytes.equals(tagUpdateCacheFreq,
+            HConstants.EMPTY_BYTE_ARRAY) ? viewModifiedUpdateCacheFrequency :
+            oldTable != null && oldTable.hasViewModifiedUpdateCacheFrequency());
+
         Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
         long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
                 indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
+        builder.setIndexDisableTimestamp(indexDisableTimestampKv != null ?
+            indexDisableTimestamp : oldTable != null ? oldTable.getIndexDisableTimestamp() : 0L);
+
         Cell isNamespaceMappedKv = tableKeyValues[IS_NAMESPACE_MAPPED_INDEX];
-        boolean isNamespaceMapped = isNamespaceMappedKv == null ? false
-                : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
+        boolean isNamespaceMapped = isNamespaceMappedKv != null && Boolean.TRUE.equals(
+            PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
                 isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength()));
+        builder.setNamespaceMapped(isNamespaceMappedKv != null ? isNamespaceMapped :
+            oldTable != null && oldTable.isNamespaceMapped());
+
         Cell autoPartitionSeqKv = tableKeyValues[AUTO_PARTITION_SEQ_INDEX];
         String autoPartitionSeq = autoPartitionSeqKv != null ? (String) PVarchar.INSTANCE.toObject(autoPartitionSeqKv.getValueArray(), autoPartitionSeqKv.getValueOffset(),
                 autoPartitionSeqKv.getValueLength()) : null;
+        builder.setAutoPartitionSeqName(autoPartitionSeq != null
+            ? autoPartitionSeq : oldTable != null ? oldTable.getAutoPartitionSeqName() : null);
+
         Cell isAppendOnlySchemaKv = tableKeyValues[APPEND_ONLY_SCHEMA_INDEX];
-        boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
-                : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
+        boolean isAppendOnlySchema = isAppendOnlySchemaKv != null && Boolean.TRUE.equals(
+            PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
                 isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
+        builder.setAppendOnlySchema(isAppendOnlySchemaKv != null ? isAppendOnlySchema :
+            oldTable != null && oldTable.isAppendOnlySchema());
+
         Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
         //TODO: change this once we start having other values for storage schemes
         ImmutableStorageScheme storageScheme = storageSchemeKv == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
                 .fromSerializedValue((byte) PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
                         storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
+        builder.setImmutableStorageScheme(storageSchemeKv != null ? storageScheme :
+            oldTable != null ? oldTable.getImmutableStorageScheme() : ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+
         Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
         QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
                 .fromSerializedValue((byte) PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
                         encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength()));
+        builder.setQualifierEncodingScheme(encodingSchemeKv != null ? encodingScheme :
+            oldTable != null ? oldTable.getEncodingScheme() : QualifierEncodingScheme.NON_ENCODED_QUALIFIERS);
+
         Cell useStatsForParallelizationKv = tableKeyValues[USE_STATS_FOR_PARALLELIZATION_INDEX];
-        Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
+        Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null :
+            Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
+         builder.setUseStatsForParallelization(useStatsForParallelization != null ?
+             useStatsForParallelization : oldTable != null ? oldTable.useStatsForParallelization() : null);
 
         Cell phoenixTTLKv = tableKeyValues[PHOENIX_TTL_INDEX];
         long phoenixTTL = phoenixTTLKv == null ? PHOENIX_TTL_NOT_DEFINED :
                 PLong.INSTANCE.getCodec().decodeLong(phoenixTTLKv.getValueArray(),
                         phoenixTTLKv.getValueOffset(), SortOrder.getDefault());
+        builder.setPhoenixTTL(phoenixTTLKv != null ? phoenixTTL :
+            oldTable != null ? oldTable.getPhoenixTTL() : PHOENIX_TTL_NOT_DEFINED);
 
         Cell phoenixTTLHWMKv = tableKeyValues[PHOENIX_TTL_HWM_INDEX];
         long phoenixTTLHWM = phoenixTTLHWMKv == null ? MIN_PHOENIX_TTL_HWM :
                 PLong.INSTANCE.getCodec().decodeLong(phoenixTTLHWMKv.getValueArray(),
                         phoenixTTLHWMKv.getValueOffset(), SortOrder.getDefault());
+        builder.setPhoenixTTLHighWaterMark(phoenixTTLHWMKv != null ? phoenixTTLHWM :
+            oldTable != null ? oldTable.getPhoenixTTLHighWaterMark() : MIN_PHOENIX_TTL_HWM);
 
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagPhoenixTTL = (phoenixTTLKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(phoenixTTLKv);
         boolean viewModifiedPhoenixTTL = (PTableType.VIEW.equals(tableType)) &&
                 Bytes.contains(tagPhoenixTTL, VIEW_MODIFIED_PROPERTY_BYTES);
+        builder.setViewModifiedPhoenixTTL(oldTable != null ?
+            oldTable.hasViewModifiedPhoenixTTL() || viewModifiedPhoenixTTL : viewModifiedPhoenixTTL);
 
         Cell lastDDLTimestampKv = tableKeyValues[LAST_DDL_TIMESTAMP_INDEX];
         Long lastDDLTimestamp = lastDDLTimestampKv == null ?
            null : PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
                 lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
+        builder.setLastDDLTimestamp(lastDDLTimestampKv != null ? lastDDLTimestamp :
+            oldTable != null ? oldTable.getLastDDLTimestamp() : null);
 
         Cell changeDetectionEnabledKv = tableKeyValues[CHANGE_DETECTION_ENABLED_INDEX];
         boolean isChangeDetectionEnabled = changeDetectionEnabledKv != null
             && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(changeDetectionEnabledKv.getValueArray(),
             changeDetectionEnabledKv.getValueOffset(),
             changeDetectionEnabledKv.getValueLength()));
+        builder.setIsChangeDetectionEnabled(changeDetectionEnabledKv != null ?
+            isChangeDetectionEnabled : oldTable != null && oldTable.isChangeDetectionEnabled());
 
         Cell schemaVersionKv = tableKeyValues[SCHEMA_VERSION_INDEX];
         String schemaVersion = schemaVersionKv != null ? (String) PVarchar.INSTANCE.toObject(
                 schemaVersionKv.getValueArray(), schemaVersionKv.getValueOffset(), schemaVersionKv.getValueLength())
                 : null;
+        builder.setSchemaVersion(schemaVersion != null ?
+            schemaVersion : oldTable != null ? oldTable.getSchemaVersion() : null);
+
+        Cell externalSchemaIdKv = tableKeyValues[EXTERNAL_SCHEMA_ID_INDEX];
+        String externalSchemaId = externalSchemaIdKv != null ?
+            (String) PVarchar.INSTANCE.toObject(externalSchemaIdKv.getValueArray(),
+                externalSchemaIdKv.getValueOffset(), externalSchemaIdKv.getValueLength())
+            : null;
+        builder.setExternalSchemaId(externalSchemaId != null ? externalSchemaId :
+            oldTable != null ? oldTable.getExternalSchemaId() : null);
 
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(useStatsForParallelizationKv);
         boolean viewModifiedUseStatsForParallelization = (PTableType.VIEW.equals(tableType)) &&
                 Bytes.contains(tagUseStatsForParallelization, VIEW_MODIFIED_PROPERTY_BYTES);
+        builder.setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization ||
+            (oldTable != null && oldTable.hasViewModifiedUseStatsForParallelization()));
 
+        boolean setPhysicalName = false;
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = Lists.newArrayList();
         List<PName> physicalTables = Lists.newArrayList();
         PName parentTableName = tableType == INDEX ? dataTableName : null;
         PName parentSchemaName = tableType == INDEX ? schemaName : null;
         PName parentLogicalName = null;
-        EncodedCQCounter cqCounter =
-                (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
-                        : new EncodedCQCounter();
+        EncodedCQCounter cqCounter = null;
+        if (oldTable != null) {
+            cqCounter = oldTable.getEncodedCQCounter();
+        } else {
+            cqCounter = (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ?
+                PTable.EncodedCQCounter.NULL_COUNTER :
+                new EncodedCQCounter();
+        }
+
+        if (timeStamp == HConstants.LATEST_TIMESTAMP) {
+            timeStamp = lastDDLTimestamp != null ? lastDDLTimestamp : clientTimeStamp;
+        }
+        builder.setTimeStamp(timeStamp);
+
+
         boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
-        while (true) {
-            results.clear();
-            scanner.next(results);
-            if (results.isEmpty()) {
-                break;
-            }
-            Cell colKv = results.get(LINK_TYPE_INDEX);
+        for (List<Cell> columnCellList : allColumnCellList) {
+
+            Cell colKv = columnCellList.get(LINK_TYPE_INDEX);
             int colKeyLength = colKv.getRowLength();
+
             PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength - offset);
+            if (colName == null) {
+                continue;
+            }
             int colKeyOffset = offset + colName.getBytes().length + 1;
             PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength - colKeyOffset);
+
             if (isQualifierCounterKV(colKv)) {
-                Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
-                cqCounter.setValue(famName.getString(), value);
+                if (famName != null) {
+                    Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
+                    cqCounter.setValue(famName.getString(), value);
+                }
             } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength()) == 0) {
                 LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
                 if (linkType == LinkType.INDEX_TABLE) {
@@ -1288,17 +1459,21 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                                 clientTimeStamp);
                             if (tablePhysicalName == null) {
                                 physicalTables.add(famName);
+                                setPhysicalName = true;
                             } else {
                                 physicalTables.add(SchemaUtil.getPhysicalHBaseTableName(schemaName, tablePhysicalName, isNamespaceMapped));
+                                setPhysicalName = true;
                             }
                         } else {
                             physicalTables.add(famName);
+                            setPhysicalName = true;
                         }
                         // If this is a view index, then one of the link is IDX_VW -> _IDX_ PhysicalTable link. Since famName is _IDX_ and we can't get this table hence it is null, we need to use actual view name
                         parentLogicalName = (tableType == INDEX ? SchemaUtil.getTableName(parentSchemaName, parentTableName) : famName);
                     } else {
                         String parentPhysicalTableName = parentTable.getPhysicalName().getString();
                         physicalTables.add(PNameFactory.newName(parentPhysicalTableName));
+                        setPhysicalName = true;
                         parentLogicalName = SchemaUtil.getTableName(parentTable.getSchemaName(), parentTable.getTableName());
                     }
                 } else if (linkType == LinkType.PARENT_TABLE) {
@@ -1309,63 +1484,40 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     addExcludedColumnToTable(columns, colName, famName, colKv.getTimestamp());
                 }
             } else {
-                addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null, baseColumnCount, isRegularView);
+                long columnTimestamp =
+                    columnCellList.get(0).getTimestamp() != HConstants.LATEST_TIMESTAMP ?
+                        columnCellList.get(0).getTimestamp() : timeStamp;
+                addColumnToTable(columnCellList, colName, famName, colKeyValues, columns,
+                    saltBucketNum != null, baseColumnCount, isRegularView, columnTimestamp);
             }
         }
+        builder.setEncodedCQCounter(cqCounter);
+
+        builder.setIndexes(indexes != null ? indexes : oldTable != null
+            ? oldTable.getIndexes() : Collections.<PTable>emptyList());
+
+        if (physicalTables == null) {
+            builder.setPhysicalNames(oldTable != null ? oldTable.getPhysicalNames()
+                : ImmutableList.<PName>of());
+        } else {
+            builder.setPhysicalNames(ImmutableList.copyOf(physicalTables));
+        }
+        if (!setPhysicalName && oldTable != null) {
+            builder.setPhysicalTableName(oldTable.getPhysicalName());
+        }
+
+        builder.setExcludedColumns(ImmutableList.<PColumn>of());
+        builder.setBaseTableLogicalName(parentLogicalName != null ?
+            parentLogicalName : oldTable != null ? oldTable.getBaseTableLogicalName() : null);
+        builder.setParentTableName(parentTableName != null ?
+            parentTableName : oldTable != null ? oldTable.getParentTableName() : null);
+        builder.setParentSchemaName(parentSchemaName != null ? parentSchemaName :
+            oldTable != null ? oldTable.getParentSchemaName() : null);
+
+        builder.addOrSetColumns(columns);
         // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
         // server while holding this lock is a bad idea and likely to cause contention.
-        return new PTableImpl.Builder()
-                .setType(tableType)
-                .setState(indexState)
-                .setTimeStamp(timeStamp)
-                .setIndexDisableTimestamp(indexDisableTimestamp)
-                .setSequenceNumber(tableSeqNum)
-                .setImmutableRows(isImmutableRows)
-                .setViewStatement(viewStatement)
-                .setDisableWAL(disableWAL)
-                .setMultiTenant(multiTenant)
-                .setStoreNulls(storeNulls)
-                .setViewType(viewType)
-                .setViewIndexIdType(viewIndexIdType)
-                .setViewIndexId(viewIndexId)
-                .setIndexType(indexType)
-                .setTransactionProvider(transactionProvider)
-                .setUpdateCacheFrequency(updateCacheFrequency)
-                .setNamespaceMapped(isNamespaceMapped)
-                .setAutoPartitionSeqName(autoPartitionSeq)
-                .setAppendOnlySchema(isAppendOnlySchema)
-                .setImmutableStorageScheme(storageScheme == null ?
-                        ImmutableStorageScheme.ONE_CELL_PER_COLUMN : storageScheme)
-                .setQualifierEncodingScheme(encodingScheme == null ?
-                        QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
-                .setBaseColumnCount(baseColumnCount)
-                .setEncodedCQCounter(cqCounter)
-                .setUseStatsForParallelization(useStatsForParallelization)
-                .setPhoenixTTL(phoenixTTL)
-                .setPhoenixTTLHighWaterMark(phoenixTTLHWM)
-                .setExcludedColumns(ImmutableList.<PColumn>of())
-                .setTenantId(tenantId)
-                .setSchemaName(schemaName)
-                .setTableName(tableName)
-                .setPhysicalTableName(physicalTableName)
-                .setPkName(pkName)
-                .setDefaultFamilyName(defaultFamilyName)
-                .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
-                .setBucketNum(saltBucketNum)
-                .setIndexes(indexes == null ? Collections.<PTable>emptyList() : indexes)
-                .setParentSchemaName(parentSchemaName)
-                .setParentTableName(parentTableName)
-                .setBaseTableLogicalName(parentLogicalName)
-                .setPhysicalNames(physicalTables == null ?
-                        ImmutableList.<PName>of() : ImmutableList.copyOf(physicalTables))
-                .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
-                .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
-                .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
-                .setLastDDLTimestamp(lastDDLTimestamp)
-                .setIsChangeDetectionEnabled(isChangeDetectionEnabled)
-                .setSchemaVersion(schemaVersion)
-                .setColumns(columns)
-                .build();
+        return builder.build();
     }
 
     private Long getViewIndexId(Cell[] tableKeyValues, PDataType viewIndexIdType) {
@@ -1854,6 +2006,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
             long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            boolean isChangeDetectionEnabled = MetaDataUtil.getChangeDetectionEnabled(tableMetadata);
+
             PTable table = null;
             // Get as of latest timestamp so we can detect if we have a newer table that already
             // exists without making an additional query
@@ -2171,7 +2325,25 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 if (MetaDataUtil.isTableTypeDirectlyQueried(tableType)) {
                     tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(tableKey,
                         clientTimeStamp, EnvironmentEdgeManager.currentTimeMillis()));
+
+                    //and if we're doing change detection on this table or view, notify the
+                    //external schema registry and get its schema id
+                    if (isChangeDetectionEnabled) {
+                        try {
+                            exportSchema(tableMetadata, tableKey, clientTimeStamp, clientVersion, null);
+                        } catch (IOException ie){
+                            //If we fail to write to the schema registry, fail the entire
+                            //CREATE TABLE or VIEW operation so we stay consistent
+                            LOGGER.error("Error writing schema to external schema registry", ie);
+                            builder.setReturnCode(
+                                MetaDataProtos.MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
+                    }
                 }
+
                 // When we drop a view we first drop the view metadata and then drop the parent->child linking row
                 List<Mutation> localMutations =
                         Lists.newArrayListWithExpectedSize(tableMetadata.size());
@@ -2240,6 +2412,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 if (newTable != null) {
                     builder.setTable(PTableImpl.toProto(newTable));
                 }
+
                 done.run(builder.build());
             } finally {
                 releaseRowLocks(region, locks);
@@ -2251,6 +2424,47 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
+    private void exportSchema(List<Mutation> tableMetadata, byte[] tableKey, long clientTimestamp,
+                              int clientVersion, PTable oldTable) throws SQLException, IOException {
+        List<Cell> tableCellList = MetaDataUtil.getTableCellsFromMutations(tableMetadata);
+
+        List<List<Cell>> allColumnsCellList = MetaDataUtil.getColumnAndLinkCellsFromMutations(tableMetadata);
+        //getTableFromCells assumes the Cells are sorted as they would be when reading from HBase
+        Collections.sort(tableCellList, KeyValue.COMPARATOR);
+        for (List<Cell> columnCellList : allColumnsCellList) {
+            Collections.sort(columnCellList, KeyValue.COMPARATOR);
+        }
+
+        PTable newTable = getTableFromCells(tableCellList, allColumnsCellList, clientTimestamp,
+            clientVersion, oldTable);
+        PTable parentTable = null;
+        //if this is a view, we need to get the columns from its parent table / view
+        if (newTable != null && newTable.getType().equals(PTableType.VIEW)) {
+            try (PhoenixConnection conn = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(env.getConfiguration())) {
+                newTable = ViewUtil.addDerivedColumnsAndIndexesFromAncestors(conn, newTable);
+            }
+        }
+        Configuration conf = env.getConfiguration();
+        SchemaRegistryRepository exporter = SchemaRegistryRepositoryFactory.
+            getSchemaRegistryRepository(conf);
+        if (exporter != null) {
+            SchemaWriter schemaWriter = SchemaWriterFactory.getSchemaWriter(conf);
+            //we export to an external schema registry, then put the schema id
+            //to lookup the schema in the registry into SYSTEM.CATALOG so we
+            //can look it up later (and use it in WAL annotations)
+
+            //Note that if we succeed here but the write to SYSTEM.CATALOG fails,
+            //we can have "orphaned" rows in the schema registry because there's
+            //no way to make this fully atomic.
+            String externalSchemaId =
+                exporter.exportSchema(schemaWriter, newTable);
+            tableMetadata.add(MetaDataUtil.getExternalSchemaIdUpdate(tableKey,
+                externalSchemaId));
+
+        }
+    }
+
     private long getViewIndexSequenceValue(PhoenixConnection connection, String tenantIdStr, PTable parentTable) throws SQLException {
         int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
         // parentTable is parent of the view index which is the view. View table name is _IDX_+logical name of base table
@@ -2986,6 +3200,16 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     }
                 }
 
+                if (table.isChangeDetectionEnabled() || MetaDataUtil.getChangeDetectionEnabled(tableMetadata)) {
+                    try {
+                        exportSchema(tableMetadata, key, clientTimeStamp, clientVersion, table);
+                    } catch (Exception e) {
+                        LOGGER.error("Error writing to schema registry", e);
+                        result = new MetaDataMutationResult(MutationCode.ERROR_WRITING_TO_SCHEMA_REGISTRY,
+                            EnvironmentEdgeManager.currentTimeMillis(), table);
+                        return result;
+                    }
+                }
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                         GlobalCache.getInstance(this.env).getMetaDataCache();
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 6d9f09f..9d002c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -95,7 +95,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 35;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 36;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0;
     // Version below which we should disallow usage of mutable secondary indexing.
@@ -181,7 +181,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
         UNABLE_TO_UPDATE_PARENT_TABLE,
         UNABLE_TO_DELETE_CHILD_LINK,
         UNABLE_TO_UPSERT_TASK,
-        NO_OP
+        ERROR_WRITING_TO_SCHEMA_REGISTRY,
+        NO_OP,
     }
 
   public static class SharedTableState {
@@ -382,7 +383,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
 
         public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) {
           MetaDataMutationResult result = new MetaDataMutationResult();
-          result.returnCode = MutationCode.values()[proto.getReturnCode().ordinal()];
+          result.returnCode = MutationCode.values()[proto.getReturnCode().getNumber()];
           result.mutationTime = proto.getMutationTime();
           if (proto.hasTable()) {
             result.wasUpdated = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 73eb93f..4f4ddba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -658,19 +658,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
 
     private void annotateDataMutations(UngroupedAggregateRegionObserver.MutationList mutationsList,
                                        Scan scan) {
-        byte[] tenantId =
-            scan.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString());
-        byte[] schemaName =
-            scan.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
-        byte[] logicalTableName =
-            scan.getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
-        byte[] tableType =
-            scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString());
-        byte[] ddlTimestamp =
-            scan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString());
-
+        byte[] externalSchemaRegistryId = scan.getAttribute(
+            MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString());
         for (Mutation m : mutationsList) {
-            annotateMutation(m, tenantId, schemaName, logicalTableName, tableType, ddlTimestamp);
+            annotateMutation(m, externalSchemaRegistryId);
         }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index b6e088b..2e0c55f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -561,7 +561,9 @@ public enum SQLExceptionCode {
         "CASCADE INDEX feature is not supported for local index"),
 
     INVALID_REGION_SPLIT_POLICY(908, "43M19",
-        "REGION SPLIT POLICY is incorrect.");
+        "REGION SPLIT POLICY is incorrect."),
+    ERROR_WRITING_TO_SCHEMA_REGISTRY(909, "4320",
+            "Error writing DDL change to external schema registry");
 
     private final int errorCode;
     private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 78eca85..4f67764 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -780,12 +780,13 @@ public class MutationState implements SQLCloseable {
         byte[] schemaName = table.getSchemaName() != null ? table.getSchemaName().getBytes() : null;
         byte[] tableName = table.getTableName() != null ? table.getTableName().getBytes() : null;
         byte[] tableType = table.getType().getValue().getBytes();
+        byte[] externalSchemaRegistryId = table.getExternalSchemaId() != null ?
+            Bytes.toBytes(table.getExternalSchemaId()) : null;
         //Note that we use the _HBase_ byte encoding for a Long, not the Phoenix one, so that
         //downstream consumers don't need to have the Phoenix codecs.
         byte[] lastDDLTimestamp =
             table.getLastDDLTimestamp() != null ? Bytes.toBytes(table.getLastDDLTimestamp()) : null;
-        WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName,
-            tableName, tableType, lastDDLTimestamp);
+        WALAnnotationUtil.annotateMutation(mutation, externalSchemaRegistryId);
     }
 
     /**
@@ -1047,11 +1048,17 @@ public class MutationState implements SQLCloseable {
     }
 
     public enum MutationMetadataType {
+        @Deprecated
         TENANT_ID,
+        @Deprecated
         SCHEMA_NAME,
+        @Deprecated
         LOGICAL_TABLE_NAME,
+        @Deprecated
         TIMESTAMP,
-        TABLE_TYPE
+        @Deprecated
+        TABLE_TYPE,
+        EXTERNAL_SCHEMA_ID
     }
 
     private static class TableInfo {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 05fb75a..36322e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -379,6 +379,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String SCHEMA_VERSION = "SCHEMA_VERSION";
     public static final byte[] SCHEMA_VERSION_BYTES = Bytes.toBytes(SCHEMA_VERSION);
 
+    public static final String EXTERNAL_SCHEMA_ID = "EXTERNAL_SCHEMA_ID";
+    public static final byte[] EXTERNAL_SCHEMA_ID_BYTES = Bytes.toBytes(EXTERNAL_SCHEMA_ID);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 14d182d..820209a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3887,13 +3887,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0) {
             metaConnection = addColumnsIfNotExists(metaConnection,
-                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -2,
                     PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
                             + " " + PVarchar.INSTANCE.getSqlTypeName());
 
             metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1,
                     PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
+                PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName());
         }
         return metaConnection;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 3424fd5..c608e51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -69,6 +69,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
@@ -316,6 +317,7 @@ public interface QueryConstants {
             LAST_DDL_TIMESTAMP + " BIGINT, \n" +
             CHANGE_DETECTION_ENABLED + " BOOLEAN, \n" +
             SCHEMA_VERSION + " VARCHAR, \n" +
+            EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 9f2f980..10f8287 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -375,6 +375,11 @@ public class DelegateTable implements PTable {
         return delegate.getSchemaVersion();
     }
 
+    @Override
+    public String getExternalSchemaId() {
+        return delegate.getExternalSchemaId();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ff1035b..65e31f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -17,13 +17,12 @@
  */
 package org.apache.phoenix.schema;
 
+import static org.apache.phoenix.exception.SQLExceptionCode.*;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
-import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
-import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
@@ -3229,6 +3228,8 @@ public class MetaDataClient {
                                 result.getTable().getLastDDLTimestamp() : null)
                         .setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
                         .setSchemaVersion(schemaVersion)
+                        .setExternalSchemaId(result.getTable() != null ?
+                        result.getTable().getExternalSchemaId() : null)
                         .build();
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
@@ -3349,6 +3350,9 @@ public class MetaDataClient {
                 throwsSQLExceptionUtil(String.valueOf(code), SchemaUtil.getSchemaNameFromFullName(
                 parent.getPhysicalName().getString()),SchemaUtil.getTableNameFromFullName(
                 parent.getPhysicalName().getString()));
+            case ERROR_WRITING_TO_SCHEMA_REGISTRY:
+                throw new SQLExceptionInfo.Builder(ERROR_WRITING_TO_SCHEMA_REGISTRY)
+                    .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
                 // Cannot use SQLExecptionInfo here since not all mutation codes have their
                 // corresponding codes in the enum SQLExceptionCode
@@ -3649,6 +3653,9 @@ public class MetaDataClient {
             .setSchemaName(schemaName).setTableName(tableName).build().buildException();
         case TABLE_ALREADY_EXISTS:
             break;
+        case ERROR_WRITING_TO_SCHEMA_REGISTRY:
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY).
+                    setSchemaName(schemaName).setTableName(tableName).build().buildException();
         default:
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEXPECTED_MUTATION_CODE).setSchemaName(schemaName)
             .setTableName(tableName).setMessage("mutation code: " + mutationCode).build().buildException();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 2411ba5..dfa92da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -854,6 +854,12 @@ public interface PTable extends PMetaDataEntity {
     String getSchemaVersion();
 
     /**
+     * @return String provided by an external schema registry to be used to lookup the schema for
+     * a Phoenix table or view in the registry.
+     */
+    String getExternalSchemaId();
+
+    /**
      * Class to help track encoded column qualifier counters per column family.
      */
     public class EncodedCQCounter {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 1f22976..70efc26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -204,6 +203,7 @@ public class PTableImpl implements PTable {
     private final boolean isChangeDetectionEnabled;
     private Map<String, String> propertyValues;
     private String schemaVersion;
+    private String externalSchemaId;
 
     public static class Builder {
         private PTableKey key;
@@ -266,6 +266,7 @@ public class PTableImpl implements PTable {
         private boolean isChangeDetectionEnabled = false;
         private Map<String, String> propertyValues = new HashMap<>();
         private String schemaVersion;
+        private String externalSchemaId;
 
         // Used to denote which properties a view has explicitly modified
         private BitSet viewModifiedPropSet = new BitSet(3);
@@ -524,6 +525,10 @@ public class PTableImpl implements PTable {
             if (physicalTableName != null) {
                 propertyValues.put(PHYSICAL_TABLE_NAME, String.valueOf(physicalTableName));
             }
+            if (this.physicalTableName.equals(PName.EMPTY_NAME) && physicalTableName == null) {
+                //don't override a "blank" PName with null.
+                return this;
+            }
             this.physicalTableName = physicalTableName;
             return this;
         }
@@ -615,6 +620,32 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder addOrSetColumns(Collection<PColumn> changedColumns) {
+            if (this.columns == null || this.columns.size() == 0) {
+                //no need to merge, just take the changes as the complete set of PColumns
+                this.columns = changedColumns;
+            } else {
+                //We have to merge the old and new columns, keeping the columns in the original order
+                List<PColumn> existingColumnList = Lists.newArrayList(this.columns);
+                List<PColumn> columnsToAdd = Lists.newArrayList();
+                //create a new list that's almost a copy of this.columns, but everywhere there's
+                //a "newer" PColumn of an existing column in the parameter, replace it with the
+                //newer version
+                for (PColumn newColumn : changedColumns) {
+                    int indexOf = existingColumnList.indexOf(newColumn);
+                    if (indexOf != -1) {
+                        existingColumnList.set(indexOf, newColumn);
+                    } else {
+                        columnsToAdd.add(newColumn);
+                    }
+                }
+                //now tack on any completely new columns at the end
+                existingColumnList.addAll(columnsToAdd);
+                this.columns = existingColumnList;
+            }
+            return this;
+        }
+
         public Builder setLastDDLTimestamp(Long lastDDLTimestamp) {
             this.lastDDLTimestamp = lastDDLTimestamp;
             return this;
@@ -634,6 +665,13 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setExternalSchemaId(String externalSchemaId) {
+            if (externalSchemaId != null) {
+                this.externalSchemaId = externalSchemaId;
+            }
+            return this;
+         }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -902,6 +940,7 @@ public class PTableImpl implements PTable {
         this.lastDDLTimestamp = builder.lastDDLTimestamp;
         this.isChangeDetectionEnabled = builder.isChangeDetectionEnabled;
         this.schemaVersion = builder.schemaVersion;
+        this.externalSchemaId = builder.externalSchemaId;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -925,7 +964,7 @@ public class PTableImpl implements PTable {
      * Get a PTableImpl.Builder from an existing PTable
      * @param table Original PTable
      */
-    private static PTableImpl.Builder builderFromExisting(PTable table) {
+    public static PTableImpl.Builder builderFromExisting(PTable table) {
         return new PTableImpl.Builder()
                 .setType(table.getType())
                 .setState(table.getIndexState())
@@ -978,7 +1017,8 @@ public class PTableImpl implements PTable {
                 .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
                 .setLastDDLTimestamp(table.getLastDDLTimestamp())
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
-                .setSchemaVersion(table.getSchemaVersion());
+                .setSchemaVersion(table.getSchemaVersion())
+                .setExternalSchemaId(table.getExternalSchemaId());
     }
 
     @Override
@@ -1875,6 +1915,11 @@ public class PTableImpl implements PTable {
         if (table.hasSchemaVersion()) {
             schemaVersion = (String) PVarchar.INSTANCE.toObject(table.getSchemaVersion().toByteArray());
         }
+        String externalSchemaId = null;
+        if (table.hasExternalSchemaId()) {
+            externalSchemaId =
+                (String) PVarchar.INSTANCE.toObject(table.getExternalSchemaId().toByteArray());
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -1929,6 +1974,7 @@ public class PTableImpl implements PTable {
                     .setLastDDLTimestamp(lastDDLTimestamp)
                     .setIsChangeDetectionEnabled(isChangeDetectionEnabled)
                     .setSchemaVersion(schemaVersion)
+                    .setExternalSchemaId(externalSchemaId)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -2056,6 +2102,7 @@ public class PTableImpl implements PTable {
         }
         builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
         builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion())));
+        builder.setExternalSchemaId(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getExternalSchemaId())));
         return builder.build();
     }
 
@@ -2188,6 +2235,11 @@ public class PTableImpl implements PTable {
         return schemaVersion;
     }
 
+    @Override
+    public String getExternalSchemaId() {
+        return externalSchemaId;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java
new file mode 100644
index 0000000..35b514b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaRegistryRepository.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default in-memory implementation of SchemaRegistryRepository. Not intended for production use
+ */
+public class DefaultSchemaRegistryRepository implements SchemaRegistryRepository {
+    public static final String DEFAULT_SCHEMA_NAME = "default_schema";
+    public static final String DEFAULT_TENANT_ID = "global";
+
+    private static final String SEPARATOR = "*";
+
+    Map<String, String> schemaMap = new HashMap<String, String>();
+
+    @Override
+    public void init(Configuration conf) throws IOException {
+
+    }
+
+    @Override
+    public String exportSchema(SchemaWriter writer, PTable table) throws IOException {
+        String schemaId = getSchemaId(table);
+        schemaMap.put(schemaId, writer.exportSchema(table));
+        return schemaId;
+    }
+
+    @Override
+    public String getSchemaById(String schemaId) {
+        return schemaMap.get(schemaId);
+    }
+
+    @Override
+    public String getSchemaByTable(PTable table) {
+        return schemaMap.get(getSchemaId(table));
+    }
+
+    @Override
+    public void close() throws IOException {
+        schemaMap.clear();
+    }
+
+    public static String getSchemaId(PTable table) {
+        String schemaMetadataName = getSchemaMetadataName(table);
+        String version = table.getSchemaVersion() != null ? table.getSchemaVersion() :
+            table.getLastDDLTimestamp() != null ? table.getLastDDLTimestamp().toString() :
+                Long.toString(EnvironmentEdgeManager.currentTimeMillis());
+
+        //tenant*schema*table*version-id
+        return String.format("%s" + SEPARATOR + "%s", schemaMetadataName,
+            version);
+    }
+
+    private static String getSchemaMetadataName(PTable table) {
+        String schemaGroup = getSchemaGroup(table);
+        return schemaGroup + SEPARATOR + table.getTableName().getString();
+    }
+
+    private static String getSchemaGroup(PTable table) {
+        String tenantId = (table.getTenantId() != null) ? table.getTenantId().getString() :
+            DEFAULT_TENANT_ID;
+        String schemaName = table.getSchemaName().getString();
+        if (schemaName == null) {
+            schemaName = DEFAULT_SCHEMA_NAME;
+        }
+        return tenantId + SEPARATOR + schemaName;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java
new file mode 100644
index 0000000..f9930cd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/DefaultSchemaWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+
+import java.io.IOException;
+
+public class DefaultSchemaWriter implements SchemaWriter {
+    @Override
+    public void init(Configuration conf) throws IOException {
+
+    }
+
+    @Override
+    public String exportSchema(PTable table) throws IOException {
+        return PTableImpl.toProto(table).toString();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java
new file mode 100644
index 0000000..89c0fdd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaImporter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.phoenix.schema.PTable;
+
+import java.io.IOException;
+
+/**
+ * Interface for importing schemas stored externally from Phoenix into Phoenix by converting the
+ * schema into a PTable
+ */
+public interface SchemaImporter {
+
+    /**
+     *
+     * @param schema String form of an external schema. The expected format of the schema depends
+     *               on the implementation of the class.
+     * @return a Phoenix PTable
+     */
+    PTable getTableFromSchema(String schema) throws IOException;
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java
new file mode 100644
index 0000000..52a26e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepository.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for exporting a Phoenix object (e.g a table or view) to an external schema repository
+ * The choice of which schema repository, and the transport mechanism, are deferred to
+ * implementing classes
+ */
+public interface SchemaRegistryRepository extends Closeable {
+
+    public static final String SCHEMA_WRITER_IMPL_KEY =
+        "org.apache.phoenix.export.schemawriter.impl";
+    public static final String SCHEMA_REGISTRY_IMPL_KEY =
+            "org.apache.phoenix.export.schemaregistry.impl";
+
+    /**
+     * Optional method for any necessary bootstrapping to connect to the external schema registry
+     * @param conf Configuration object with necessary parameters to talk to the external schema
+     *             registry
+     * @throws IOException Exception if something goes wrong in connecting to the registry
+     */
+    void init(Configuration conf) throws IOException;
+
+    /**
+     * Export a Phoenix PTable into an external schema registry by reformatting it into a suitable
+     * form.
+     * @param writer An object which can translate a PTable into a String suitable for the external
+     *               schema registry
+     * @param table a Phoenix PTable for a table or view
+     * @returns Schema id generated by the schema registry, represented as a string.
+     * @throws IOException Exception if something goes wrong in constructing or sending the schema
+     */
+    String exportSchema(SchemaWriter writer, PTable table) throws IOException;
+
+    /**
+     * Return a schema from an external schema repository by its unique identifier
+     * @param schemaId schema identifier
+     * @return a schema
+     */
+    String getSchemaById(String schemaId);
+
+    /**
+     * Return a schema from an external schema repository using information on a PTable
+     * @param table a Phoenix PTable for a table or view
+     * @return a schema
+     */
+    String getSchemaByTable(PTable table);
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java
new file mode 100644
index 0000000..d20a487
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaRegistryRepositoryFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public final class SchemaRegistryRepositoryFactory {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryRepository.class);
+    private static SchemaRegistryRepository exporter;
+
+    public synchronized static SchemaRegistryRepository getSchemaRegistryRepository(Configuration conf)
+            throws IOException {
+        if (exporter != null) {
+            return exporter;
+        }
+        try {
+            String className = conf.get(SchemaRegistryRepository.SCHEMA_REGISTRY_IMPL_KEY);
+            if (className == null) {
+                exporter = new DefaultSchemaRegistryRepository();
+            } else {
+                Class<SchemaRegistryRepository> clazz =
+                    (Class<SchemaRegistryRepository>) Class.forName(className);
+                exporter = clazz.newInstance();
+            }
+            exporter.init(conf);
+            return exporter;
+        } catch (Exception e) {
+            LOGGER.error("Error constructing SchemaRegistryExporter object", e);
+            if (exporter != null) {
+                try {
+                    exporter.close();
+                    exporter = null;
+                } catch (IOException innerE) {
+                    LOGGER.error("Error closing incorrectly constructed SchemaRegistryExporter", e);
+                }
+            }
+            throw new IOException(e);
+        }
+    }
+
+    public synchronized static void close() throws IOException {
+        if (exporter != null) {
+            exporter.close();
+            exporter = null;
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java
new file mode 100644
index 0000000..bef6352
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.PTable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Interface for classes to implement for converting Phoenix schema data, captured in a PTable, into
+ * some other format interpretable by an external system. Possible implementing classes would include
+ * converters to Avro, Protobuf, Thrift, various dialects of SQL,
+ * or other similar cross-platform data description languages.
+ */
+public interface SchemaWriter extends Closeable {
+    public static final String SCHEMA_REGISTRY_IMPL_KEY =
+            "org.apache.phoenix.export.schemawriter.impl";
+
+    /**
+     * Initialize the schema writer with appropriate configuration
+     * @param conf a Configuration object
+     * @throws IOException if something goes wrong during initialization
+     */
+    void init(Configuration conf) throws IOException;
+
+    /**
+     * Given a Phoenix PTable, output a schema document readable by some external system.
+     * @param table A Phoenix PTable describing a table or view
+     * @return a String interpretable as a data format schema in an external system
+     */
+    String exportSchema(PTable table) throws IOException;
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java
new file mode 100644
index 0000000..a47709b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/export/SchemaWriterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.schema.export;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+public class SchemaWriterFactory {
+    public static SchemaWriter getSchemaWriter(Configuration conf)
+            throws IOException {
+        try {
+            String className = conf.get(SchemaRegistryRepository.SCHEMA_WRITER_IMPL_KEY);
+            if (className == null) {
+                return new DefaultSchemaWriter();
+            }
+            Class<SchemaWriter> clazz =
+                    (Class<SchemaWriter>) Class.forName(className);
+            return clazz.newInstance();
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
+                ClassCastException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 5fdc9f9..1ae532b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -17,8 +17,7 @@
  */
 package org.apache.phoenix.util;
 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.*;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
 import java.io.IOException;
@@ -26,6 +25,10 @@ import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.*;
 
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PTinyint;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
@@ -126,6 +129,17 @@ public class MetaDataUtil {
         return p;
     }
 
+    public static Put getExternalSchemaIdUpdate(byte[] tableHeaderRowKey,
+                                                String externalSchemaId) {
+        //use client timestamp as the timestamp of the Cell, to match the other Cells that might
+        // be created by this DDL. But the actual value will be a _server_ timestamp
+        Put p = new Put(tableHeaderRowKey);
+        byte[] externalSchemaIdBytes = PVarchar.INSTANCE.toBytes(externalSchemaId);
+        p.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+            PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID_BYTES, externalSchemaIdBytes);
+        return p;
+    }
+
     /**
      * Checks if a table is meant to be queried directly (and hence is relevant to external
      * systems tracking Phoenix schema)
@@ -420,7 +434,7 @@ public class MetaDataUtil {
         List<Cell> kvs = tableMutation.getFamilyCellMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES);
         if (kvs != null) {
             for (Cell kv : kvs) { // list is not ordered, so search. TODO: we could potentially assume the position
-                if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, 0, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES.length) == 0) {
+                if (isSequenceNumber(kv)) {
                     return PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
                 }
             }
@@ -431,7 +445,21 @@ public class MetaDataUtil {
     public static long getSequenceNumber(List<Mutation> tableMetaData) {
         return getSequenceNumber(getPutOnlyTableHeaderRow(tableMetaData));
     }
-    
+
+    public static boolean isSequenceNumber(Mutation m) {
+        boolean foundSequenceNumber = false;
+        for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+            if (isSequenceNumber(kv)) {
+                foundSequenceNumber = true;
+                break;
+            }
+        }
+        return foundSequenceNumber;
+    }
+    public static boolean isSequenceNumber(Cell kv) {
+        return CellUtil.matchingQualifier(kv, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES);
+    }
+
     public static PTableType getTableType(List<Mutation> tableMetaData, KeyValueBuilder builder,
       ImmutableBytesWritable value) {
         if (getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
@@ -523,6 +551,52 @@ public class MetaDataUtil {
         return false;
     }
 
+    public static List<Cell> getTableCellsFromMutations(List<Mutation> tableMetaData) {
+        List<Cell> tableCells = Lists.newArrayList();
+        byte[] tableKey = tableMetaData.get(0).getRow();
+        for (int k = 0; k < tableMetaData.size(); k++) {
+            Mutation m = tableMetaData.get(k);
+            if (Bytes.equals(m.getRow(), tableKey)) {
+                tableCells.addAll(getCellList(m));
+            }
+        }
+        return tableCells;
+    }
+
+    public static List<List<Cell>> getColumnAndLinkCellsFromMutations(List<Mutation> tableMetaData) {
+        //skip the first mutation because it's the table header row with table-specific information
+        //all the rest of the mutations are either from linking rows or column definition rows
+        List<List<Cell>> allColumnsCellList = Lists.newArrayList();
+        byte[] tableKey = tableMetaData.get(0).getRow();
+        for (int k = 1; k < tableMetaData.size(); k++) {
+            Mutation m = tableMetaData.get(k);
+            //filter out mutations for the table header row and TABLE_SEQ_NUM and parent table
+            //rows such as a view's column qualifier count
+            if (!Bytes.equals(m.getRow(), tableKey) && !(!isLinkType(m) && isSequenceNumber(m)
+                && !isParentTableColumnQualifierCounter(m, tableKey))) {
+                List<Cell> listToAdd = getCellList(m);
+                if (listToAdd != null && listToAdd.size() > 0) {
+                    allColumnsCellList.add(listToAdd);
+                }
+            }
+        }
+        return allColumnsCellList;
+    }
+
+    private static List<Cell> getCellList(Mutation m) {
+        List<Cell> cellList = Lists.newArrayList();
+        for (Cell c : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+            //Mutations will mark NULL columns as deletes, whereas when we read
+            //from HBase we just won't get Cells for those columns. To use Mutation cells
+            //with code expecting Cells read from HBase results, we have to purge those
+            //Delete mutations
+            if (c != null && !CellUtil.isDelete(c)) {
+                cellList.add(c);
+            }
+        }
+        return cellList;
+    }
+
     /**
      * Returns the first Put element in <code>tableMetaData</code>. There could be leading Delete elements before the
      * table header row
@@ -902,11 +976,10 @@ public class MetaDataUtil {
     public static LinkType getLinkType(Collection<Cell> kvs) {
         if (kvs != null) {
             for (Cell kv : kvs) {
-                if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
-                    PhoenixDatabaseMetaData.LINK_TYPE_BYTES, 0,
-                    PhoenixDatabaseMetaData.LINK_TYPE_BYTES.length) == 0) { return LinkType
-                    .fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().decodeByte(kv.getValueArray(),
-                        kv.getValueOffset(), SortOrder.getDefault())); }
+                if (isLinkType(kv))  {
+                    return LinkType.fromSerializedValue(PUnsignedTinyint.INSTANCE.getCodec().
+                        decodeByte(kv.getValueArray(), kv.getValueOffset(),
+                        SortOrder.getDefault())); }
             }
         }
         return null;
@@ -917,6 +990,48 @@ public class MetaDataUtil {
         return false;
     }
 
+    public static boolean isLinkType(Cell kv) {
+        return CellUtil.matchingQualifier(kv, PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
+    }
+
+    public static boolean isLinkType(Mutation m) {
+        boolean foundLinkType = false;
+        for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+            if (isLinkType(kv)) {
+                foundLinkType = true;
+                break;
+            }
+        }
+        return foundLinkType;
+    }
+
+    public static boolean isParentTableColumnQualifierCounter(Mutation m, byte[] tableRow) {
+        boolean foundCQCounter = false;
+        for (Cell kv : m.getFamilyCellMap().get(TABLE_FAMILY_BYTES)) {
+            if (isParentTableColumnQualifierCounter(kv, tableRow)) {
+                foundCQCounter = true;
+                break;
+            }
+        }
+        return foundCQCounter;
+    }
+    public static boolean isParentTableColumnQualifierCounter(Cell kv, byte[] tableRow) {
+        byte[][] tableRowKeyMetaData = new byte[5][];
+        getVarChars(tableRow, tableRowKeyMetaData);
+        byte[] tableName = tableRowKeyMetaData[TABLE_NAME_INDEX];
+
+        byte[][] columnRowKeyMetaData = new byte[5][];
+        int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+            0, columnRowKeyMetaData);
+        if (nColumns == 5) {
+            byte[] columnTableName = columnRowKeyMetaData[TABLE_NAME_INDEX];
+            if (!Bytes.equals(tableName, columnTableName)) {
+                return CellUtil.matchingQualifier(kv, COLUMN_QUALIFIER_BYTES);
+            }
+        }
+        return false;
+    }
+
     public static boolean isViewIndex(String physicalName) {
         if (physicalName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
             return SchemaUtil.getTableNameFromFullName(physicalName).startsWith(VIEW_INDEX_TABLE_PREFIX);
@@ -1023,6 +1138,19 @@ public class MetaDataUtil {
         return getLegacyViewIndexIdDataType();
     }
 
+    public static boolean getChangeDetectionEnabled(List<Mutation> tableMetaData) {
+        KeyValueBuilder builder = GenericKeyValueBuilder.INSTANCE;
+        ImmutableBytesWritable value = new ImmutableBytesWritable();
+        if (getMutationValue(getPutOnlyTableHeaderRow(tableMetaData),
+            PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED_BYTES, builder, value)) {
+            return Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(value.get(),
+                value.getOffset(),
+                value.getLength()));
+        } else {
+            return false;
+        }
+    }
+
     public static PColumn getColumn(int pkCount, byte[][] rowKeyMetaData, PTable table) throws ColumnFamilyNotFoundException, ColumnNotFoundException {
         PColumn col = null;
         if (pkCount > FAMILY_NAME_INDEX
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index e4e089e..18972fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1363,18 +1363,10 @@ public class ScanUtil {
      */
     public static void setWALAnnotationAttributes(PTable table, Scan scan) {
         if (table.isChangeDetectionEnabled()) {
-            if (table.getTenantId() != null) {
-                scan.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(),
-                    table.getTenantId().getBytes());
+            if (table.getExternalSchemaId() != null) {
+                scan.setAttribute(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString(),
+                    Bytes.toBytes(table.getExternalSchemaId()));
             }
-            scan.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(),
-                table.getSchemaName().getBytes());
-            scan.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
-                table.getTableName().getBytes());
-            scan.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(),
-                table.getType().getValue().getBytes());
-            scan.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(),
-                Bytes.toBytes(table.getLastDDLTimestamp()));
         }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 705acf6..4d9dd92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -65,18 +65,8 @@ import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.PTable.LinkType;
-import org.apache.phoenix.schema.PTableImpl;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SaltingUtil;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PLong;
 import org.slf4j.Logger;
@@ -567,7 +557,38 @@ public class ViewUtil {
             }
         }
     }
-    
+
+    public static PTable addDerivedColumnsAndIndexesFromAncestors(PhoenixConnection connection,
+        PTable table) throws SQLException {
+        List<PTable> ancestorList = Lists.newArrayList(table);
+        //First generate a list of tables from child to base table. First element will be the
+        //ultimate descendant, last element will be the base table.
+        PName parentName = table.getParentName();
+        while (parentName != null && parentName.getString().length() > 0) {
+            PTable currentTable = ancestorList.get(ancestorList.size() -1);
+            String parentTableName = SchemaUtil.getTableName(currentTable.getParentSchemaName(),
+                currentTable.getParentTableName()).getString();
+            PTable parentTable;
+            try {
+                parentTable = PhoenixRuntime.getTable(connection, parentTableName);
+            } catch (TableNotFoundException tnfe) {
+                //check to see if there's a tenant-owned parent
+                parentTable = PhoenixRuntime.getTable(connection, table.getTenantId().getString(), parentTableName);
+            }
+            ancestorList.add(parentTable);
+            parentName = parentTable.getParentName();
+        }
+        //now add the columns from all ancestors up from the base table to the top-most view
+        if (ancestorList.size() > 1) {
+            for (int k = ancestorList.size() -2; k >= 0; k--) {
+                ancestorList.set(k, addDerivedColumnsAndIndexesFromParent(connection,
+                    ancestorList.get(k), ancestorList.get(k +1)));
+            }
+            return ancestorList.get(0);
+        } else {
+            return table;
+        }
+    }
     /**
      * Inherit all indexes and columns from the parent 
      * @return table with inherited columns and indexes
@@ -609,6 +630,17 @@ public class ViewUtil {
      * @return table with inherited columns
      */
     public static PTable addDerivedColumnsFromParent(PTable view, PTable parentTable)
+        throws SQLException {
+        return addDerivedColumnsFromParent(view, parentTable, true);
+    }
+    /**
+     * Inherit all columns from the parent unless it's an excluded column.
+     * If the same column is present in the parent and child (for table metadata created before
+     * PHOENIX-3534) we choose the child column over the parent column
+     * @return table with inherited columns
+     */
+    public static PTable addDerivedColumnsFromParent(PTable view, PTable parentTable,
+        boolean recalculateBaseColumnCount)
             throws SQLException {
         // combine columns for view and view indexes
         boolean hasIndexId = view.getViewIndexId() != null;
@@ -730,9 +762,12 @@ public class ViewUtil {
         }
         // we need to include the salt column when setting the base table column count in order to
         // maintain b/w compatibility
-        int baseTableColumnCount =
-                isDiverged ? QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT
-                        : columnsToAdd.size() - myColumns.size() + (isSalted ? 1 : 0);
+        int baseTableColumnCount = view.getBaseColumnCount();
+        if (recalculateBaseColumnCount) {
+            baseTableColumnCount = isDiverged ?
+                QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT :
+                columnsToAdd.size() - myColumns.size() + (isSalted ? 1 : 0);
+        }
         // Inherit view-modifiable properties from the parent table/view if the current view has
         // not previously modified this property
         long updateCacheFreq = (view.getType() != PTableType.VIEW ||
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
index 76564a6..4dba27a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
@@ -58,17 +58,12 @@ public class WALAnnotationUtil {
      * @param ddlTimestamp Server-side timestamp when the table/view was created or last had a
      *                     column added or dropped from it, whichever is greater
      */
-    public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaName,
-                                        byte[] logicalTableName, byte[] tableType, byte[] ddlTimestamp) {
+    public static void annotateMutation(Mutation m, byte[] externalSchemaId) {
         if (!m.getDurability().equals(Durability.SKIP_WAL)) {
-            if (tenantId != null) {
-                m.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId);
+            if (externalSchemaId != null) {
+                m.setAttribute(MutationState.MutationMetadataType.EXTERNAL_SCHEMA_ID.toString(),
+                    externalSchemaId);
             }
-            m.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(), schemaName);
-            m.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
-                logicalTableName);
-            m.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType);
-            m.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), ddlTimestamp);
         }
     }
 }
diff --git a/phoenix-core/src/main/protobuf/MetaDataService.proto b/phoenix-core/src/main/protobuf/MetaDataService.proto
index 0af87e4..63c79ae 100644
--- a/phoenix-core/src/main/protobuf/MetaDataService.proto
+++ b/phoenix-core/src/main/protobuf/MetaDataService.proto
@@ -54,6 +54,7 @@ enum MutationCode {
   UNABLE_TO_UPDATE_PARENT_TABLE = 24;
   UNABLE_TO_DELETE_CHILD_LINK = 25;
   UNABLE_TO_UPSERT_TASK = 26;
+  ERROR_WRITING_TO_SCHEMA_REGISTRY = 27;
 };
 
 message SharedTableState {
diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto
index 1a70868..45e1b6e 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -114,6 +114,7 @@ message PTable {
   optional bytes physicalTableNameBytes = 47;
   optional bytes baseTableLogicalNameBytes = 48;
   optional bytes schemaVersion = 49;
+  optional bytes externalSchemaId=50;
 }
 
 message EncodedCQCounter {