You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/12/15 17:04:36 UTC

[phoenix] branch 4.x updated: PHOENIX-5435 - Annotate HBase WALs with Phoenix Metadata

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new d0639de  PHOENIX-5435 - Annotate HBase WALs with Phoenix Metadata
d0639de is described below

commit d0639de473f021505411006d5513a691116f2ac0
Author: Geoffrey Jacoby <gj...@salesforce.com>
AuthorDate: Tue Sep 29 15:07:59 2020 -0700

    PHOENIX-5435 - Annotate HBase WALs with Phoenix Metadata
---
 .../apache/phoenix/end2end/WALAnnotationIT.java    | 551 +++++++++++++++++++++
 .../apache/phoenix/end2end/index/ViewIndexIT.java  | 541 ++++++++++----------
 .../org/apache/phoenix/compile/DeleteCompiler.java |   1 +
 .../org/apache/phoenix/compile/UpsertCompiler.java |   4 +-
 .../GroupedAggregateRegionObserver.java            |   3 -
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  90 ++--
 .../UngroupedAggregateRegionObserver.java          |   2 -
 .../UngroupedAggregateRegionScanner.java           |  67 ++-
 .../coprocessor/generated/PTableProtos.java        | 106 +++-
 .../coprocessor/generated/ServerCachingProtos.java | 204 +++++++-
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +
 .../org/apache/phoenix/execute/MutationState.java  |  41 ++
 .../phoenix/hbase/index/IndexRegionObserver.java   |  53 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   |   7 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  |  21 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   4 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  10 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   5 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  74 ++-
 .../java/org/apache/phoenix/schema/PTable.java     |   5 +
 .../java/org/apache/phoenix/schema/PTableImpl.java |  29 +-
 .../org/apache/phoenix/schema/TableProperty.java   |  22 +
 .../org/apache/phoenix/util/SQLCloseables.java     |   3 +
 .../java/org/apache/phoenix/util/ScanUtil.java     |  30 +-
 .../org/apache/phoenix/util/WALAnnotationUtil.java |  74 +++
 .../apache/phoenix/query/PhoenixTestBuilder.java   | 141 +++++-
 .../java/org/apache/phoenix/util/TestDDLUtil.java  | 107 ++++
 .../java/org/apache/phoenix/util/TestUtil.java     | 505 ++++++++++---------
 .../compat/hbase/HbaseCompatCapabilities.java      |   3 +
 .../coprocessor/CompatIndexRegionObserver.java     |  49 ++
 .../compat/hbase/HbaseCompatCapabilities.java      |   4 +
 .../coprocessor/CompatIndexRegionObserver.java     |  48 ++
 .../compat/hbase/HbaseCompatCapabilities.java      |   3 +
 .../coprocessor/CompatIndexRegionObserver.java     |  23 +-
 phoenix-protocol/src/main/PTable.proto             |   1 +
 .../src/main/ServerCachingService.proto            |   1 +
 pom.xml                                            |  10 +-
 38 files changed, 2201 insertions(+), 647 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
new file mode 100644
index 0000000..e4618c7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/WALAnnotationIT.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class WALAnnotationIT extends BaseUniqueNamesOwnClusterIT {
+    private final boolean isImmutable;
+    private final boolean isMultiTenant;
+
+    // name is used by failsafe as file name in reports
+    @Parameterized.Parameters(name = "WALAnnotationIT_isImmutable={0}_isMultiTenant={1}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[]{true, true}, new Object[]{true, false},
+            new Object[]{false, true}, new Object[]{false, false});
+    }
+
+    /**
+     * Receives one parameter combination produced by {@link #data()}.
+     *
+     * @param isImmutable stored and later applied to the table options of each test table
+     * @param isMultiTenant stored and later applied to the table options of each test table
+     */
+    public WALAnnotationIT(boolean isImmutable, boolean isMultiTenant) {
+        this.isMultiTenant = isMultiTenant;
+        this.isImmutable = isImmutable;
+    }
+
+    /**
+     * Starts the test driver with the configuration these tests rely on: the annotation-recording
+     * WAL observer, WAL metadata appending, and server-side UPSERT SELECT.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        String walObserverClass = AnnotatedWALObserver.class.getName();
+        Map<String, String> serverProps = new HashMap<>(2);
+        // register the observer that records WAL annotations so the tests can inspect them
+        serverProps.put("hbase.coprocessor.wal.classes", walObserverClass);
+        serverProps.put(IndexRegionObserver.PHOENIX_APPEND_METADATA_TO_WAL, "true");
+        serverProps.put(QueryServices.ENABLE_SERVER_UPSERT_SELECT, "true");
+        ReadOnlyProps readOnlyProps = new ReadOnlyProps(serverProps.entrySet().iterator());
+        setUpTestDriver(readOnlyProps);
+    }
+
+    /**
+     * One upsert and one delete against a table without a global index; expects two matching
+     * annotated WAL entries on the table.
+     */
+    @Test
+    public void testSimpleUpsertAndDelete() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        // second argument: do not create a global index
+        long expectedDdlTimestamp = upsertAndDeleteHelper(schemaBuilder, false);
+        String physicalName = schemaBuilder.getPhysicalTableName(false);
+        String schemaName = schemaBuilder.getTableOptions().getSchemaName();
+        String tableName = schemaBuilder.getDataOptions().getTableName();
+        assertAnnotation(2, physicalName, null, schemaName, tableName, PTableType.TABLE,
+            expectedDdlTimestamp);
+    }
+
+    /**
+     * Checks that upserts leave no recorded WAL annotations while the table's
+     * CHANGE_DETECTION_ENABLED property is false, both right after creation and after the
+     * property has been switched on and then off again via ALTER TABLE.
+     */
+    @Test
+    public void testNoAnnotationsIfChangeDetectionDisabled() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.setAutoCommit(true);
+            SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+            SchemaBuilder.TableOptions options = getTableOptions();
+            // getTableOptions() turns change detection on; this test starts with it off
+            options.setChangeDetectionEnabled(false);
+            schemaBuilder.withTableOptions(options).build();
+
+            String entityName = schemaBuilder.getEntityTableName();
+            PTable ptable = PhoenixRuntime.getTableNoCache(connection, entityName);
+            assertFalse("Change detection is enabled when it shouldn't be!",
+                ptable.isChangeDetectionEnabled());
+
+            String upsert = "UPSERT INTO " + entityName + " VALUES ('a', 'b', '2', 'bc', '3')";
+            connection.createStatement().execute(upsert);
+            TableName physicalName =
+                TableName.valueOf(schemaBuilder.getPhysicalTableName(false));
+            assertEquals(0, getEntriesForTable(physicalName).size());
+
+            // switch the property on first so that switching it off is a real change
+            String alterPrefix =
+                "ALTER TABLE " + entityName + " SET " + CHANGE_DETECTION_ENABLED;
+            connection.createStatement().execute(alterPrefix + "=TRUE");
+            ptable = PhoenixRuntime.getTableNoCache(connection, entityName);
+            assertTrue("Change detection is disabled when it should be enabled!",
+                ptable.isChangeDetectionEnabled());
+
+            connection.createStatement().execute(alterPrefix + "=FALSE");
+            ptable = PhoenixRuntime.getTableNoCache(connection, entityName);
+            assertFalse("Change detection is enabled when it should be disabled!",
+                ptable.isChangeDetectionEnabled());
+
+            // a second upsert must still leave nothing annotated
+            connection.createStatement().execute(upsert);
+            assertEquals(0, getEntriesForTable(physicalName).size());
+        }
+    }
+
+    /**
+     * Expects CREATE INDEX with CHANGE_DETECTION_ENABLED=TRUE to fail with
+     * CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY.
+     */
+    @Test
+    public void testCantSetChangeDetectionOnIndex() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+            schemaBuilder.withTableDefaults().build();
+            try {
+                StringBuilder ddl = new StringBuilder("CREATE INDEX IDX_SHOULD_FAIL ON ");
+                ddl.append(schemaBuilder.getEntityTableName());
+                ddl.append("(COL1) ").append(CHANGE_DETECTION_ENABLED).append("=TRUE");
+                connection.createStatement().execute(ddl.toString());
+                fail("Didn't throw a SQLException for setting change detection on an " +
+                    "index at create time!");
+            } catch (SQLException expected) {
+                TestUtil.assertSqlExceptionCode(
+                    SQLExceptionCode.CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY,
+                    expected);
+            }
+        }
+    }
+
+    /**
+     * One upsert and one delete against a table that has a global index; expects two matching
+     * annotated WAL entries on the data table and none recorded for the index table.
+     */
+    @Test
+    public void testUpsertAndDeleteWithGlobalIndex() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        // second argument: create a global index alongside the table
+        long expectedDdlTimestamp = upsertAndDeleteHelper(schemaBuilder, true);
+        String dataTableName = schemaBuilder.getPhysicalTableName(false);
+        String schemaName = schemaBuilder.getTableOptions().getSchemaName();
+        String logicalName = schemaBuilder.getDataOptions().getTableName();
+        assertAnnotation(2, dataTableName, null, schemaName, logicalName, PTableType.TABLE,
+            expectedDdlTimestamp);
+        String indexTableName = schemaBuilder.getPhysicalTableIndexName(false);
+        assertAnnotation(0, indexTableName, null, null, null, null, expectedDdlTimestamp);
+    }
+
+    // Note that local secondary indexes aren't supported because they go in the same WALEdit as the
+    // "base" table data they index.
+
+    /**
+     * Builds a table with change detection enabled (optionally with a global index), then
+     * commits one upsert and one delete against it.
+     *
+     * @param builder schema builder used to create the table; callers read names back from it
+     * @param createGlobalIndex whether to also build the default table index
+     * @return the table's last DDL timestamp as read after the upsert
+     * @throws Exception if building the schema or running the statements fails
+     */
+    private long upsertAndDeleteHelper(SchemaBuilder builder, boolean createGlobalIndex) throws Exception {
+        try (Connection conn = getConnection()) {
+            SchemaBuilder.TableOptions tableOptions = getTableOptions();
+
+            if (createGlobalIndex) {
+                builder.withTableOptions(tableOptions).withTableIndexDefaults().build();
+            } else {
+                builder.withTableOptions(tableOptions).build();
+            }
+
+            String upsertSql = "UPSERT INTO " + builder.getEntityTableName() + " VALUES" +
+                " ('a', 'b', 'c')";
+            conn.createStatement().execute(upsertSql);
+            conn.commit();
+            PTable table = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTableName());
+            assertTrue("Change Detection Enabled is false!", table.isChangeDetectionEnabled());
+            // Deleting by entire PK gets executed as more like an UPSERT VALUES than an UPSERT
+            // SELECT (i.e, it generates the Mutations and then pushes them to server, rather than
+            // running a select query and deleting the mutations returned)
+            String deleteSql = "DELETE FROM " + builder.getEntityTableName() + " " +
+                "WHERE OID = 'a' AND KP = 'b'";
+            conn.createStatement().execute(deleteSql);
+            conn.commit();
+            // DDL timestamp is the timestamp at which a table or view was created, or when it
+            // last had columns added or removed. It is NOT the timestamp of a particular mutation
+            // We need it in the annotation to match up with schema object in an external schema
+            // repo.
+            return table.getLastDDLTimestamp();
+        }
+    }
+
+    /**
+     * Returns default table options adjusted for the current test parameters, with change
+     * detection switched on.
+     */
+    private SchemaBuilder.TableOptions getTableOptions() {
+        SchemaBuilder.TableOptions options = SchemaBuilder.TableOptions.withDefaults();
+        options.setImmutable(isImmutable);
+        options.setMultiTenant(isMultiTenant);
+        options.setChangeDetectionEnabled(true);
+        return options;
+    }
+
+    /**
+     * Seeds one row in a source table, copies it to a second table with UPSERT SELECT, and
+     * expects one matching annotated WAL entry on each table.
+     */
+    @Test
+    public void testUpsertSelectClientSide() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        try (Connection connection = getConnection()) {
+            SchemaBuilder source = new SchemaBuilder(getUrl());
+            SchemaBuilder target = new SchemaBuilder(getUrl());
+            // selecting from a table other than the upsert target forces client-side processing
+            source.withTableOptions(getTableOptions()).build();
+            String seedSql = "UPSERT INTO " + source.getEntityTableName()
+                + " VALUES ('a', 'b', '2', 'bc', '3')";
+            connection.createStatement().execute(seedSql);
+            connection.commit();
+            target.withTableOptions(getTableOptions()).build();
+            StringBuilder copySql = new StringBuilder("UPSERT INTO ");
+            copySql.append(target.getEntityTableName());
+            copySql.append(" (OID, KP, COL1, COL2, COL3) SELECT * FROM ");
+            copySql.append(source.getEntityTableName());
+            connection.createStatement().execute(copySql.toString());
+            connection.commit();
+            verifyBaseAndTargetAnnotations(connection, source, target, 1);
+        }
+    }
+
+    /**
+     * Runs the same annotation check against the base table and then the target table, each
+     * with its own freshly loaded last DDL timestamp.
+     *
+     * @param conn connection used to load the table metadata
+     * @param baseBuilder builder that created the base table
+     * @param targetBuilder builder that created the target table
+     * @param expectedAnnotations number of matching annotated entries expected on each table
+     */
+    private void verifyBaseAndTargetAnnotations(Connection conn, SchemaBuilder baseBuilder,
+                                                SchemaBuilder targetBuilder,
+                                                int expectedAnnotations) throws SQLException, IOException {
+        // base first, then target
+        for (SchemaBuilder current : Arrays.asList(baseBuilder, targetBuilder)) {
+            PTable ptable = PhoenixRuntime.getTableNoCache(conn, current.getEntityTableName());
+            assertAnnotation(expectedAnnotations, current.getPhysicalTableName(false), null,
+                current.getTableOptions().getSchemaName(),
+                current.getDataOptions().getTableName(),
+                PTableType.TABLE, ptable.getLastDDLTimestamp());
+        }
+    }
+
+    /**
+     * Seeds one row, clears the recorded annotations, then runs an auto-committed UPSERT SELECT
+     * from the table into itself and expects exactly one matching annotated WAL entry.
+     */
+    @Test
+    public void testUpsertSelectServerSide() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        Assume.assumeFalse(isImmutable); //only mutable tables can be processed server-side
+        SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        try (Connection connection = getConnection()) {
+            schemaBuilder.withTableOptions(getTableOptions()).build();
+            String entityName = schemaBuilder.getEntityTableName();
+            connection.createStatement().execute(
+                "UPSERT INTO " + entityName + " VALUES ('a', 'b', '2', 'bc', '3')");
+            connection.commit();
+            connection.setAutoCommit(true); //required for server side execution
+            String physicalName = schemaBuilder.getPhysicalTableName(false);
+            // drop what the seed upsert recorded so only the UPSERT SELECT is counted
+            clearAnnotations(TableName.valueOf(physicalName));
+            connection.createStatement().execute("UPSERT INTO " + entityName
+                + " (OID, KP, COL1, COL2, COL3) SELECT * FROM " + entityName);
+            PTable ptable = PhoenixRuntime.getTableNoCache(connection, entityName);
+            assertAnnotation(1, physicalName, null,
+                schemaBuilder.getTableOptions().getSchemaName(),
+                schemaBuilder.getDataOptions().getTableName(),
+                PTableType.TABLE, ptable.getLastDDLTimestamp());
+        }
+    }
+
+    /**
+     * Seeds one row in a source table, copies it into a second table through an aggregating
+     * UPSERT SELECT ... GROUP BY, and expects one matching annotated WAL entry on each table.
+     */
+    @Test
+    public void testGroupedUpsertSelect() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        // the upsert target differs from the table selected from, so this should be
+        // processed client-side
+        SchemaBuilder source = new SchemaBuilder(getUrl());
+        SchemaBuilder target = new SchemaBuilder(getUrl());
+        try (Connection connection = getConnection()) {
+            source.withTableOptions(getTableOptions()).build();
+            target.withTableOptions(getTableOptions()).build();
+            String seedSql = "UPSERT INTO " + source.getEntityTableName()
+                + " VALUES ('a', 'b', '2', 'bc', '3')";
+            connection.createStatement().execute(seedSql);
+            connection.commit();
+            StringBuilder aggregateSql = new StringBuilder("UPSERT INTO ");
+            aggregateSql.append(target.getEntityTableName());
+            aggregateSql.append(" SELECT OID, KP, MAX(COL1), MIN(COL2), MAX(COL3) FROM ");
+            aggregateSql.append(source.getEntityTableName());
+            aggregateSql.append(" GROUP BY OID, KP");
+            connection.createStatement().execute(aggregateSql.toString());
+            connection.commit();
+            verifyBaseAndTargetAnnotations(connection, source, target, 1);
+        }
+    }
+
+    /** Runs the range-delete scenario with {@code isClientSide} set to false. */
+    @Test
+    public void testRangeDeleteServerSide() throws Exception {
+        testRangeDeleteHelper(false);
+    }
+
+    /**
+     * Upserts one row, deletes it with a WHERE-clause DELETE, and expects two matching annotated
+     * WAL entries on the table.
+     *
+     * @param isClientSide when true, " LIMIT 1" is appended to the DELETE and auto-commit is
+     *                     turned off; when false, auto-commit is turned on
+     */
+    private void testRangeDeleteHelper(boolean isClientSide) throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(getTableOptions()).build();
+        try (Connection connection = getConnection()) {
+            String entityName = schemaBuilder.getEntityTableName();
+            connection.createStatement().execute(
+                "UPSERT INTO " + entityName + " VALUES ('a', 'b', '2', 'bc', '3')");
+            connection.commit();
+            // Intended to execute as a SELECT followed by deletion of the returned rows.
+            // NOTE(review): the original comment called this a partial-PK delete - confirm
+            // against the default table's primary key.
+            StringBuilder deleteSql = new StringBuilder("DELETE FROM ");
+            deleteSql.append(entityName).append(" WHERE OID = 'a' AND KP = 'b'");
+            if (isClientSide) {
+                deleteSql.append(" LIMIT 1");
+            }
+            connection.setAutoCommit(!isClientSide);
+            connection.createStatement().execute(deleteSql.toString());
+            connection.commit();
+            PTable ptable = PhoenixRuntime.getTableNoCache(connection, entityName);
+            String physicalName = ptable.getPhysicalName().getString();
+            String schemaName = ptable.getSchemaName().getString();
+            String logicalName = ptable.getTableName().getString();
+            assertAnnotation(2, physicalName, null, schemaName, logicalName, PTableType.TABLE,
+                ptable.getLastDDLTimestamp());
+        }
+    }
+
+    /** Runs the range-delete scenario with {@code isClientSide} set to true. */
+    @Test
+    public void testRangeDeleteClientSide() throws Exception {
+        testRangeDeleteHelper(true);
+    }
+
+    /**
+     * Upserts and deletes one row through a global view and expects two annotated WAL entries
+     * that carry the view's schema name, view name and the VIEW table type.
+     */
+    @Test
+    public void testGlobalViewUpsert() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        try (Connection connection = getConnection()) {
+            createGlobalViewHelper(schemaBuilder, connection);
+            String viewName = schemaBuilder.getEntityGlobalViewName();
+            StringBuilder upsertSql = new StringBuilder("UPSERT INTO ").append(viewName);
+            upsertSql.append(" VALUES ('a', '");
+            upsertSql.append(PhoenixTestBuilder.DDLDefaults.DEFAULT_KP);
+            upsertSql.append("', '2', 'bc', '3', 'c')");
+            connection.createStatement().execute(upsertSql.toString());
+            connection.commit();
+            StringBuilder deleteSql = new StringBuilder("DELETE FROM ").append(viewName);
+            deleteSql.append(" WHERE OID = 'a' AND KP = '");
+            deleteSql.append(PhoenixTestBuilder.DDLDefaults.DEFAULT_KP);
+            deleteSql.append("' and ID = 'c'");
+            connection.createStatement().execute(deleteSql.toString());
+            connection.commit();
+            PTable globalView = PhoenixRuntime.getTableNoCache(connection, viewName);
+            String physicalName = globalView.getPhysicalName().getString();
+            String schemaName = globalView.getSchemaName().getString();
+            String logicalName = globalView.getTableName().getString();
+            assertAnnotation(2, physicalName, null, schemaName, logicalName, PTableType.VIEW,
+                globalView.getLastDDLTimestamp());
+        }
+    }
+
+    /**
+     * Builds the base table and a global view, both configured with change detection on, and
+     * asserts that the created view reports change detection as enabled.
+     *
+     * @param builder schema builder to configure and build
+     * @param conn connection used to load the view metadata
+     */
+    private void createGlobalViewHelper(SchemaBuilder builder, Connection conn) throws Exception {
+        SchemaBuilder.TableOptions tableOptions = getTableOptions();
+        SchemaBuilder.GlobalViewOptions viewOptions = getGlobalViewOptions(builder);
+        builder.withTableOptions(tableOptions).withGlobalViewOptions(viewOptions).build();
+        String viewName = builder.getEntityGlobalViewName();
+        PTable globalView = PhoenixRuntime.getTableNoCache(conn, viewName);
+        assertTrue("View does not have change detection enabled!",
+            globalView.isChangeDetectionEnabled());
+    }
+
+    /**
+     * Returns default global view options with change detection switched on.
+     *
+     * @param builder not used by this method
+     */
+    private SchemaBuilder.GlobalViewOptions getGlobalViewOptions(SchemaBuilder builder) {
+        SchemaBuilder.GlobalViewOptions viewOptions =
+            SchemaBuilder.GlobalViewOptions.withDefaults();
+        viewOptions.setChangeDetectionEnabled(true);
+        return viewOptions;
+    }
+
+    /** Runs the tenant view scenario without requesting a tenant view index; multi-tenant only. */
+    @Test
+    public void testTenantViewUpsert() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        Assume.assumeTrue(isMultiTenant);
+        tenantViewHelper(false);
+    }
+
+    /**
+     * Creates a base table, a global view and a child tenant view, then upserts and deletes one
+     * row through the tenant view and checks that the annotations use the tenant view's names.
+     *
+     * @param createIndex whether to build the tenant view with its default index; when true the
+     *                    view index's physical table is also checked for zero annotated entries
+     * @throws Exception if building the schema or running the statements fails
+     */
+    private void tenantViewHelper(boolean createIndex) throws Exception {
+        String tenant = generateUniqueName();
+        SchemaBuilder builder = new SchemaBuilder(getUrl());
+        try (Connection conn = getConnection()) {
+            createGlobalViewHelper(builder, conn);
+        }
+        try (Connection conn = getTenantConnection(tenant)) {
+            SchemaBuilder.DataOptions dataOptions = builder.getDataOptions();
+            dataOptions.setTenantId(tenant);
+            // Build exactly once. A further unconditional build with
+            // withTenantViewIndexDefaults() used to follow this branch, which requested the
+            // index regardless of createIndex.
+            if (createIndex) {
+                builder.withTenantViewOptions(getTenantViewOptions(builder)).
+                    withDataOptions(dataOptions).withTenantViewIndexDefaults().build();
+            } else {
+                builder.withTenantViewOptions(getTenantViewOptions(builder)).
+                    withDataOptions(dataOptions).build();
+            }
+            conn.createStatement().execute("UPSERT INTO " + builder.getEntityTenantViewName()
+                + " VALUES" + " ('" + PhoenixTestBuilder.DDLDefaults.DEFAULT_KP + "', '2', 'bc', " +
+                "'3', 'c', " + "'col4', 'col5', 'col6', 'd')");
+            conn.commit();
+            String deleteSql = "DELETE FROM " + builder.getEntityTenantViewName() + " " +
+                "WHERE KP = '"+ PhoenixTestBuilder.DDLDefaults.DEFAULT_KP +
+                "' and COL1 = '2' AND ID = 'c' AND ZID = 'd'";
+            conn.createStatement().execute(deleteSql);
+            conn.commit();
+            PTable view = PhoenixRuntime.getTableNoCache(conn, builder.getEntityTenantViewName());
+            assertAnnotation(2, view.getPhysicalName().getString(), tenant,
+                view.getSchemaName().getString(),
+                view.getTableName().getString(), PTableType.VIEW, view.getLastDDLTimestamp());
+            if (createIndex) {
+                // no annotated entries are expected on the view index's physical table
+                assertAnnotation(0,
+                    MetaDataUtil.getViewIndexPhysicalName(builder.getEntityTableName()),
+                    tenant, null, null, null, view.getLastDDLTimestamp());
+            }
+        }
+
+    }
+
+    /**
+     * Returns default tenant view options with change detection switched on.
+     *
+     * @param builder not used by this method
+     */
+    private SchemaBuilder.TenantViewOptions getTenantViewOptions(SchemaBuilder builder) {
+        SchemaBuilder.TenantViewOptions viewOptions =
+            SchemaBuilder.TenantViewOptions.withDefaults();
+        viewOptions.setChangeDetectionEnabled(true);
+        return viewOptions;
+    }
+
+    @Test
+    public void testTenantViewUpsertWithIndex() throws Exception {
+        Assume.assumeTrue(HbaseCompatCapabilities.hasPreWALAppend());
+        Assume.assumeTrue(isMultiTenant);
+        tenantViewHelper(true);
+    }
+
+    /**
+     * Returns the annotation maps the test WAL observer recorded for a table, or an empty list
+     * when the observer has nothing recorded for it.
+     */
+    private List<Map<String, byte[]>> getEntriesForTable(TableName tableName) throws IOException {
+        List<Map<String, byte[]>> recorded =
+            getTestCoprocessor(tableName).getWalAnnotationsByTable(tableName);
+        if (recorded == null) {
+            return new ArrayList<Map<String, byte[]>>();
+        }
+        return recorded;
+    }
+
+    /**
+     * Looks up the {@link AnnotatedWALObserver} instance attached to the WAL that region server 0
+     * returns for the first region of the given table.
+     */
+    private AnnotatedWALObserver getTestCoprocessor(TableName tableName) throws IOException {
+        HRegionInfo firstRegion =
+            getUtility().getHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+        WAL regionWal = getUtility().getHBaseCluster().getRegionServer(0).getWAL(firstRegion);
+        String observerName = AnnotatedWALObserver.class.getName();
+        return (AnnotatedWALObserver) regionWal.getCoprocessorHost().findCoprocessor(observerName);
+    }
+
+    /**
+     * Clears everything the test WAL observer has recorded. The table name is only used to
+     * locate the observer; the observer's recorded annotations for all tables are cleared.
+     */
+    private void clearAnnotations(TableName tableName) throws IOException {
+        getTestCoprocessor(tableName).clearAnnotations();
+    }
+
+    /**
+     * Asserts how many annotated WAL entries recorded for a physical table match the expected
+     * metadata, and that no recorded entry for that table fails to match.
+     *
+     * @param numOccurrences expected number of matching entries
+     * @param physicalTableName HBase table whose recorded annotations are inspected
+     * @param tenant expected tenant id, or null when none is expected
+     * @param schemaName expected schema name, may be null
+     * @param logicalTableName expected logical table or view name, may be null
+     * @param tableType expected table type; may be null (callers pass null together with an
+     *                  expected count of zero)
+     * @param ddlTimestamp expected DDL timestamp carried in the annotation
+     * @throws IOException if the recorded entries cannot be fetched
+     */
+    private void assertAnnotation(int numOccurrences, String physicalTableName, String tenant,
+                                  String schemaName,
+                                  String logicalTableName,
+                                  PTableType tableType, long ddlTimestamp) throws IOException {
+        // a null tableType must produce a mismatch, not a NullPointerException
+        String expectedTableType = tableType == null ? null : tableType.toString();
+        int foundCount = 0;
+        int notFoundCount = 0;
+        List<Map<String, byte[]>> entries =
+            getEntriesForTable(TableName.valueOf(physicalTableName));
+        for (Map<String, byte[]> m : entries) {
+            byte[] tenantBytes = m.get(MutationState.MutationMetadataType.TENANT_ID.toString());
+            byte[] schemaBytes = m.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
+            byte[] logicalTableBytes =
+                m.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
+            byte[] tableTypeBytes = m.get(MutationState.MutationMetadataType.TABLE_TYPE.toString());
+            byte[] timestampBytes = m.get(MutationState.MutationMetadataType.TIMESTAMP.toString());
+            assertNotNull(timestampBytes);
+            long timestamp = Bytes.toLong(timestampBytes);
+            boolean matches = Objects.equals(tenant, Bytes.toString(tenantBytes))
+                && Objects.equals(schemaName, Bytes.toString(schemaBytes))
+                && Objects.equals(logicalTableName, Bytes.toString(logicalTableBytes))
+                && Objects.equals(expectedTableType, Bytes.toString(tableTypeBytes))
+                // primitive comparison; no need to box both longs
+                && ddlTimestamp == timestamp
+                && timestamp < HConstants.LATEST_TIMESTAMP;
+            if (matches) {
+                foundCount++;
+            } else {
+                notFoundCount++;
+            }
+        }
+        assertEquals("Unexpected number of matching WAL annotations for " + physicalTableName,
+            numOccurrences, foundCount);
+        assertEquals("Found non-matching WAL annotations for " + physicalTableName,
+            0, notFoundCount);
+    }
+
+    /** Opens a connection to the test URL with namespace mapping explicitly disabled. */
+    private PhoenixConnection getConnection() throws SQLException {
+        Properties connectionProps = new Properties();
+        connectionProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
+            String.valueOf(false));
+        Connection connection = DriverManager.getConnection(getUrl(), connectionProps);
+        return (PhoenixConnection) connection;
+    }
+
+    /**
+     * Opens a connection to the test URL for the given tenant id, with namespace mapping
+     * explicitly disabled.
+     */
+    private Connection getTenantConnection(String tenant) throws SQLException {
+        Properties connectionProps = new Properties();
+        connectionProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenant);
+        connectionProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
+            String.valueOf(false));
+        return DriverManager.getConnection(getUrl(), connectionProps);
+    }
+
+    public static class AnnotatedWALObserver extends BaseWALObserver {
+        Map<TableName, List<Map<String, byte[]>>> walAnnotations = new HashMap<>();
+
+        public Map<TableName, List<Map<String, byte[]>>> getWalAnnotations() {
+            return walAnnotations;
+        }
+
+        public List<Map<String, byte[]>> getWalAnnotationsByTable(TableName tableName) {
+            return walAnnotations.get(tableName);
+        }
+
+        public void clearAnnotations() {
+            walAnnotations.clear();
+        }
+
+        @Override
+        public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+                                 HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+            TableName tableName = logKey.getTablename();
+            Map<String, byte[]> annotationMap =
+                CompatIndexRegionObserver.getAttributeValuesFromWALKey(logKey);
+            if (annotationMap.size() > 0) {
+                if (!walAnnotations.containsKey(tableName)) {
+                    walAnnotations.put(tableName, new ArrayList<Map<String, byte[]>>());
+                }
+                walAnnotations.get(logKey.getTablename()).add(annotationMap);
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
index 5856fbc..7890e19 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
@@ -63,7 +63,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -89,35 +88,8 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         return Arrays.asList(true, false);
     }
 
-    private void createBaseTable(String schemaName, String tableName, boolean multiTenant,
-                                 Integer saltBuckets, String splits, boolean mutable)
-            throws SQLException {
-        Connection conn = getConnection();
-        if (isNamespaceMapped) {
-            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-        }
-        String ddl = "CREATE " + (mutable ? "" : "IMMUTABLE") +
-            " TABLE " + SchemaUtil.getTableName(schemaName, tableName) +
-            " (t_id VARCHAR NOT NULL,\n" +
-                "k1 VARCHAR NOT NULL,\n" +
-                "k2 INTEGER NOT NULL,\n" +
-                "v1 VARCHAR,\n" +
-                "v2 INTEGER,\n" +
-                "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
-        String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
-        if (saltBuckets != null) {
-            ddlOptions = ddlOptions
-                    + (ddlOptions.isEmpty() ? "" : ",")
-                    + "salt_buckets=" + saltBuckets;
-        }
-        if (splits != null) {
-            ddlOptions = ddlOptions
-                    + (ddlOptions.isEmpty() ? "" : ",")
-                    + "splits=" + splits;            
-        }
-        conn.createStatement().execute(ddl + ddlOptions);
-        conn.close();
-    }
+    public ViewIndexIT(boolean isNamespaceMapped) { // stores the flag that getConnection() later passes as IS_NAMESPACE_MAPPING_ENABLED
+        this.isNamespaceMapped = isNamespaceMapped; }
 
     @Test
     public void testDroppingColumnWhileCreatingIndex() throws Exception {
@@ -129,9 +101,9 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         String viewName = "VIEW_" + generateUniqueName();
         final String fullViewName = SchemaUtil.getTableName(viewSchemaName, viewName);
 
-        createBaseTable(schemaName, tableName, false, null, null, true);
         try (Connection conn = getConnection()) {
             conn.setAutoCommit(true);
+            createBaseTable(conn, schemaName, tableName, false, null, null, true);
             conn.createStatement().execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName);
             conn.commit();
             final AtomicInteger exceptionCode = new AtomicInteger();
@@ -176,27 +148,6 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         }
     }
 
-    private void createView(Connection conn, String schemaName, String viewName, String baseTableName) throws SQLException {
-        if (isNamespaceMapped) {
-            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-        }
-        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
-        String fullTableName = SchemaUtil.getTableName(schemaName, baseTableName);
-        conn.createStatement().execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName);
-        conn.commit();
-    }
-
-    private void createViewIndex(Connection conn, String schemaName, String indexName, String viewName,
-                                 String indexColumn) throws SQLException {
-        if (isNamespaceMapped) {
-            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-            conn.commit();
-        }
-        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
-        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullViewName + "(" + indexColumn + ")");
-        conn.commit();
-    }
-    
     private PhoenixConnection getConnection() throws SQLException{
         Properties props = new Properties();
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
@@ -209,10 +160,6 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
         return DriverManager.getConnection(getUrl(),props);
     }
-    
-    public ViewIndexIT(boolean isNamespaceMapped) {
-        this.isNamespaceMapped = isNamespaceMapped;
-    }
 
     @Test
     public void testDeleteViewIndexSequences() throws Exception {
@@ -223,22 +170,21 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         String indexName = "IND_" + generateUniqueName();
         String viewName = "VIEW_" + generateUniqueName();
         String fullViewName = SchemaUtil.getTableName(viewSchemaName, viewName);
-
-        createBaseTable(schemaName, tableName, false, null, null, true);
-        Connection conn1 = getConnection();
-        Connection conn2 = getConnection();
-        conn1.createStatement().execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName);
-        conn1.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullViewName + " (v1)");
-        conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName).next();
-        String sequenceName = getViewIndexSequenceName(PNameFactory.newName(fullTableName), null, isNamespaceMapped);
-        String sequenceSchemaName = getViewIndexSequenceSchemaName(PNameFactory.newName(fullTableName), isNamespaceMapped);
-        verifySequenceValue(null, sequenceName, sequenceSchemaName, Short.MIN_VALUE + 1);
-        conn1.createStatement().execute("CREATE INDEX " + indexName + "_2 ON " + fullViewName + " (v1)");
-        verifySequenceValue(null, sequenceName, sequenceSchemaName, Short.MIN_VALUE + 2);
-        conn1.createStatement().execute("DROP VIEW " + fullViewName);
-        conn1.createStatement().execute("DROP TABLE "+ fullTableName);
-        
-        verifySequenceNotExists(null, sequenceName, sequenceSchemaName);
+        try (Connection conn1 = getConnection();
+             Connection conn2 = getConnection()){
+            createBaseTable(conn1, schemaName, tableName, false, null, null, true);
+            conn1.createStatement().execute("CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName);
+            conn1.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullViewName + " (v1)");
+            conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName).next();
+            String sequenceName = getViewIndexSequenceName(PNameFactory.newName(fullTableName), null, isNamespaceMapped);
+            String sequenceSchemaName = getViewIndexSequenceSchemaName(PNameFactory.newName(fullTableName), isNamespaceMapped);
+            verifySequenceValue(null, sequenceName, sequenceSchemaName, Short.MIN_VALUE + 1);
+            conn1.createStatement().execute("CREATE INDEX " + indexName + "_2 ON " + fullViewName + " (v1)");
+            verifySequenceValue(null, sequenceName, sequenceSchemaName, Short.MIN_VALUE + 2);
+            conn1.createStatement().execute("DROP VIEW " + fullViewName);
+            conn1.createStatement().execute("DROP TABLE " + fullTableName);
+            verifySequenceNotExists(null, sequenceName, sequenceSchemaName);
+        }
     }
     
     @Test
@@ -247,77 +193,77 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
 		String indexName = "IND_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(SCHEMA1, tableName);
         String fullViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
-        
-        createBaseTable(SCHEMA1, tableName, true, null, null, true);
-        Connection conn = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = conn.prepareStatement(
+
+        try (Connection conn = getConnection();
+             Connection conn1 = getTenantConnection("10")) {
+
+            createBaseTable(conn, SCHEMA1, tableName, true, null, null, true);
+            PreparedStatement stmt = conn.prepareStatement(
                 "UPSERT INTO " + fullTableName
-                + " VALUES(?,?,?,?,?)");
-        stmt.setString(1, "10");
-        stmt.setString(2, "a");
-        stmt.setInt(3, 1);
-        stmt.setString(4, "x1");
-        stmt.setInt(5, 100);
-        stmt.execute();
-        stmt.setString(1, "10");
-        stmt.setString(2, "b");
-        stmt.setInt(3, 2);
-        stmt.setString(4, "x2");
-        stmt.setInt(5, 200);
-        stmt.execute();
-        stmt.setString(1, "10");
-        stmt.setString(2, "c");
-        stmt.setInt(3, 3);
-        stmt.setString(4, "x3");
-        stmt.setInt(5, 300);
-        stmt.execute();
-        stmt.setString(1, "20");
-        stmt.setString(2, "d");
-        stmt.setInt(3, 4);
-        stmt.setString(4, "x4");
-        stmt.setInt(5, 400);
-        stmt.execute();
-        conn.commit();
-        
-        Properties props  = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
-        props.setProperty("TenantId", "10");
-        Connection conn1 = DriverManager.getConnection(getUrl(), props);
-        conn1.createStatement().execute("CREATE VIEW " + fullViewName
+                    + " VALUES(?,?,?,?,?)");
+            stmt.setString(1, "10");
+            stmt.setString(2, "a");
+            stmt.setInt(3, 1);
+            stmt.setString(4, "x1");
+            stmt.setInt(5, 100);
+            stmt.execute();
+            stmt.setString(1, "10");
+            stmt.setString(2, "b");
+            stmt.setInt(3, 2);
+            stmt.setString(4, "x2");
+            stmt.setInt(5, 200);
+            stmt.execute();
+            stmt.setString(1, "10");
+            stmt.setString(2, "c");
+            stmt.setInt(3, 3);
+            stmt.setString(4, "x3");
+            stmt.setInt(5, 300);
+            stmt.execute();
+            stmt.setString(1, "20");
+            stmt.setString(2, "d");
+            stmt.setInt(3, 4);
+            stmt.setString(4, "x4");
+            stmt.setInt(5, 400);
+            stmt.execute();
+            conn.commit();
+
+            conn1.createStatement().execute("CREATE VIEW " + fullViewName
                 + " AS select * from " + fullTableName);
-        conn1.createStatement().execute("CREATE LOCAL INDEX "
+            conn1.createStatement().execute("CREATE LOCAL INDEX "
                 + indexName + " ON "
                 + fullViewName + "(v2)");
-        conn1.commit();
-        
-        String sql = "SELECT * FROM " + fullViewName + " WHERE v2 = 100";
-        ResultSet rs = conn1.prepareStatement("EXPLAIN " + sql).executeQuery();
-        assertEquals(
+            conn1.commit();
+
+            String sql = "SELECT * FROM " + fullViewName + " WHERE v2 = 100";
+            ResultSet rs = conn1.prepareStatement("EXPLAIN " + sql).executeQuery();
+            assertEquals(
                 "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + SchemaUtil.getPhysicalTableName(Bytes.toBytes(fullTableName), isNamespaceMapped) + " [1,'10',100]\n" +
-                "    SERVER FILTER BY FIRST KEY ONLY\n" +
-                "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-        rs = conn1.prepareStatement(sql).executeQuery();
-        assertTrue(rs.next());
-        assertFalse(rs.next());
-        
-        TestUtil.analyzeTable(conn, fullTableName);
-        List<KeyRange> guideposts = TestUtil.getAllSplits(conn, fullTableName);
-        assertEquals(1, guideposts.size());
-        assertEquals(KeyRange.EVERYTHING_RANGE, guideposts.get(0));
-        
-        conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=20");
-        
-        TestUtil.analyzeTable(conn, fullTableName);
-        guideposts = TestUtil.getAllSplits(conn, fullTableName);
-        assertEquals(5, guideposts.size());
-
-        // Confirm that when view index used, the GUIDE_POSTS_WIDTH from the data physical table
-        // was used
-        sql = "SELECT * FROM " + fullViewName + " WHERE v2 >= 100";
-        rs = conn1.prepareStatement("EXPLAIN " + sql).executeQuery();
-        stmt = conn1.prepareStatement(sql);
-        stmt.executeQuery();
-        QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
-        assertEquals(4, plan.getSplits().size());
+                    "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            rs = conn1.prepareStatement(sql).executeQuery();
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+
+            TestUtil.analyzeTable(conn, fullTableName);
+            List<KeyRange> guideposts = TestUtil.getAllSplits(conn, fullTableName);
+            assertEquals(1, guideposts.size());
+            assertEquals(KeyRange.EVERYTHING_RANGE, guideposts.get(0));
+
+            conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=20");
+
+            TestUtil.analyzeTable(conn, fullTableName);
+            guideposts = TestUtil.getAllSplits(conn, fullTableName);
+            assertEquals(5, guideposts.size());
+
+            // Confirm that when view index used, the GUIDE_POSTS_WIDTH from the data physical table
+            // was used
+            sql = "SELECT * FROM " + fullViewName + " WHERE v2 >= 100";
+            rs = conn1.prepareStatement("EXPLAIN " + sql).executeQuery();
+            stmt = conn1.prepareStatement(sql);
+            stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertEquals(4, plan.getSplits().size());
+        }
     }
 
     @Test
@@ -345,8 +291,8 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         String baseTable =  generateUniqueName();
         String globalView = generateUniqueName();
         String globalViewIdx =  generateUniqueName();
-        createBaseTable(schemaName, baseTable, multiTenant, null, null, mutable);
         try (PhoenixConnection conn = getConnection()) {
+            createBaseTable(conn, schemaName, baseTable, multiTenant, null, null, mutable);
             createView(conn, schemaName, globalView, baseTable);
             createViewIndex(conn, schemaName, globalViewIdx, globalView, "K1");
             //now check that the right coprocs are installed
@@ -483,103 +429,98 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
 
     @Test
     public void testUpdateOnTenantViewWithGlobalView() throws Exception {
-        Connection conn = getConnection();
-        String baseSchemaName = generateUniqueName();
-        String viewSchemaName = generateUniqueName();
-        String tsViewSchemaName = generateUniqueName();
-        String baseTableName = generateUniqueName();
-        String baseFullName = SchemaUtil.getTableName(baseSchemaName, baseTableName);
-        String viewTableName = "V_" + generateUniqueName();
-        String viewFullName = SchemaUtil.getTableName(viewSchemaName, viewTableName);
-        String indexName = "I_" + generateUniqueName();
-        String tsViewTableName = "TSV_" + generateUniqueName();
-        String tsViewFullName = SchemaUtil.getTableName(tsViewSchemaName, tsViewTableName);
-        String tenantId = "tenant1";
-        try {
-            if (isNamespaceMapped) {
-                conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + baseSchemaName);
-            }
-            conn.createStatement().execute(
-                    "CREATE TABLE " + baseFullName + "(\n" + "    ORGANIZATION_ID CHAR(15) NOT NULL,\n"
-                            + "    KEY_PREFIX CHAR(3) NOT NULL,\n" + "    CREATED_DATE DATE,\n"
-                            + "    CREATED_BY CHAR(15),\n" + "    CONSTRAINT PK PRIMARY KEY (\n"
-                            + "        ORGANIZATION_ID,\n" + "        KEY_PREFIX\n" + "    )\n"
-                            + ") VERSIONS=1, IMMUTABLE_ROWS=true, MULTI_TENANT=true");
-            conn.createStatement().execute(
-                    "CREATE VIEW " + viewFullName + " (\n" + 
-                            "INT1 BIGINT NOT NULL,\n" + 
-                            "DOUBLE1 DECIMAL(12, 3),\n" +
-                            "IS_BOOLEAN BOOLEAN,\n" + 
-                            "TEXT1 VARCHAR,\n" + "CONSTRAINT PKVIEW PRIMARY KEY\n" + "(\n" +
-                            "INT1\n" + ")) AS SELECT * FROM " + baseFullName + " WHERE KEY_PREFIX = '123'");
-            conn.createStatement().execute(
-                    "CREATE INDEX " + indexName + " \n" + "ON " + viewFullName + " (TEXT1 DESC, INT1)\n"
-                            + "INCLUDE (CREATED_BY, DOUBLE1, IS_BOOLEAN, CREATED_DATE)");
+        try (Connection conn = getConnection()) {
+            String baseSchemaName = generateUniqueName();
+            String viewSchemaName = generateUniqueName();
+            String tsViewSchemaName = generateUniqueName();
+            String baseTableName = generateUniqueName();
+            String baseFullName = SchemaUtil.getTableName(baseSchemaName, baseTableName);
+            String viewTableName = "V_" + generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(viewSchemaName, viewTableName);
+            String indexName = "I_" + generateUniqueName();
+            String tsViewTableName = "TSV_" + generateUniqueName();
+            String tsViewFullName = SchemaUtil.getTableName(tsViewSchemaName, tsViewTableName);
+            String tenantId = "tenant1";
             Properties tsProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
             tsProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-            Connection tsConn = DriverManager.getConnection(getUrl(), tsProps);
-            tsConn.createStatement().execute("CREATE VIEW " + tsViewFullName + " AS SELECT * FROM " + viewFullName);
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (1,1.0, true, 'a')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (2,2.0, true, 'b')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (3,3.0, true, 'c')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (4,4.0, true, 'd')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (5,5.0, true, 'e')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (6,6.0, true, 'f')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (7,7.0, true, 'g')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (8,8.0, true, 'h')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (9,9.0, true, 'i')");
-            tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (10,10.0, true, 'j')");
-            tsConn.commit();
-            
-            String basePhysicalName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(baseFullName), isNamespaceMapped).toString();
-            assertRowCount(tsConn, tsViewFullName, basePhysicalName, 10);
-            
-            tsConn.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE TEXT1='d'");
-            tsConn.commit();
-            assertRowCount(tsConn, tsViewFullName, basePhysicalName, 9);
+            try (Connection tsConn = DriverManager.getConnection(getUrl(), tsProps);
+                 Connection tsConn2 = DriverManager.getConnection(getUrl(), tsProps);
+                 Connection tsConn3 = DriverManager.getConnection(getUrl(), tsProps)) {
+                if (isNamespaceMapped) {
+                    conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + baseSchemaName);
+                }
+                conn.createStatement().execute(
+                    "CREATE TABLE " + baseFullName + "(\n" + "    ORGANIZATION_ID CHAR(15) NOT NULL,\n"
+                        + "    KEY_PREFIX CHAR(3) NOT NULL,\n" + "    CREATED_DATE DATE,\n"
+                        + "    CREATED_BY CHAR(15),\n" + "    CONSTRAINT PK PRIMARY KEY (\n"
+                        + "        ORGANIZATION_ID,\n" + "        KEY_PREFIX\n" + "    )\n"
+                        + ") VERSIONS=1, IMMUTABLE_ROWS=true, MULTI_TENANT=true");
+                conn.createStatement().execute(
+                    "CREATE VIEW " + viewFullName + " (\n" +
+                        "INT1 BIGINT NOT NULL,\n" +
+                        "DOUBLE1 DECIMAL(12, 3),\n" +
+                        "IS_BOOLEAN BOOLEAN,\n" +
+                        "TEXT1 VARCHAR,\n" + "CONSTRAINT PKVIEW PRIMARY KEY\n" + "(\n" +
+                        "INT1\n" + ")) AS SELECT * FROM " + baseFullName + " WHERE KEY_PREFIX = '123'");
+                conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + " \n" + "ON " + viewFullName + " (TEXT1 DESC, INT1)\n"
+                        + "INCLUDE (CREATED_BY, DOUBLE1, IS_BOOLEAN, CREATED_DATE)");
+
+                tsConn.createStatement().execute("CREATE VIEW " + tsViewFullName + " AS SELECT * FROM " + viewFullName);
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (1,1.0, true, 'a')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (2,2.0, true, 'b')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (3,3.0, true, 'c')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (4,4.0, true, 'd')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (5,5.0, true, 'e')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (6,6.0, true, 'f')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (7,7.0, true, 'g')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (8,8.0, true, 'h')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (9,9.0, true, 'i')");
+                tsConn.createStatement().execute("UPSERT INTO " + tsViewFullName + "(INT1,DOUBLE1,IS_BOOLEAN,TEXT1) VALUES (10,10.0, true, 'j')");
+                tsConn.commit();
+
+                String basePhysicalName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(baseFullName), isNamespaceMapped).toString();
+                assertRowCount(tsConn, tsViewFullName, basePhysicalName, 10);
+
+                tsConn.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE TEXT1='d'");
+                tsConn.commit();
+                assertRowCount(tsConn, tsViewFullName, basePhysicalName, 9);
+
+                tsConn.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE INT1=2");
+                tsConn.commit();
+                assertRowCount(tsConn, tsViewFullName, basePhysicalName, 8);
+
+                // Use different connection for delete
+
+                tsConn2.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE DOUBLE1 > 7.5 AND DOUBLE1 < 9.5");
+                tsConn2.commit();
+                assertRowCount(tsConn2, tsViewFullName, basePhysicalName, 6);
+
+                tsConn2.createStatement().execute("DROP VIEW " + tsViewFullName);
+                // Should drop view and index and remove index data
+                conn.createStatement().execute("DROP VIEW " + viewFullName);
+                // Deletes table data (but wouldn't update index)
+                conn.setAutoCommit(true);
+                conn.createStatement().execute("DELETE FROM " + baseFullName);
+                try {
+                    tsConn3.createStatement().execute("SELECT * FROM " + tsViewFullName + " LIMIT 1");
+                    fail("Expected table not to be found");
+                } catch (TableNotFoundException e) {
 
-            tsConn.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE INT1=2");
-            tsConn.commit();
-            assertRowCount(tsConn, tsViewFullName, basePhysicalName, 8);
-            
-            // Use different connection for delete
-            Connection tsConn2 = DriverManager.getConnection(getUrl(), tsProps);
-            tsConn2.createStatement().execute("DELETE FROM " + tsViewFullName + " WHERE DOUBLE1 > 7.5 AND DOUBLE1 < 9.5");
-            tsConn2.commit();
-            assertRowCount(tsConn2, tsViewFullName, basePhysicalName, 6);
-            
-            tsConn2.createStatement().execute("DROP VIEW " + tsViewFullName);
-            // Should drop view and index and remove index data
-            conn.createStatement().execute("DROP VIEW " + viewFullName);
-            // Deletes table data (but wouldn't update index)
-            conn.setAutoCommit(true);
-            conn.createStatement().execute("DELETE FROM " + baseFullName);
-            Connection tsConn3 = DriverManager.getConnection(getUrl(), tsProps);
-            try {
-                tsConn3.createStatement().execute("SELECT * FROM " + tsViewFullName + " LIMIT 1");
-                fail("Expected table not to be found");
-            } catch (TableNotFoundException e) {
-                
-            }
-            conn.createStatement().execute(
-                    "CREATE VIEW " + viewFullName + " (\n" + 
-                            "INT1 BIGINT NOT NULL,\n" + 
-                            "DOUBLE1 DECIMAL(12, 3),\n" +
-                            "IS_BOOLEAN BOOLEAN,\n" + 
-                            "TEXT1 VARCHAR,\n" + "CONSTRAINT PKVIEW PRIMARY KEY\n" + "(\n" +
-                            "INT1\n" + ")) AS SELECT * FROM " + baseFullName + " WHERE KEY_PREFIX = '123'");
-            tsConn3.createStatement().execute("CREATE VIEW " + tsViewFullName + " AS SELECT * FROM " + viewFullName);
-            conn.createStatement().execute(
+                }
+                conn.createStatement().execute(
+                    "CREATE VIEW " + viewFullName + " (\n" +
+                        "INT1 BIGINT NOT NULL,\n" +
+                        "DOUBLE1 DECIMAL(12, 3),\n" +
+                        "IS_BOOLEAN BOOLEAN,\n" +
+                        "TEXT1 VARCHAR,\n" + "CONSTRAINT PKVIEW PRIMARY KEY\n" + "(\n" +
+                        "INT1\n" + ")) AS SELECT * FROM " + baseFullName + " WHERE KEY_PREFIX = '123'");
+                tsConn3.createStatement().execute("CREATE VIEW " + tsViewFullName + " AS SELECT * FROM " + viewFullName);
+                conn.createStatement().execute(
                     "CREATE INDEX " + indexName + " \n" + "ON " + viewFullName + " (TEXT1 DESC, INT1)\n"
-                            + "INCLUDE (CREATED_BY, DOUBLE1, IS_BOOLEAN, CREATED_DATE)");
-            assertRowCount(tsConn3, tsViewFullName, basePhysicalName, 0);
-            
-            tsConn.close();
-            tsConn2.close();
-            tsConn3.close();
-            
-        } finally {
-            conn.close();
+                        + "INCLUDE (CREATED_BY, DOUBLE1, IS_BOOLEAN, CREATED_DATE)");
+                assertRowCount(tsConn3, tsViewFullName, basePhysicalName, 0);
+            }
         }
     }
     
@@ -596,22 +537,23 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
     
     private void testHintForIndexOnView(boolean includeColumns) throws Exception {
         Properties props = new Properties();
-        Connection conn1 = DriverManager.getConnection(getUrl(), props);
-        conn1.setAutoCommit(true);
-        String tableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
-        String viewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
-        String indexName = generateUniqueName();
-        String fullIndexName = SchemaUtil.getTableName(SCHEMA2, indexName);
-        conn1.createStatement().execute(
-          "CREATE TABLE "+tableName+" (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) UPDATE_CACHE_FREQUENCY=1000000");
-        conn1.createStatement().execute("upsert into "+tableName+" values ('row1', 'value1', 'key1')");
-        conn1.createStatement().execute(
-          "CREATE VIEW "+viewName+" (v3 VARCHAR, v4 VARCHAR) AS SELECT * FROM "+tableName+" WHERE v1 = 'value1'");
-        conn1.createStatement().execute("CREATE INDEX " + indexName + " ON " + viewName + "(v3)" + (includeColumns ? " INCLUDE(v4)" : ""));
-        PhoenixStatement stmt = conn1.createStatement().unwrap(PhoenixStatement.class);
-        ResultSet rs = stmt.executeQuery("SELECT /*+ INDEX(" + viewName + " " + fullIndexName + ") */ v1 FROM " + viewName + " WHERE v3 = 'foo' ORDER BY v4");
-        assertFalse(rs.next());
-        assertEquals(fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+        try (Connection conn1 = DriverManager.getConnection(getUrl(), props)) {
+            conn1.setAutoCommit(true);
+            String tableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+            String viewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+            String indexName = generateUniqueName();
+            String fullIndexName = SchemaUtil.getTableName(SCHEMA2, indexName);
+            conn1.createStatement().execute(
+                "CREATE TABLE " + tableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) UPDATE_CACHE_FREQUENCY=1000000");
+            conn1.createStatement().execute("upsert into " + tableName + " values ('row1', 'value1', 'key1')");
+            conn1.createStatement().execute(
+                "CREATE VIEW " + viewName + " (v3 VARCHAR, v4 VARCHAR) AS SELECT * FROM " + tableName + " WHERE v1 = 'value1'");
+            conn1.createStatement().execute("CREATE INDEX " + indexName + " ON " + viewName + "(v3)" + (includeColumns ? " INCLUDE(v4)" : ""));
+            PhoenixStatement stmt = conn1.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery("SELECT /*+ INDEX(" + viewName + " " + fullIndexName + ") */ v1 FROM " + viewName + " WHERE v3 = 'foo' ORDER BY v4");
+            assertFalse(rs.next());
+            assertEquals(fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+        }
     }
 
     @Test
@@ -666,24 +608,25 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         String tenantViewName = "TV_" + generateUniqueName();
         String globalViewIndexName = "GV_" + generateUniqueName();
         String tenantViewIndexName = "TV_" + generateUniqueName();
-        Connection globalConn = getConnection();
-        Connection tenantConn = getTenantConnection(TENANT1);
-        createBaseTable(SCHEMA1, tableName, true, 0, null, true);
-        createView(globalConn, SCHEMA1, globalViewName, tableName);
-        createViewIndex(globalConn, SCHEMA1, globalViewIndexName, globalViewName, "v1");
-        createView(tenantConn, SCHEMA1, tenantViewName, tableName);
-        createViewIndex(tenantConn, SCHEMA1, tenantViewIndexName, tenantViewName, "v2");
-
-        PTable globalViewIndexTable = PhoenixRuntime.getTable(globalConn, SCHEMA1 + "." + globalViewIndexName);
-        PTable tenantViewIndexTable = PhoenixRuntime.getTable(tenantConn, SCHEMA1 + "." + tenantViewIndexName);
-        Assert.assertNotNull(globalViewIndexTable);
-        Assert.assertNotNull(tenantViewIndexName);
-        Assert.assertNotEquals(globalViewIndexTable.getViewIndexId(), tenantViewIndexTable.getViewIndexId());
-        globalConn.createStatement().execute("UPSERT INTO " + SchemaUtil.getTableName(SCHEMA1, globalViewName) + " (T_ID, K1, K2) VALUES ('GLOBAL', 'k1', 100)");
-        tenantConn.createStatement().execute("UPSERT INTO " + SchemaUtil.getTableName(SCHEMA1, tenantViewName) + " (T_ID, K1, K2) VALUES ('TENANT', 'k1', 101)");
-
-        assertEquals(1, getRowCountOfView(globalConn, SCHEMA1, globalViewIndexName));
-        assertEquals(1, getRowCountOfView(tenantConn, SCHEMA1, tenantViewName));
+        try (Connection globalConn = getConnection();
+             Connection tenantConn = getTenantConnection(TENANT1)) {
+            createBaseTable(globalConn, SCHEMA1, tableName, true, 0, null, true);
+            createView(globalConn, SCHEMA1, globalViewName, tableName);
+            createViewIndex(globalConn, SCHEMA1, globalViewIndexName, globalViewName, "v1");
+            createView(tenantConn, SCHEMA1, tenantViewName, tableName);
+            createViewIndex(tenantConn, SCHEMA1, tenantViewIndexName, tenantViewName, "v2");
+
+            PTable globalViewIndexTable = PhoenixRuntime.getTable(globalConn, SCHEMA1 + "." + globalViewIndexName);
+            PTable tenantViewIndexTable = PhoenixRuntime.getTable(tenantConn, SCHEMA1 + "." + tenantViewIndexName);
+            Assert.assertNotNull(globalViewIndexTable);
+            Assert.assertNotNull(tenantViewIndexTable);
+            Assert.assertNotEquals(globalViewIndexTable.getViewIndexId(), tenantViewIndexTable.getViewIndexId());
+            globalConn.createStatement().execute("UPSERT INTO " + SchemaUtil.getTableName(SCHEMA1, globalViewName) + " (T_ID, K1, K2) VALUES ('GLOBAL', 'k1', 100)");
+            tenantConn.createStatement().execute("UPSERT INTO " + SchemaUtil.getTableName(SCHEMA1, tenantViewName) + " (T_ID, K1, K2) VALUES ('TENANT', 'k1', 101)");
+
+            assertEquals(1, getRowCountOfView(globalConn, SCHEMA1, globalViewIndexName));
+            assertEquals(1, getRowCountOfView(tenantConn, SCHEMA1, tenantViewName));
+        }
     }
 
     private int getRowCountOfView(Connection conn, String schemaName, String viewName) throws SQLException {
@@ -701,7 +644,7 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
         String globalViewName = "V_" + generateUniqueName();
         String globalViewIndexName = "GV_" + generateUniqueName();
         try (Connection globalConn = getConnection()) {
-            createBaseTable(SCHEMA1, tableName, true, 0, null, true);
+            createBaseTable(globalConn, SCHEMA1, tableName, true, 0, null, true);
             createView(globalConn, SCHEMA1, globalViewName, tableName);
             createViewIndex(globalConn, SCHEMA1, globalViewIndexName, globalViewName, "v1");
 
@@ -738,4 +681,68 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
             }
         }
     }
+
+    public void createBaseTable(Connection conn, String schemaName, String tableName,
+                                boolean multiTenant,
+                                Integer saltBuckets, String splits, boolean immutable)
+        throws SQLException {
+        if (isNamespaceMapped) {
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+        String ddl = "CREATE " + (immutable ? "IMMUTABLE" : "") +
+            " TABLE " + SchemaUtil.getTableName(schemaName, tableName) +
+            " (t_id VARCHAR NOT NULL,\n" +
+            "k1 VARCHAR NOT NULL,\n" +
+            "k2 INTEGER NOT NULL,\n" +
+            "v1 VARCHAR,\n" +
+            "v2 INTEGER,\n" +
+            "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+        String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
+        if (saltBuckets != null) {
+            ddlOptions = ddlOptions
+                + (ddlOptions.isEmpty() ? "" : ", ")
+                + "salt_buckets=" + saltBuckets;
+        }
+        if (splits != null) {
+            ddlOptions = ddlOptions
+                + (ddlOptions.isEmpty() ? "" : ", ")
+                + "splits=" + splits;
+        }
+        conn.createStatement().execute(ddl + ddlOptions);
+    }
+
+    public void createIndex(Connection conn, String schemaName, String indexName,
+                            String tableName, String indexedColumnName, boolean isLocal,
+                            boolean isAsync) throws SQLException {
+        if (isNamespaceMapped) {
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String local = isLocal ? " LOCAL " : "";
+        String async = isAsync ? " ASYNC " : "";
+        String sql =
+            "CREATE " + local + " INDEX " + indexName + " ON " + fullTableName + "(" +
+                indexedColumnName + ")" + async;
+        conn.createStatement().execute(sql);
+    }
+    public void createView(Connection conn, String schemaName, String viewName,
+                           String baseTableName) throws SQLException {
+        if (isNamespaceMapped) {
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        String fullTableName = SchemaUtil.getTableName(schemaName, baseTableName);
+        String viewSql = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName;
+        conn.createStatement().execute(viewSql);
+    }
+
+    public void createViewIndex(Connection conn, String schemaName, String indexName,
+                                String viewName,
+                                String indexColumn) throws SQLException {
+        if (isNamespaceMapped) {
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullViewName + "(" + indexColumn + ")");
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ca573dc..f33e5c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -793,6 +793,7 @@ public class DeleteCompiler {
             ImmutableBytesWritable ptr = context.getTempPtr();
             PTable table = dataPlan.getTableRef().getTable();
             table.getIndexMaintainers(ptr, context.getConnection());
+            ScanUtil.setWALAnnotationAttributes(table, context.getScan());
             byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
             ServerCache cache = null;
             try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6724e6a..f1aa249 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -795,7 +795,6 @@ public class UpsertCompiler {
                     final Scan scan = context.getScan();
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
-                    
                     // Ignore order by - it has no impact
                     final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan);
                     return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
@@ -935,7 +934,7 @@ public class UpsertCompiler {
                 allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
                 connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
     }
-    
+
     private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
         checkArgument(table.getRowTimestampColPos() != -1, "Call this method only for tables with row timestamp column");
         int rowTimestampColPKSlot = table.getRowTimestampColPos();
@@ -1108,6 +1107,7 @@ public class UpsertCompiler {
             ImmutableBytesWritable ptr = context.getTempPtr();
             PTable table = tableRef.getTable();
             table.getIndexMaintainers(ptr, context.getConnection());
+            ScanUtil.setWALAnnotationAttributes(table, scan);
             byte[] txState = table.isTransactional() ?
                     connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 716fd3a..cf2a094 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -400,7 +400,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      * Used for an aggregate query in which the key order does not necessarily match the group by
      * key order. In this case, we must collect all distinct groups within a region into a map,
      * aggregating as we go.
-     * @param limit TODO
      */
 
     private static class UnorderedGroupByRegionScanner extends BaseRegionScanner {
@@ -518,8 +517,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     /**
      * Used for an aggregate query in which the key order match the group by key order. In this
      * case, we can do the aggregation as we scan, by detecting when the group by key changes.
-     * @param limit TODO
-     * @throws IOException
      */
 
     private static class OrderedGroupByRegionScanner extends BaseRegionScanner {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 0cf62d1..e25e055 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.coprocessor.generated.MetaDataProtos.MutationCo
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
@@ -292,46 +293,51 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             new byte[]{PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
 
     // KeyValues for Table
-    private static final KeyValue TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
-    private static final KeyValue TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
-    private static final KeyValue COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
-    private static final KeyValue SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
-    private static final KeyValue PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
-    private static final KeyValue DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
-    private static final KeyValue INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
-    private static final KeyValue IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
-    private static final KeyValue VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
-    private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
-    private static final KeyValue DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
-    private static final KeyValue MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
-    private static final KeyValue VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
-    private static final KeyValue VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+    private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+        TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
+    private static final Cell TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+        TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+    private static final Cell COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
+    private static final Cell SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
+    private static final Cell PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
+    private static final Cell DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
+    private static final Cell INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
+    private static final Cell IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
+    private static final Cell VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
+    private static final Cell DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
+    private static final Cell DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
+    private static final Cell MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
+    private static final Cell VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
+    private static final Cell VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
     /**
      * A designator for choosing the right type for viewIndex (Short vs Long) to be backward compatible.
      **/
-    private static final KeyValue VIEW_INDEX_ID_DATA_TYPE_BYTES_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_DATA_TYPE_BYTES);
-    private static final KeyValue INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
-    private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
-    private static final KeyValue STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
-    private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
-    private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
-    private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
-    private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
-    private static final KeyValue TRANSACTION_PROVIDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTION_PROVIDER_BYTES);
-    private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
-    private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+    private static final Cell VIEW_INDEX_ID_DATA_TYPE_BYTES_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_DATA_TYPE_BYTES);
+    private static final Cell INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
+    private static final Cell INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+    private static final Cell STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
+    private static final Cell EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
+    private static final Cell BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
+    private static final Cell ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+    private static final Cell TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
+    private static final Cell TRANSACTION_PROVIDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTION_PROVIDER_BYTES);
+    private static final Cell UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
+    private static final Cell IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
-    private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
-    private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
-    private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
-    private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
-    private static final KeyValue USE_STATS_FOR_PARALLELIZATION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, USE_STATS_FOR_PARALLELIZATION_BYTES);
-    private static final KeyValue PHOENIX_TTL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_BYTES);
-    private static final KeyValue PHOENIX_TTL_HWM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_HWM_BYTES);
-    private static final KeyValue LAST_DDL_TIMESTAMP_KV =
+    private static final Cell AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
+    private static final Cell APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
+    private static final Cell STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
+    private static final Cell ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
+    private static final Cell USE_STATS_FOR_PARALLELIZATION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, USE_STATS_FOR_PARALLELIZATION_BYTES);
+    private static final Cell PHOENIX_TTL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_BYTES);
+    private static final Cell PHOENIX_TTL_HWM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PHOENIX_TTL_HWM_BYTES);
+    private static final Cell LAST_DDL_TIMESTAMP_KV =
         createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, LAST_DDL_TIMESTAMP_BYTES);
+    private static final Cell CHANGE_DETECTION_ENABLED_KV =
+        createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES,
+            CHANGE_DETECTION_ENABLED_BYTES);
 
-    private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
+    private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
             TABLE_TYPE_KV,
             TABLE_SEQ_NUM_KV,
@@ -364,7 +370,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             USE_STATS_FOR_PARALLELIZATION_KV,
             PHOENIX_TTL_KV,
             PHOENIX_TTL_HWM_KV,
-            LAST_DDL_TIMESTAMP_KV
+            LAST_DDL_TIMESTAMP_KV,
+            CHANGE_DETECTION_ENABLED_KV
     );
 
     static {
@@ -404,6 +411,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int PHOENIX_TTL_HWM_INDEX = TABLE_KV_COLUMNS.indexOf(PHOENIX_TTL_HWM_KV);
     private static final int LAST_DDL_TIMESTAMP_INDEX =
         TABLE_KV_COLUMNS.indexOf(LAST_DDL_TIMESTAMP_KV);
+    private static final int CHANGE_DETECTION_ENABLED_INDEX =
+        TABLE_KV_COLUMNS.indexOf(CHANGE_DETECTION_ENABLED_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -422,7 +431,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue LINK_TYPE_KV =
             createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
 
-    private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
+    private static final List<Cell> COLUMN_KV_COLUMNS = Lists.newArrayList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
             NULLABLE_KV,
@@ -478,7 +487,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue MAX_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES);
     private static final KeyValue IS_ARRAY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES);
 
-    private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList(
+    private static final List<Cell> FUNCTION_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
             CLASS_NAME_KV,
             JAR_PATH_KV,
@@ -1160,6 +1169,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
            null : PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
                 lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
 
+        Cell changeDetectionEnabledKv = tableKeyValues[CHANGE_DETECTION_ENABLED_INDEX];
+        boolean isChangeDetectionEnabled = changeDetectionEnabledKv != null
+            && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(changeDetectionEnabledKv.getValueArray(),
+            changeDetectionEnabledKv.getValueOffset(),
+            changeDetectionEnabledKv.getValueLength()));
+
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(useStatsForParallelizationKv);
@@ -1254,6 +1269,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
                 .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
                 .setLastDDLTimestamp(lastDDLTimestamp)
+                .setIsChangeDetectionEnabled(isChangeDetectionEnabled)
                 .setColumns(columns)
                 .build();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6ec9bb3..ed4ea74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -73,12 +73,10 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 7f4e73c..a96303a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -34,6 +34,7 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
@@ -118,37 +120,37 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
 
     private long pageSizeInMs = Long.MAX_VALUE;
     private int maxBatchSize = 0;
-    private Scan scan;
-    private RegionScanner innerScanner;
-    private Region region;
+    private final Scan scan;
+    private final RegionScanner innerScanner;
+    private final Region region;
     private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
     private final RegionCoprocessorEnvironment env;
     private final boolean useQualifierAsIndex;
     private boolean needToWrite = false;
     private final Pair<Integer, Integer> minMaxQualifiers;
     private byte[][] values = null;
-    private PTable.QualifierEncodingScheme encodingScheme;
+    private final PTable.QualifierEncodingScheme encodingScheme;
     private PTable writeToTable = null;
     private PTable projectedTable = null;
-    private boolean isDescRowKeyOrderUpgrade;
+    private final boolean isDescRowKeyOrderUpgrade;
     private final int offset;
-    private boolean buildLocalIndex;
-    private List<IndexMaintainer> indexMaintainers;
+    private final boolean buildLocalIndex;
+    private final List<IndexMaintainer> indexMaintainers;
     private boolean isPKChanging = false;
-    private long ts;
+    private final long ts;
     private PhoenixTransactionProvider txnProvider = null;
-    private UngroupedAggregateRegionObserver.MutationList indexMutations;
+    private final UngroupedAggregateRegionObserver.MutationList indexMutations;
     private boolean isDelete = false;
-    private byte[] replayMutations;
+    private final byte[] replayMutations;
     private boolean isUpsert = false;
     private List<Expression> selectExpressions = null;
     private byte[] deleteCQ = null;
     private byte[] deleteCF = null;
     private byte[] emptyCF = null;
-    private byte[] indexUUID;
-    private byte[] txState;
-    private byte[] clientVersionBytes;
-    private long blockingMemStoreSize;
+    private final byte[] indexUUID;
+    private final byte[] txState;
+    private final byte[] clientVersionBytes;
+    private final long blockingMemStoreSize;
     private long maxBatchSizeBytes = 0L;
     private HTable targetHTable = null;
     private boolean incrScanRefCount = false;
@@ -368,7 +370,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
         if (offset > 0) { // for local indexes (prepend region start key)
             byte[] newRowWithOffset = new byte[offset + newRow.length];
-            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);;
+            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
             System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
             newRow = newRowWithOffset;
         }
@@ -592,9 +594,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                                 insertEmptyKeyValue(results, mutations);
                             }
                             if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                                ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                        txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
-                                mutations.clear();
+                                annotateAndCommit(mutations);
                             }
                             // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
@@ -609,9 +609,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                     } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
 
                     if (!mutations.isEmpty()) {
-                        ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                                targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
-                        mutations.clear();
+                        annotateAndCommit(mutations);
                     }
                     if (!indexMutations.isEmpty()) {
                         ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
@@ -640,8 +638,35 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         }
     }
 
+    private void annotateAndCommit(UngroupedAggregateRegionObserver.MutationList mutations) throws IOException { // annotates the batch when applicable, commits it, then empties it
+        if (isDelete || isUpsert) { // annotation only happens when one of these two scanner flags is set
+            annotateDataMutations(mutations, scan); // copies the metadata attributes carried on the scan onto each mutation
+        }
+        ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+            targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+        mutations.clear(); // emptied after the commit so the caller can keep filling the same list
+    }
+
     @Override
     public long getMaxResultSize() {
         return scan.getMaxResultSize();
     }
+
+    private void annotateDataMutations(UngroupedAggregateRegionObserver.MutationList mutationsList, // reads five metadata attributes off the scan and applies them to every mutation in the list
+                                       Scan scan) {
+        byte[] tenantId =
+            scan.getAttribute(MutationState.MutationMetadataType.TENANT_ID.toString()); // attribute key is the enum constant's toString()
+        byte[] schemaName =
+            scan.getAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
+        byte[] logicalTableName =
+            scan.getAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
+        byte[] tableType =
+            scan.getAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString());
+        byte[] ddlTimestamp =
+            scan.getAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString()); // note the key is TIMESTAMP although the local is named ddlTimestamp
+
+        for (Mutation m : mutationsList) {
+            annotateMutation(m, tenantId, schemaName, logicalTableName, tableType, ddlTimestamp); // NOTE(review): values are passed through unchecked; presumably annotateMutation tolerates absent (null) attributes - confirm
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 122d893..5e471f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3764,6 +3764,16 @@ public final class PTableProtos {
      * <code>optional int64 lastDDLTimestamp = 45;</code>
      */
     long getLastDDLTimestamp();
+
+    // optional bool changeDetectionEnabled = 46;
+    /**
+     * <code>optional bool changeDetectionEnabled = 46;</code>
+     */
+    boolean hasChangeDetectionEnabled();
+    /**
+     * <code>optional bool changeDetectionEnabled = 46;</code>
+     */
+    boolean getChangeDetectionEnabled();
   }
   /**
    * Protobuf type {@code PTable}
@@ -4055,6 +4065,11 @@ public final class PTableProtos {
               lastDDLTimestamp_ = input.readInt64();
               break;
             }
+            case 368: {
+              bitField1_ |= 0x00000100;
+              changeDetectionEnabled_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4947,6 +4962,22 @@ public final class PTableProtos {
       return lastDDLTimestamp_;
     }
 
+    // optional bool changeDetectionEnabled = 46;
+    public static final int CHANGEDETECTIONENABLED_FIELD_NUMBER = 46;
+    private boolean changeDetectionEnabled_;
+    /**
+     * <code>optional bool changeDetectionEnabled = 46;</code>
+     */
+    public boolean hasChangeDetectionEnabled() {
+      return ((bitField1_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional bool changeDetectionEnabled = 46;</code>
+     */
+    public boolean getChangeDetectionEnabled() {
+      return changeDetectionEnabled_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4992,6 +5023,7 @@ public final class PTableProtos {
       phoenixTTLHighWaterMark_ = 0L;
       viewModifiedPhoenixTTL_ = false;
       lastDDLTimestamp_ = 0L;
+      changeDetectionEnabled_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5191,6 +5223,9 @@ public final class PTableProtos {
       if (((bitField1_ & 0x00000080) == 0x00000080)) {
         output.writeInt64(45, lastDDLTimestamp_);
       }
+      if (((bitField1_ & 0x00000100) == 0x00000100)) {
+        output.writeBool(46, changeDetectionEnabled_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5381,6 +5416,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(45, lastDDLTimestamp_);
       }
+      if (((bitField1_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(46, changeDetectionEnabled_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5612,6 +5651,11 @@ public final class PTableProtos {
         result = result && (getLastDDLTimestamp()
             == other.getLastDDLTimestamp());
       }
+      result = result && (hasChangeDetectionEnabled() == other.hasChangeDetectionEnabled());
+      if (hasChangeDetectionEnabled()) {
+        result = result && (getChangeDetectionEnabled()
+            == other.getChangeDetectionEnabled());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5801,6 +5845,10 @@ public final class PTableProtos {
         hash = (37 * hash) + LASTDDLTIMESTAMP_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getLastDDLTimestamp());
       }
+      if (hasChangeDetectionEnabled()) {
+        hash = (37 * hash) + CHANGEDETECTIONENABLED_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getChangeDetectionEnabled());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6013,6 +6061,8 @@ public final class PTableProtos {
         bitField1_ = (bitField1_ & ~0x00000400);
         lastDDLTimestamp_ = 0L;
         bitField1_ = (bitField1_ & ~0x00000800);
+        changeDetectionEnabled_ = false;
+        bitField1_ = (bitField1_ & ~0x00001000);
         return this;
       }
 
@@ -6235,6 +6285,10 @@ public final class PTableProtos {
           to_bitField1_ |= 0x00000080;
         }
         result.lastDDLTimestamp_ = lastDDLTimestamp_;
+        if (((from_bitField1_ & 0x00001000) == 0x00001000)) {
+          to_bitField1_ |= 0x00000100;
+        }
+        result.changeDetectionEnabled_ = changeDetectionEnabled_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         onBuilt();
@@ -6464,6 +6518,9 @@ public final class PTableProtos {
         if (other.hasLastDDLTimestamp()) {
           setLastDDLTimestamp(other.getLastDDLTimestamp());
         }
+        if (other.hasChangeDetectionEnabled()) {
+          setChangeDetectionEnabled(other.getChangeDetectionEnabled());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8807,6 +8864,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional bool changeDetectionEnabled = 46;
+      private boolean changeDetectionEnabled_ ;
+      /**
+       * <code>optional bool changeDetectionEnabled = 46;</code>
+       */
+      public boolean hasChangeDetectionEnabled() {
+        return ((bitField1_ & 0x00001000) == 0x00001000);
+      }
+      /**
+       * <code>optional bool changeDetectionEnabled = 46;</code>
+       */
+      public boolean getChangeDetectionEnabled() {
+        return changeDetectionEnabled_;
+      }
+      /**
+       * <code>optional bool changeDetectionEnabled = 46;</code>
+       */
+      public Builder setChangeDetectionEnabled(boolean value) {
+        bitField1_ |= 0x00001000;
+        changeDetectionEnabled_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool changeDetectionEnabled = 46;</code>
+       */
+      public Builder clearChangeDetectionEnabled() {
+        bitField1_ = (bitField1_ & ~0x00001000);
+        changeDetectionEnabled_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -9476,7 +9566,7 @@ public final class PTableProtos {
       "es\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n" +
       "\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030" +
       "\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts" +
-      "\"\222\t\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
+      "\"\262\t\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
       "tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162" +
       "\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequ" +
       "enceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013p" +
@@ -9505,12 +9595,12 @@ public final class PTableProtos {
       "rParallelization\030) \001(\010\022\022\n\nphoenixTTL\030* \001" +
       "(\003\022\037\n\027phoenixTTLHighWaterMark\030+ \001(\003\022\036\n\026v" +
       "iewModifiedPhoenixTTL\030, \001(\010\022\030\n\020lastDDLTi" +
-      "mestamp\030- \001(\003\"6\n\020EncodedCQCounter\022\021\n\tcol" +
-      "Family\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n\nPTableT" +
-      "ype\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005I" +
-      "NDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix.c" +
-      "oprocessor.generatedB\014PTableProtosH\001\210\001\001\240" +
-      "\001\001"
+      "mestamp\030- \001(\003\022\036\n\026changeDetectionEnabled\030" +
+      ". \001(\010\"6\n\020EncodedCQCounter\022\021\n\tcolFamily\030\001" +
+      " \002(\t\022\017\n\007counter\030\002 \002(\005*A\n\nPTableType\022\n\n\006S" +
+      "YSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010" +
+      "\n\004JOIN\020\004B@\n(org.apache.phoenix.coprocess" +
+      "or.generatedB\014PTableProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9534,7 +9624,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index ab1ffee..19d00ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -2192,6 +2192,21 @@ public final class ServerCachingProtos {
      */
     com.google.protobuf.ByteString
         getParentTableTypeBytes();
+
+    // optional string logicalIndexName = 25;
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    boolean hasLogicalIndexName();
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    java.lang.String getLogicalIndexName();
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    com.google.protobuf.ByteString
+        getLogicalIndexNameBytes();
   }
   /**
    * Protobuf type {@code IndexMaintainer}
@@ -2400,6 +2415,11 @@ public final class ServerCachingProtos {
               parentTableType_ = input.readBytes();
               break;
             }
+            case 202: {
+              bitField0_ |= 0x00080000;
+              logicalIndexName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2959,6 +2979,49 @@ public final class ServerCachingProtos {
       }
     }
 
+    // optional string logicalIndexName = 25;
+    public static final int LOGICALINDEXNAME_FIELD_NUMBER = 25;
+    private java.lang.Object logicalIndexName_;
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    public boolean hasLogicalIndexName() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    public java.lang.String getLogicalIndexName() {
+      java.lang.Object ref = logicalIndexName_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          logicalIndexName_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string logicalIndexName = 25;</code>
+     */
+    public com.google.protobuf.ByteString
+        getLogicalIndexNameBytes() {
+      java.lang.Object ref = logicalIndexName_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        logicalIndexName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       saltBuckets_ = 0;
       isMultiTenant_ = false;
@@ -2984,6 +3047,7 @@ public final class ServerCachingProtos {
       viewIndexIdType_ = 0;
       indexDataColumnCount_ = -1;
       parentTableType_ = "";
+      logicalIndexName_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3153,6 +3217,9 @@ public final class ServerCachingProtos {
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeBytes(24, getParentTableTypeBytes());
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeBytes(25, getLogicalIndexNameBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3263,6 +3330,10 @@ public final class ServerCachingProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(24, getParentTableTypeBytes());
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(25, getLogicalIndexNameBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3391,6 +3462,11 @@ public final class ServerCachingProtos {
         result = result && getParentTableType()
             .equals(other.getParentTableType());
       }
+      result = result && (hasLogicalIndexName() == other.hasLogicalIndexName());
+      if (hasLogicalIndexName()) {
+        result = result && getLogicalIndexName()
+            .equals(other.getLogicalIndexName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -3500,6 +3576,10 @@ public final class ServerCachingProtos {
         hash = (37 * hash) + PARENTTABLETYPE_FIELD_NUMBER;
         hash = (53 * hash) + getParentTableType().hashCode();
       }
+      if (hasLogicalIndexName()) {
+        hash = (37 * hash) + LOGICALINDEXNAME_FIELD_NUMBER;
+        hash = (53 * hash) + getLogicalIndexName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -3682,6 +3762,8 @@ public final class ServerCachingProtos {
         bitField0_ = (bitField0_ & ~0x00400000);
         parentTableType_ = "";
         bitField0_ = (bitField0_ & ~0x00800000);
+        logicalIndexName_ = "";
+        bitField0_ = (bitField0_ & ~0x01000000);
         return this;
       }
 
@@ -3831,6 +3913,10 @@ public final class ServerCachingProtos {
           to_bitField0_ |= 0x00040000;
         }
         result.parentTableType_ = parentTableType_;
+        if (((from_bitField0_ & 0x01000000) == 0x01000000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.logicalIndexName_ = logicalIndexName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4020,6 +4106,11 @@ public final class ServerCachingProtos {
           parentTableType_ = other.parentTableType_;
           onChanged();
         }
+        if (other.hasLogicalIndexName()) {
+          bitField0_ |= 0x01000000;
+          logicalIndexName_ = other.logicalIndexName_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5924,6 +6015,80 @@ public final class ServerCachingProtos {
         return this;
       }
 
+      // optional string logicalIndexName = 25;
+      private java.lang.Object logicalIndexName_ = "";
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public boolean hasLogicalIndexName() {
+        return ((bitField0_ & 0x01000000) == 0x01000000);
+      }
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public java.lang.String getLogicalIndexName() {
+        java.lang.Object ref = logicalIndexName_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          logicalIndexName_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public com.google.protobuf.ByteString
+          getLogicalIndexNameBytes() {
+        java.lang.Object ref = logicalIndexName_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          logicalIndexName_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public Builder setLogicalIndexName(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x01000000;
+        logicalIndexName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public Builder clearLogicalIndexName() {
+        bitField0_ = (bitField0_ & ~0x01000000);
+        logicalIndexName_ = getDefaultInstance().getLogicalIndexName();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string logicalIndexName = 25;</code>
+       */
+      public Builder setLogicalIndexNameBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x01000000;
+        logicalIndexName_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:IndexMaintainer)
     }
 
@@ -9050,7 +9215,7 @@ public final class ServerCachingProtos {
       "ength\030\003 \002(\005\"4\n\017ColumnReference\022\016\n\006family" +
       "\030\001 \002(\014\022\021\n\tqualifier\030\002 \002(\014\"4\n\nColumnInfo\022" +
       "\022\n\nfamilyName\030\001 \001(\t\022\022\n\ncolumnName\030\002 \002(\t\"" +
-      "\232\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
+      "\264\006\n\017IndexMaintainer\022\023\n\013saltBuckets\030\001 \002(\005" +
       "\022\025\n\risMultiTenant\030\002 \002(\010\022\023\n\013viewIndexId\030\003" +
       " \001(\014\022(\n\016indexedColumns\030\004 \003(\0132\020.ColumnRef" +
       "erence\022 \n\030indexedColumnTypeOrdinal\030\005 \003(\005",
@@ -9069,23 +9234,24 @@ public final class ServerCachingProtos {
       "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" +
       "\022\036\n\026immutableStorageScheme\030\025 \002(\005\022\027\n\017view" +
       "IndexIdType\030\026 \001(\005\022 \n\024indexDataColumnCoun" +
-      "t\030\027 \001(\005:\002-1\022\027\n\017parentTableType\030\030 \001(\t\"\370\001\n" +
-      "\025AddServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014" +
-      "\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cachePtr\030\003 \002(\0132\027.Im" +
-      "mutableBytesWritable\022)\n\014cacheFactory\030\004 \002" +
-      "(\0132\023.ServerCacheFactory\022\017\n\007txState\030\005 \001(\014",
-      "\022\"\n\032hasProtoBufIndexMaintainer\030\006 \001(\010\022\025\n\r" +
-      "clientVersion\030\007 \001(\005\022\032\n\022usePersistentCach" +
-      "e\030\010 \001(\010\"(\n\026AddServerCacheResponse\022\016\n\006ret" +
-      "urn\030\001 \002(\010\"=\n\030RemoveServerCacheRequest\022\020\n" +
-      "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031Remo" +
-      "veServerCacheResponse\022\016\n\006return\030\001 \002(\0102\245\001" +
-      "\n\024ServerCachingService\022A\n\016addServerCache" +
-      "\022\026.AddServerCacheRequest\032\027.AddServerCach" +
-      "eResponse\022J\n\021removeServerCache\022\031.RemoveS" +
-      "erverCacheRequest\032\032.RemoveServerCacheRes",
-      "ponseBG\n(org.apache.phoenix.coprocessor." +
-      "generatedB\023ServerCachingProtosH\001\210\001\001\240\001\001"
+      "t\030\027 \001(\005:\002-1\022\027\n\017parentTableType\030\030 \001(\t\022\030\n\020" +
+      "logicalIndexName\030\031 \001(\t\"\370\001\n\025AddServerCach" +
+      "eRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 " +
+      "\002(\014\022)\n\010cachePtr\030\003 \002(\0132\027.ImmutableBytesWr" +
+      "itable\022)\n\014cacheFactory\030\004 \002(\0132\023.ServerCac",
+      "heFactory\022\017\n\007txState\030\005 \001(\014\022\"\n\032hasProtoBu" +
+      "fIndexMaintainer\030\006 \001(\010\022\025\n\rclientVersion\030" +
+      "\007 \001(\005\022\032\n\022usePersistentCache\030\010 \001(\010\"(\n\026Add" +
+      "ServerCacheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030R" +
+      "emoveServerCacheRequest\022\020\n\010tenantId\030\001 \001(" +
+      "\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerCacheR" +
+      "esponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCachin" +
+      "gService\022A\n\016addServerCache\022\026.AddServerCa" +
+      "cheRequest\032\027.AddServerCacheResponse\022J\n\021r" +
+      "emoveServerCache\022\031.RemoveServerCacheRequ",
+      "est\032\032.RemoveServerCacheResponseBG\n(org.a" +
+      "pache.phoenix.coprocessor.generatedB\023Ser" +
+      "verCachingProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9115,7 +9281,7 @@ public final class ServerCachingProtos {
           internal_static_IndexMaintainer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_IndexMaintainer_descriptor,
-              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "Imm [...]
+              new java.lang.String[] { "SaltBuckets", "IsMultiTenant", "ViewIndexId", "IndexedColumns", "IndexedColumnTypeOrdinal", "DataTableColRefForCoveredColumns", "IndexTableColRefForCoveredColumns", "IsLocalIndex", "IndexTableName", "RowKeyOrderOptimizable", "DataTableEmptyKeyValueColFamily", "EmptyKeyValueColFamily", "IndexedExpressions", "RowKeyMetadata", "NumDataTableColFamilies", "IndexWalDisabled", "IndexRowKeyByteSize", "Immutable", "IndexedColumnInfo", "EncodingScheme", "Imm [...]
           internal_static_AddServerCacheRequest_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 788dea5..3800b99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -54,6 +54,8 @@ import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Maps;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+
 
 /**
  * Various SQLException Information. Including a vendor-specific errorcode and a standard SQLState.
@@ -337,6 +339,8 @@ public enum SQLExceptionCode {
             + " limit or create ASYNC index."),
     CANNOT_SET_OR_ALTER_PHOENIX_TTL(10953, "44A35", "Cannot set or alter "
             + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when parent/child view has PHOENIX_TTL set,"),
+    CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
+        CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d1f97d3..7641db4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
@@ -111,6 +112,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.util.WALAnnotationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -657,6 +659,7 @@ public class MutationState implements SQLCloseable {
                 }
                 // The DeleteCompiler already generates the deletes for indexes, so no need to do it again
                 rowMutationsPertainingToIndex = Collections.emptyList();
+
             } else {
                 for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
                     row.setValue(valueEntry.getKey(), valueEntry.getValue());
@@ -676,12 +679,42 @@ public class MutationState implements SQLCloseable {
                 }
                 rowMutationsPertainingToIndex = rowMutations;
             }
+            annotateMutationsWithMetadata(table, rowMutations);
             mutationList.addAll(rowMutations);
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
         }
         values.putAll(modifiedValues);
     }
 
+    private void annotateMutationsWithMetadata(PTable table, List<Mutation> rowMutations) { // applies annotateMutationWithMetadata to each mutation in rowMutations, or does nothing if the guard below fails
+        // Only annotate when the table has change detection enabled and the HBase version in
+        // use supports preWALAppend coprocessor hooks server-side; a null table is skipped as well.
+        if (table == null || !table.isChangeDetectionEnabled()
+            || !HbaseCompatCapabilities.hasPreWALAppend()) {
+            return;
+        }
+        // Annotate each mutation with enough metadata that any interested consumer can
+        // deterministically work out exactly which Phoenix schema object created the mutation.
+        // Server-side, these attributes can then be used to annotate the HBase WAL.
+        for (Mutation mutation : rowMutations) {
+            annotateMutationWithMetadata(table, mutation);
+        }
+
+    }
+
+    private void annotateMutationWithMetadata(PTable table, Mutation mutation) { // converts the table's identifying metadata to byte arrays and hands them to WALAnnotationUtil
+        byte[] tenantId = table.getTenantId() != null ? table.getTenantId().getBytes() : null; // each nullable field is passed on as null rather than as an empty array
+        byte[] schemaName = table.getSchemaName() != null ? table.getSchemaName().getBytes() : null;
+        byte[] tableName = table.getTableName() != null ? table.getTableName().getBytes() : null;
+        byte[] tableType = table.getType().getValue().getBytes(); // no null guard here, unlike the three fields above
+        // Note that we use the _HBase_ byte encoding for a Long (Bytes.toBytes), not the Phoenix
+        // one, so that downstream consumers don't need to have the Phoenix codecs.
+        byte[] lastDDLTimestamp =
+            table.getLastDDLTimestamp() != null ? Bytes.toBytes(table.getLastDDLTimestamp()) : null;
+        WALAnnotationUtil.annotateMutation(mutation, tenantId, schemaName,
+            tableName, tableType, lastDDLTimestamp);
+    }
+
     /**
      * Get the unsorted list of HBase mutations for the tables with uncommitted data.
      * 
@@ -859,6 +892,14 @@ public class MutationState implements SQLCloseable {
         return batchCount;
     }
 
+    /**
+     * Names the pieces of metadata a mutation can carry; declaration order is significant.
+     */
+    public enum MutationMetadataType {
+        TENANT_ID, SCHEMA_NAME, LOGICAL_TABLE_NAME,
+        TIMESTAMP, TABLE_TYPE
+    }
+
     private static class TableInfo {
 
         private final boolean isDataTable;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 2a7747d..cb7cf0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -54,18 +54,21 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
@@ -85,12 +88,15 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
+import org.apache.phoenix.util.WALAnnotationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +115,7 @@ import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeCol
  * Phoenix always does batch mutations.
  * <p>
  */
-public class IndexRegionObserver extends BaseRegionObserver {
+public class IndexRegionObserver extends CompatIndexRegionObserver {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexRegionObserver.class);
     private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
@@ -118,6 +124,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
     protected static final byte UNVERIFIED_BYTE = 2;
     public static final byte[] UNVERIFIED_BYTES = new byte[] { UNVERIFIED_BYTE };
     public static final byte[] VERIFIED_BYTES = new byte[] { VERIFIED_BYTE };
+    public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal";
+    public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
 
   /**
    * Class to represent pending data table rows
@@ -176,7 +184,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
    * additional locks to serialize the access to the BatchMutateContext objects.
    */
 
-  private static class BatchMutateContext {
+  public static class BatchMutateContext {
       private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
       // The max of reference counts on the pending rows of this batch at the time this batch arrives
       private int maxPendingRowCount = 0;
@@ -199,10 +207,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private List<CountDownLatch> waitList = null;
       private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
 
-      private BatchMutateContext(int clientVersion) {
+      //list containing the original mutations from the MiniBatchOperationInProgress. Contains
+      // any annotations we were sent by the client, and can be used in hooks that don't get
+      // passed MiniBatchOperationInProgress, like preWALAppend()
+      private List<Mutation> originalMutations;
+      public BatchMutateContext() {
+          this.clientVersion = 0;
+      }
+      public BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
 
+      /** Replaces originalMutations with the operations of {@code miniBatchOp}, in order. */
+      public void populateOriginalMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+          originalMutations = new ArrayList<>(miniBatchOp.size());
+          for (int index = 0; index < miniBatchOp.size(); index++) {
+              originalMutations.add(miniBatchOp.getOperation(index));
+          }
+      }
+      /** @return the captured mutations; null until populateOriginalMutations has run */
+      public List<Mutation> getOriginalMutations() { return originalMutations; }
+
       public BatchMutatePhase getCurrentPhase() {
           return currentPhase;
       }
@@ -268,6 +293,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
   private int rowLockWaitDuration;
   private int concurrentMutationWaitDuration;
   private String dataTableName;
+  private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
+  private boolean isNamespaceEnabled = false;
 
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
@@ -309,6 +336,10 @@ public class IndexRegionObserver extends BaseRegionObserver {
           this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
           setSlowThresholds(e.getConfiguration());
           this.dataTableName = env.getRegionInfo().getTable().getNameAsString();
+          this.shouldWALAppend = env.getConfiguration().getBoolean(PHOENIX_APPEND_METADATA_TO_WAL,
+              DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL);
+          this.isNamespaceEnabled = SchemaUtil.isNamespaceMappingEnabled(PTableType.INDEX,
+              env.getConfiguration());
       } catch (NoSuchMethodError ex) {
           disabled = true;
           super.start(e);
@@ -927,6 +958,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
+        context.populateOriginalMutations(miniBatchOp);
         // Need to add cell tags to Delete Marker before we do any index processing
         // since we add tags to tables which doesn't have indexes also.
         setDeleteAttributes(miniBatchOp);
@@ -1022,9 +1054,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
         }
     }
 
-  private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+    private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
-  }
+    }
 
   private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       return this.batchMutateContext.get();
@@ -1034,6 +1066,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
       this.batchMutateContext.remove();
   }
 
+    @Override
+    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> c, WALKey key,
+                             WALEdit edit) {
+        // only acts when HBase offers this hook and phoenix.append.metadata.to.wal is set
+        if (HbaseCompatCapabilities.hasPreWALAppend() && shouldWALAppend) {
+            WALAnnotationUtil.appendMutationAttributesToWALKey(key, getBatchMutateContext(c));
+        }
+    }
+
   @Override
   public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 866a1dc..6961861 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.compat.hbase.CompatUtil.setSingleRow;
+import static org.apache.phoenix.compat.hbase.CompatUtil.setStartRow;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
@@ -25,8 +27,6 @@ import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
-import static org.apache.phoenix.compat.hbase.CompatUtil.*;
-
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +70,7 @@ import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * 
  * Coprocessor that verifies the scanned rows of a non-transactional global index.
@@ -97,6 +98,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
     private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexChecker.class);
     private HTableFactory hTableFactory;
     private GlobalIndexCheckerSource metricsSource;
+
     public enum RebuildReturnCode {
         NO_DATA_ROW(0),
         NO_INDEX_ROW(1),
@@ -603,5 +605,4 @@ public class GlobalIndexChecker extends BaseRegionObserver {
     public void stop(CoprocessorEnvironment e) throws IOException {
         this.hTableFactory.shutdown();
     }
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b882b9c..6039bb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -237,7 +237,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     
     /**
      * For client-side to append serialized IndexMaintainers of keyValueIndexes
-     * @param dataTable data table
+     * @param table data table
      * @param indexMetaDataPtr bytes pointer to hold returned serialized value
      * @param keyValueIndexes indexes to serialize
      */
@@ -381,6 +381,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private Map<ColumnReference, ColumnReference> coveredColumnsMap;
     /**** END: New member variables added in 4.10 *****/
 
+    //**** START: New member variables added in 4.16 ****/
+    private String logicalIndexName;
+
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
@@ -596,6 +599,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
+        this.logicalIndexName = index.getName().getString();
         initCachedState();
     }
     
@@ -1257,7 +1261,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // If if there are no covered columns, we know it's our default name
         return emptyKeyValueCFPtr;
     }
-    
+
+    /**
+     * The logical index name. For global indexes on base tables this will be the same as the
+     * physical index table name (unless namespaces are enabled, then . gets replaced with : for
+     * the physical table name). For view indexes, the logical and physical names will be
+     * different because all view indexes of a base table are stored in the same physical table
+     * @return The logical index name
+     */
+    public String getLogicalIndexName() {
+        return logicalIndexName;
+    }
+
     @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void readFields(DataInput input) throws IOException {
@@ -1460,6 +1475,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
             maintainer.coveredColumnsMap.put(dataTableColRef, indexTableColRef);
         }
+        maintainer.logicalIndexName = proto.getLogicalIndexName();
         maintainer.initCachedState();
         return maintainer;
     }
@@ -1583,6 +1599,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue());
         builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue());
+        builder.setLogicalIndexName(maintainer.logicalIndexName);
         return builder.build();
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 48a367c..c78af6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -381,6 +381,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String LAST_DDL_TIMESTAMP = "LAST_DDL_TIMESTAMP";
     public static final byte[] LAST_DDL_TIMESTAMP_BYTES = Bytes.toBytes(LAST_DDL_TIMESTAMP);
 
+    public static final String CHANGE_DETECTION_ENABLED = "CHANGE_DETECTION_ENABLED";
+    public static final byte[] CHANGE_DETECTION_ENABLED_BYTES =
+        Bytes.toBytes(CHANGE_DETECTION_ENABLED);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 22227c2..1e83f6c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3747,19 +3747,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             metaConnection = addColumnsIfNotExists(
                 metaConnection,
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 2,
+                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 3,
                 PhoenixDatabaseMetaData.PHOENIX_TTL + " "
                         + PInteger.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(
                 metaConnection,
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 1,
+                MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 - 2,
                 PhoenixDatabaseMetaData.PHOENIX_TTL_HWM + " "
                         + PInteger.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection,
-                PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 -1,
                 PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP + " "
                     + PLong.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0,
+                PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED
+                    + " " + PBoolean.INSTANCE.getSqlTypeName());
             UpgradeUtil.bootstrapLastDDLTimestamp(metaConnection);
 
             boolean isNamespaceMapping =
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index d4e0fa2..8d970ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -46,6 +46,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
@@ -310,6 +311,7 @@ public interface QueryConstants {
             PHOENIX_TTL + " BIGINT,\n" +
             PHOENIX_TTL_HWM + " BIGINT,\n" +
             LAST_DDL_TIMESTAMP + " BIGINT, " +
+            CHANGE_DETECTION_ENABLED + " BOOLEAN, " +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 2fde0fb..3815e81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -355,6 +355,11 @@ public class DelegateTable implements PTable {
         return delegate.getLastDDLTimestamp();
     }
 
+    /** Passes through the wrapped table's change detection flag. */
+    @Override public boolean isChangeDetectionEnabled() {
+        return delegate.isChangeDetectionEnabled();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0f9e7f7..7c3ed20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
@@ -324,8 +325,10 @@ public class MetaDataClient {
                     USE_STATS_FOR_PARALLELIZATION +"," +
                     VIEW_INDEX_ID_DATA_TYPE +"," +
                     PHOENIX_TTL +"," +
-                    PHOENIX_TTL_HWM +
-                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                    PHOENIX_TTL_HWM + "," +
+                    CHANGE_DETECTION_ENABLED +
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -2063,6 +2066,7 @@ public class MetaDataClient {
             Long phoenixTTL = PHOENIX_TTL_NOT_DEFINED;
             Long phoenixTTLHighWaterMark = MIN_PHOENIX_TTL_HWM;
             Long phoenixTTLProp = (Long) TableProperty.PHOENIX_TTL.getValue(tableProps);;
+
             // Validate PHOENIX_TTL prop value if set
             if (phoenixTTLProp != null) {
                 if (phoenixTTLProp < 0) {
@@ -2102,6 +2106,10 @@ public class MetaDataClient {
                 }
             }
 
+            Boolean isChangeDetectionEnabledProp =
+                (Boolean) TableProperty.CHANGE_DETECTION_ENABLED.getValue(tableProps);
+            verifyChangeDetectionTableType(tableType, isChangeDetectionEnabledProp);
+
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
                 isImmutableRows = parent.isImmutableRows();
@@ -3003,6 +3011,12 @@ public class MetaDataClient {
                 tableUpsert.setLong(31, phoenixTTLHighWaterMark);
             }
 
+            if (isChangeDetectionEnabledProp == null) {
+                tableUpsert.setNull(32, Types.BOOLEAN);
+            } else {
+                tableUpsert.setBoolean(32, isChangeDetectionEnabledProp);
+            }
+
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -3143,6 +3157,7 @@ public class MetaDataClient {
                     parent.getPhoenixTTL() != phoenixTTL)
                 .setLastDDLTimestamp(result.getTable() != null ?
                     result.getTable().getLastDDLTimestamp() : null)
+                .setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
                 .build();
             result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
             addTableToCache(result);
@@ -3155,6 +3170,17 @@ public class MetaDataClient {
         }
     }
 
+    /** Throws if change detection is being switched on for anything but a TABLE or VIEW. */
+    private void verifyChangeDetectionTableType(PTableType tableType, Boolean isChangeDetectionEnabledProp) throws SQLException {
+        boolean enabling = Boolean.TRUE.equals(isChangeDetectionEnabledProp);
+        if (!enabling || tableType == TABLE || tableType == VIEW) {
+            return;
+        }
+        throw new SQLExceptionInfo.Builder(
+            SQLExceptionCode.CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
+            .build().buildException();
+    }
+
     /* This method handles mutation codes sent by phoenix server, except for TABLE_NOT_FOUND which
     * is considered to be a success code. If TABLE_ALREADY_EXISTS in hbase, we don't need to add
     * it in ConnectionQueryServices and we return result as true. However if code is
@@ -3509,7 +3535,8 @@ public class MetaDataClient {
         return mutationCode;
     }
 
-    private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, MetaPropertiesEvaluated metaPropertiesEvaluated)
+    private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
+                                      MetaPropertiesEvaluated metaPropertiesEvaluated)
             throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta,
                 metaPropertiesEvaluated.getIsTransactional(),
@@ -3523,18 +3550,21 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.getAppendOnlySchema(),
                 metaPropertiesEvaluated.getImmutableStorageScheme(),
                 metaPropertiesEvaluated.getUseStatsForParallelization(),
-                metaPropertiesEvaluated.getPhoenixTTL());
+                metaPropertiesEvaluated.getPhoenixTTL(),
+                metaPropertiesEvaluated.isChangeDetectionEnabled());
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency, Long phoenixTTL) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null,phoenixTTL);
+        // isChangeDetectionEnabled is null, not false: any non-null value is written to the table
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null, phoenixTTL, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
             Boolean isTransactional, TransactionFactory.Provider transactionProvider,
             Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL,
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema,
-            ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization, Long phoenixTTL)
+            ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization,
+            Long phoenixTTL, Boolean isChangeDetectionEnabled)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3590,6 +3620,9 @@ public class MetaDataClient {
         if (phoenixTTL != null) {
             mutateLongProperty(tenantId, schemaName, tableName, PHOENIX_TTL, phoenixTTL);
         }
+        if (isChangeDetectionEnabled != null) {
+            mutateBooleanProperty(tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
+        }
         return seqNum;
     }
 
@@ -5112,6 +5145,8 @@ public class MetaDataClient {
                         metaProperties.setUseStatsForParallelizationProp((Boolean)value);
                     } else if (propName.equalsIgnoreCase(PHOENIX_TTL)) {
                         metaProperties.setPhoenixTTL((Long)value);
+                    } else if (propName.equalsIgnoreCase(CHANGE_DETECTION_ENABLED)) {
+                        metaProperties.setChangeDetectionEnabled((Boolean) value);
                     }
                 }
                 // if removeTableProps is true only add the property if it is not an HTable or Phoenix Table property
@@ -5265,6 +5300,15 @@ public class MetaDataClient {
             }
         }
 
+        if (metaProperties.isChangeDetectionEnabled() != null) {
+            verifyChangeDetectionTableType(table.getType(),
+                metaProperties.isChangeDetectionEnabled());
+            if (!metaProperties.isChangeDetectionEnabled().equals(table.isChangeDetectionEnabled())) {
+                metaPropertiesEvaluated.setChangeDetectionEnabled(metaProperties.isChangeDetectionEnabled());
+                changingPhoenixTableProperty = true;
+            }
+        }
+
         return changingPhoenixTableProperty;
     }
 
@@ -5282,6 +5326,7 @@ public class MetaDataClient {
         private Boolean useStatsForParallelizationProp = null;
         private boolean nonTxToTx = false;
         private Long phoenixTTL = null;
+        private Boolean isChangeDetectionEnabled = null;
 
         public Boolean getImmutableRowsProp() {
             return isImmutableRowsProp;
@@ -5383,6 +5428,14 @@ public class MetaDataClient {
         public Long getPhoenixTTL() { return phoenixTTL; }
 
         public void setPhoenixTTL(Long phoenixTTL) { this.phoenixTTL = phoenixTTL; }
+
+        /** Returns the change detection setting; it stays null until
+         *  {@link #setChangeDetectionEnabled(Boolean)} has been called with a value. */
+        public Boolean isChangeDetectionEnabled() { return isChangeDetectionEnabled; }
+
+        /** Stores the change detection setting as given; null is accepted and
+         *  is the same as the field's initial value. */
+        public void setChangeDetectionEnabled(Boolean enabled) { isChangeDetectionEnabled = enabled; }
     }
 
     class MetaPropertiesEvaluated{
@@ -5398,6 +5451,7 @@ public class MetaDataClient {
         private Boolean isTransactional = null;
         private TransactionFactory.Provider transactionProvider = null;
         private Long phoenixTTL = null;
+        private Boolean isChangeDetectionEnabled = null;
 
         public Boolean getIsImmutableRows() {
             return isImmutableRows;
@@ -5490,5 +5544,13 @@ public class MetaDataClient {
         public Long getPhoenixTTL() { return phoenixTTL; }
 
         public void setPhoenixTTL(Long phoenixTTL) { this.phoenixTTL = phoenixTTL; }
+
+        /** Returns the evaluated change detection setting; it stays null until
+         *  {@link #setChangeDetectionEnabled(Boolean)} has been called with a value. */
+        public Boolean isChangeDetectionEnabled() { return isChangeDetectionEnabled; }
+
+        /** Stores the evaluated change detection setting as given; null is accepted
+         *  and is the same as the field's initial value. */
+        public void setChangeDetectionEnabled(Boolean enabled) { isChangeDetectionEnabled = enabled; }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index c8b3e36..103c65f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -827,6 +827,11 @@ public interface PTable extends PMetaDataEntity {
     Long getLastDDLTimestamp();
 
     /**
+     * @return Whether change detection is enabled on a given table or view. If it is, we will
+     * annotate write-ahead logs with additional metadata
+     */
+    boolean isChangeDetectionEnabled();
+    /**
      * Class to help track encoded column qualifier counters per column family.
      */
     public class EncodedCQCounter {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a67a218..799d28a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -25,7 +25,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
@@ -199,6 +198,7 @@ public class PTableImpl implements PTable {
     private final long phoenixTTLHighWaterMark;
     private final BitSet viewModifiedPropSet;
     private final Long lastDDLTimestamp;
+    private final boolean isChangeDetectionEnabled;
     private Map<String, String> propertyValues;
 
     public static class Builder {
@@ -257,6 +257,7 @@ public class PTableImpl implements PTable {
         private long phoenixTTL;
         private long phoenixTTLHighWaterMark;
         private Long lastDDLTimestamp;
+        private boolean isChangeDetectionEnabled = false;
         private Map<String, String> propertyValues = new HashMap<>();
 
         // Used to denote which properties a view has explicitly modified
@@ -599,6 +600,13 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        /** Sets the change-detection flag; a null argument leaves the current value untouched. */
+        public Builder setIsChangeDetectionEnabled(Boolean isChangeDetectionEnabled) {
+            this.isChangeDetectionEnabled = (isChangeDetectionEnabled == null)
+                ? this.isChangeDetectionEnabled : isChangeDetectionEnabled;
+            return this;
+        }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -611,8 +619,8 @@ public class PTableImpl implements PTable {
             Preconditions.checkNotNull(this.columns);
             Preconditions.checkNotNull(this.indexes);
             Preconditions.checkNotNull(this.physicalNames);
-            Preconditions.checkNotNull(this.hasColumnsRequiringUpgrade);
-            Preconditions.checkNotNull(this.rowKeyOrderOptimizable);
+            //hasColumnsRequiringUpgrade and rowKeyOrderOptimizable are booleans and can never be
+            // null, so no need to check them
 
             PName fullName = PNameFactory.newName(SchemaUtil.getTableName(
                     this.schemaName.getString(), this.tableName.getString()));
@@ -863,6 +871,7 @@ public class PTableImpl implements PTable {
         this.viewModifiedPropSet = builder.viewModifiedPropSet;
         this.propertyValues = builder.propertyValues;
         this.lastDDLTimestamp = builder.lastDDLTimestamp;
+        this.isChangeDetectionEnabled = builder.isChangeDetectionEnabled;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -935,7 +944,8 @@ public class PTableImpl implements PTable {
                 .setViewModifiedPhoenixTTL(table.hasViewModifiedPhoenixTTL())
                 .setPhoenixTTL(table.getPhoenixTTL())
                 .setPhoenixTTLHighWaterMark(table.getPhoenixTTLHighWaterMark())
-                .setLastDDLTimestamp(table.getLastDDLTimestamp());
+                .setLastDDLTimestamp(table.getLastDDLTimestamp())
+                .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled());
     }
 
     @Override
@@ -1779,6 +1789,10 @@ public class PTableImpl implements PTable {
         if (table.hasLastDDLTimestamp()) {
             lastDDLTimestamp = table.getLastDDLTimestamp();
         }
+        boolean isChangeDetectionEnabled = false;
+        if (table.hasChangeDetectionEnabled()) {
+            isChangeDetectionEnabled = table.getChangeDetectionEnabled();
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -1829,6 +1843,7 @@ public class PTableImpl implements PTable {
                     .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
                     .setViewModifiedPhoenixTTL(viewModifiedPhoenixTTL)
                     .setLastDDLTimestamp(lastDDLTimestamp)
+                    .setIsChangeDetectionEnabled(isChangeDetectionEnabled)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -1940,6 +1955,7 @@ public class PTableImpl implements PTable {
       if (table.getLastDDLTimestamp() != null) {
           builder.setLastDDLTimestamp(table.getLastDDLTimestamp());
       }
+      builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
       return builder.build();
     }
 
@@ -2062,6 +2078,11 @@ public class PTableImpl implements PTable {
         return lastDDLTimestamp;
     }
 
+    @Override
+    public boolean isChangeDetectionEnabled() {
+        return isChangeDetectionEnabled;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 7fbbc95..435911f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -272,6 +272,28 @@ public enum TableProperty {
         public Object getPTableValue(PTable table) {
             return table.getPhoenixTTL();
         }
+    },
+
+    CHANGE_DETECTION_ENABLED(PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED, true, true, true) {
+        /**
+         * Validates a CHANGE_DETECTION_ENABLED setting, which must be a Boolean (or null).
+         * @return value unchanged when it is null or a Boolean
+         * @throws IllegalArgumentException for a value of any other type
+         */
+        @Override
+        public Object getValue(Object value) {
+            if (value != null && !(value instanceof Boolean)) {
+                throw new IllegalArgumentException("CHANGE_DETECTION_ENABLED property can only be" +
+                    " either true or false");
+            }
+            return value;
+        }
+
+        /** @return the table's current change-detection flag, boxed as a Boolean */
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.isChangeDetectionEnabled();
+        }
+    }
     ;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SQLCloseables.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SQLCloseables.java
index 2f273c5..e5248b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SQLCloseables.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SQLCloseables.java
@@ -53,6 +53,9 @@ public class SQLCloseables {
         LinkedList<SQLException> exceptions = null;
         for (SQLCloseable closeable : iterable) {
             try {
+                if (closeable == null) {
+                    continue;
+                }
                 closeable.close();
             } catch (SQLException x) {
                 if (exceptions == null) exceptions = new LinkedList<SQLException>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index cd1877f..28e58a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -44,6 +43,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
@@ -1211,7 +1211,7 @@ public class ScanUtil {
     }
 
     public static void getDummyResult(byte[] rowKey, List<Cell> result) {
-        KeyValue keyValue =
+        Cell keyValue =
                 KeyValueUtil.newKeyValue(rowKey, 0,
                         rowKey.length, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
                         0, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length);
@@ -1239,4 +1239,30 @@ public class ScanUtil {
         Cell cell = result.get(0);
         return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
     }
+
+    /**
+     * Puts the WAL annotation attributes (tenant, schema, logical table name, table type, DDL
+     * timestamp) on the Scan so the Ungrouped/GroupedAggregate coprocessors can annotate the
+     * mutations for doWALAppend. Null tenant ids and (nullable Long) DDL timestamps are skipped.
+     * @param table Table metadata for the target table/view of the write
+     * @param scan Scan to trigger the server-side coproc
+     */
+    public static void setWALAnnotationAttributes(PTable table, Scan scan) {
+        if (table.isChangeDetectionEnabled()) {
+            if (table.getTenantId() != null) {
+                scan.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(),
+                    table.getTenantId().getBytes());
+            }
+            scan.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(),
+                table.getSchemaName().getBytes());
+            scan.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
+                table.getTableName().getBytes());
+            scan.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(),
+                table.getType().getValue().getBytes());
+            if (table.getLastDDLTimestamp() != null) {
+                scan.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(),
+                    Bytes.toBytes(table.getLastDDLTimestamp()));
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
new file mode 100644
index 0000000..76564a6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+
+import java.util.Map;
+
+/**
+ * Shared by IndexRegionObserver and MutationState to annotate the HBase WAL with Phoenix metadata.
+ */
+public class WALAnnotationUtil {
+    /**
+     * Copies metadata attributes from the batch's first original mutation onto the WAL key.
+     */
+    public static void appendMutationAttributesToWALKey(WALKey key,
+                                        IndexRegionObserver.BatchMutateContext context) {
+        if (context == null || context.getOriginalMutations().isEmpty()) {
+            return;
+        }
+        Map<String, byte[]> attributes = context.getOriginalMutations().get(0).getAttributesMap();
+        for (MutationState.MutationMetadataType type :
+            MutationState.MutationMetadataType.values()) {
+            String name = type.toString();
+            if (attributes.containsKey(name)) {
+                CompatIndexRegionObserver.appendToWALKey(key, name, attributes.get(name));
+            }
+        }
+    }
+
+    /**
+     * Stores mutation metadata in the mutation's own attributes so it is written as a WAL
+     * annotation server-side (usually in IndexRegionObserver); SKIP_WAL mutations are untouched.
+     * @param m Mutation
+     * @param tenantId Tenant (if for a tenant-owned view) otherwise null
+     * @param schemaName Schema name
+     * @param logicalTableName Logical name of the table or view
+     * @param tableType Table type (e.g table, view)
+     * @param ddlTimestamp Server-side timestamp when the table/view was created or last had a
+     *                     column added or dropped from it, whichever is greater
+     */
+    public static void annotateMutation(Mutation m, byte[] tenantId, byte[] schemaName,
+                                        byte[] logicalTableName, byte[] tableType, byte[] ddlTimestamp) {
+        if (m.getDurability().equals(Durability.SKIP_WAL)) {
+            return;
+        }
+        if (tenantId != null) {
+            m.setAttribute(MutationState.MutationMetadataType.TENANT_ID.toString(), tenantId);
+        }
+        m.setAttribute(MutationState.MutationMetadataType.SCHEMA_NAME.toString(), schemaName);
+        m.setAttribute(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString(),
+            logicalTableName);
+        m.setAttribute(MutationState.MutationMetadataType.TABLE_TYPE.toString(), tableType);
+        m.setAttribute(MutationState.MutationMetadataType.TIMESTAMP.toString(), ddlTimestamp);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
index ded8f7b..41ac3b4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.common.collect.TreeBasedTable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.PTable;
@@ -44,6 +45,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Arrays.asList;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 
 /**
@@ -597,8 +601,9 @@ public class PhoenixTestBuilder {
         public static final List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7");
 
         public static final String
-                DEFAULT_TABLE_PROPS =
-                "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
+                DEFAULT_MUTABLE_TABLE_PROPS =
+                "COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='Z'";
+        public static final String DEFAULT_IMMUTABLE_TABLE_PROPS = "DEFAULT_COLUMN_FAMILY='Z'";
         public static final String DEFAULT_TABLE_INDEX_PROPS = "";
         public static final String DEFAULT_GLOBAL_VIEW_PROPS = "";
         public static final String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
@@ -641,9 +646,12 @@ public class PhoenixTestBuilder {
         boolean tenantViewIndexCreated = false;
         String url;
         String entityKeyPrefix;
-        String entityTableName;
-        String entityGlobalViewName;
-        String entityTenantViewName;
+        private String entityTableName;
+        private String entityGlobalViewName;
+        private String entityTenantViewName;
+        private String entityTableIndexName;
+        private String entityGlobalViewIndexName;
+        private String entityTenantViewIndexName;
         PTable baseTable;
         ConnectOptions connectOptions;
         TableOptions tableOptions;
@@ -741,6 +749,28 @@ public class PhoenixTestBuilder {
             return entityTenantViewName;
         }
 
+        public String getEntityTableIndexName() {
+            return entityTableIndexName;
+        }
+
+        public String getEntityGlobalViewIndexName() {
+            return entityGlobalViewIndexName;
+        }
+
+        public String getEntityTenantViewIndexName() {
+            return entityTenantViewIndexName;
+        }
+
+        public String getPhysicalTableName(boolean isNamespaceEnabled) {
+            return SchemaUtil.getPhysicalTableName(Bytes.toBytes(getEntityTableName()),
+                isNamespaceEnabled).getNameAsString();
+        }
+
+        public String getPhysicalTableIndexName(boolean isNamespaceEnabled) {
+            return SchemaUtil.getPhysicalTableName(Bytes.toBytes(getEntityTableIndexName()),
+                isNamespaceEnabled).getNameAsString();
+        }
+
         public ConnectOptions getConnectOptions() {
             return connectOptions;
         }
@@ -1055,6 +1085,7 @@ public class PhoenixTestBuilder {
             String tenantViewName = SchemaUtil.normalizeIdentifier(entityKeyPrefix);
             entityTenantViewName = SchemaUtil.getTableName(tenantViewSchemaName, tenantViewName);
             String globalViewCondition = String.format("KP = '%s'", entityKeyPrefix);
+            String schemaName = SchemaUtil.getSchemaNameFromFullName(entityTableName);
 
             // Table and Table Index creation.
             try (Connection globalConnection = getGlobalConnection()) {
@@ -1080,6 +1111,7 @@ public class PhoenixTestBuilder {
                                     tableIndexOptions.tableIncludeColumns,
                                     tableIndexOptions.indexProps));
                     tableIndexCreated = true;
+                    entityTableIndexName = SchemaUtil.getTableName(schemaName, indexOnTableName);
                 }
             }
 
@@ -1103,6 +1135,8 @@ public class PhoenixTestBuilder {
                                     globalViewIndexOptions.globalViewIncludeColumns,
                                     globalViewIndexOptions.indexProps));
                     globalViewIndexCreated = true;
+                    entityGlobalViewIndexName =
+                        SchemaUtil.getTableName(schemaName, indexOnGlobalViewName);
                 }
             }
 
@@ -1136,6 +1170,8 @@ public class PhoenixTestBuilder {
                                     tenantViewIndexOptions.tenantViewIncludeColumns,
                                     tenantViewIndexOptions.indexProps));
                     tenantViewIndexCreated = true;
+                    entityTenantViewIndexName =
+                        SchemaUtil.getTableName(schemaName, indexOnTenantViewName);
                 }
             }
         }
@@ -1188,9 +1224,31 @@ public class PhoenixTestBuilder {
                 tableDefinition.append((")"));
             }
 
+
             statement.append("CREATE TABLE IF NOT EXISTS ").append(fullTableName)
                     .append(tableDefinition.toString()).append(" ")
                     .append((tableOptions.tableProps.isEmpty() ? "" : tableOptions.tableProps));
+            boolean hasAppendedTableProps = !tableOptions.tableProps.isEmpty();
+            if (tableOptions.isMultiTenant()) {
+                statement.append(hasAppendedTableProps ? ", MULTI_TENANT=true" : "MULTI_TENANT" +
+                    "=true");
+                hasAppendedTableProps = true;
+            }
+            if (tableOptions.getSaltBuckets() != null) {
+                String prop = SALT_BUCKETS +"=" + tableOptions.getSaltBuckets();
+                statement.append(hasAppendedTableProps ? ", " + prop : prop);
+                hasAppendedTableProps = true;
+            }
+            if (tableOptions.isImmutable()) {
+                String prop = IMMUTABLE_ROWS + "=true";
+                statement.append(hasAppendedTableProps ? ", " + prop : prop);
+                hasAppendedTableProps = true;
+            }
+            if (tableOptions.isChangeDetectionEnabled()) {
+                String prop = CHANGE_DETECTION_ENABLED + "=true";
+                statement.append(hasAppendedTableProps ? ", " + prop : prop);
+                hasAppendedTableProps = true;
+            }
             LOGGER.info(statement.toString());
             return statement.toString();
         }
@@ -1230,6 +1288,12 @@ public class PhoenixTestBuilder {
                     .append((globalViewOptions.tableProps.isEmpty() ?
                             "" :
                             globalViewOptions.tableProps));
+            if (globalViewOptions.isChangeDetectionEnabled()) {
+                if (!globalViewOptions.tableProps.isEmpty()) {
+                    statement.append(", ");
+                }
+                statement.append(CHANGE_DETECTION_ENABLED + "=true");
+            }
             LOGGER.info(statement.toString());
             return statement.toString();
         }
@@ -1268,6 +1332,12 @@ public class PhoenixTestBuilder {
                     .append(" ").append((tenantViewOptions.tableProps.isEmpty() ?
                     "" :
                     tenantViewOptions.tableProps));
+            if (tenantViewOptions.isChangeDetectionEnabled()) {
+                if (!tenantViewOptions.tableProps.isEmpty()) {
+                    statement.append(", ");
+                }
+                statement.append(CHANGE_DETECTION_ENABLED + "=true");
+            }
             LOGGER.info(statement.toString());
             return statement.toString();
         }
@@ -1355,7 +1425,11 @@ public class PhoenixTestBuilder {
             List<String> tablePKColumns = Lists.newArrayList();
             List<String> tablePKColumnTypes = Lists.newArrayList();
             List<String> tablePKColumnSort;
-            String tableProps = DDLDefaults.DEFAULT_TABLE_PROPS;
+            String tableProps = DDLDefaults.DEFAULT_MUTABLE_TABLE_PROPS;
+            boolean isMultiTenant = true;
+            Integer saltBuckets = null;
+            boolean isImmutable = false;
+            boolean isChangeDetectionEnabled = false;
 
             /*
              *****************************
@@ -1370,7 +1444,7 @@ public class PhoenixTestBuilder {
                 options.tableColumnTypes = Lists.newArrayList(DDLDefaults.COLUMN_TYPES);
                 options.tablePKColumns = Lists.newArrayList(DDLDefaults.TABLE_PK_COLUMNS);
                 options.tablePKColumnTypes = Lists.newArrayList(DDLDefaults.TABLE_PK_TYPES);
-                options.tableProps = DDLDefaults.DEFAULT_TABLE_PROPS;
+                options.tableProps = DDLDefaults.DEFAULT_MUTABLE_TABLE_PROPS;
                 return options;
             }
 
@@ -1429,6 +1503,41 @@ public class PhoenixTestBuilder {
             public void setTableProps(String tableProps) {
                 this.tableProps = tableProps;
             }
+
+            public boolean isMultiTenant() {
+                return this.isMultiTenant;
+            }
+
+            public void setMultiTenant(boolean isMultiTenant) {
+                this.isMultiTenant = isMultiTenant;
+            }
+
+            public Integer getSaltBuckets() {
+                return this.saltBuckets;
+            }
+            public void setSaltBuckets(Integer saltBuckets) {
+                this.saltBuckets = saltBuckets;
+            }
+
+            public boolean isImmutable() {
+                return isImmutable;
+            }
+
+            public void setImmutable(boolean immutable) {
+                isImmutable = immutable;
+                //default props includes a column encoding not supported in immutable tables
+                if (this.tableProps.equals(DDLDefaults.DEFAULT_MUTABLE_TABLE_PROPS)) {
+                    this.tableProps = DDLDefaults.DEFAULT_IMMUTABLE_TABLE_PROPS;
+                }
+            }
+
+            public boolean isChangeDetectionEnabled() {
+                return isChangeDetectionEnabled;
+            }
+
+            public void setChangeDetectionEnabled(boolean changeDetectionEnabled) {
+                isChangeDetectionEnabled = changeDetectionEnabled;
+            }
         }
 
         // Global View statement generation.
@@ -1441,6 +1550,7 @@ public class PhoenixTestBuilder {
             List<String> globalViewPKColumnSort;
             String tableProps = DDLDefaults.DEFAULT_TENANT_VIEW_PROPS;
             String globalViewCondition;
+            boolean isChangeDetectionEnabled = false;
 
             /*
              *****************************
@@ -1525,6 +1635,14 @@ public class PhoenixTestBuilder {
             public void setGlobalViewCondition(String globalViewCondition) {
                 this.globalViewCondition = globalViewCondition;
             }
+
+            public boolean isChangeDetectionEnabled() {
+                return isChangeDetectionEnabled;
+            }
+
+            public void setChangeDetectionEnabled(boolean changeDetectionEnabled) {
+                this.isChangeDetectionEnabled = changeDetectionEnabled;
+            }
         }
 
         // Tenant View statement generation.
@@ -1536,6 +1654,7 @@ public class PhoenixTestBuilder {
             List<String> tenantViewPKColumnTypes = Lists.newArrayList();
             List<String> tenantViewPKColumnSort;
             String tableProps = DDLDefaults.DEFAULT_TENANT_VIEW_PROPS;
+            boolean isChangeDetectionEnabled = false;
 
             /*
              *****************************
@@ -1611,6 +1730,14 @@ public class PhoenixTestBuilder {
             public void setTableProps(String tableProps) {
                 this.tableProps = tableProps;
             }
+
+            public boolean isChangeDetectionEnabled() {
+                return isChangeDetectionEnabled;
+            }
+
+            public void setChangeDetectionEnabled(boolean changeDetectionEnabled) {
+                this.isChangeDetectionEnabled = changeDetectionEnabled;
+            }
         }
 
         // Table Index statement generation.
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestDDLUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestDDLUtil.java
new file mode 100644
index 0000000..e098988
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestDDLUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class TestDDLUtil {
+    private final boolean isNamespaceMapped;
+    private boolean isChangeDetectionEnabled;
+
+    public TestDDLUtil(boolean isNamespaceMapped) {
+        this.isNamespaceMapped = isNamespaceMapped;
+    }
+
+    /** Creates a test table keyed on (t_id, k1, k2) with columns v1, v2 and the given options. */
+    public void createBaseTable(Connection conn, String schemaName, String tableName,
+                                boolean multiTenant, Integer saltBuckets, String splits,
+                                boolean immutable) throws SQLException {
+        createSchemaIfNeeded(conn, schemaName);
+        String ddl = "CREATE " + (immutable ? "IMMUTABLE" : "") + " TABLE "
+            + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n"
+            + "k1 VARCHAR NOT NULL,\n" + "k2 INTEGER NOT NULL,\n"
+            + "v1 VARCHAR,\n" + "v2 INTEGER,\n"
+            + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+        String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
+        if (saltBuckets != null) {
+            ddlOptions = appendOption(ddlOptions, "salt_buckets=" + saltBuckets);
+        }
+        if (isChangeDetectionEnabled) {
+            ddlOptions = appendOption(ddlOptions, "CHANGE_DETECTION_ENABLED=TRUE");
+        }
+        if (splits != null) {
+            ddlOptions = appendOption(ddlOptions, "splits=" + splits);
+        }
+        execute(conn, ddl + ddlOptions);
+    }
+
+    /** Creates an index (optionally LOCAL and/or ASYNC) on one column of schemaName.tableName. */
+    public void createIndex(Connection conn, String schemaName, String indexName,
+                            String tableName, String indexedColumnName, boolean isLocal,
+                            boolean isAsync) throws SQLException {
+        createSchemaIfNeeded(conn, schemaName);
+        execute(conn, "CREATE " + (isLocal ? " LOCAL " : "") + " INDEX " + indexName + " ON "
+            + SchemaUtil.getTableName(schemaName, tableName) + "(" + indexedColumnName + ")"
+            + (isAsync ? " ASYNC " : ""));
+    }
+
+    /** Creates a SELECT * view over schemaName.baseTableName, in that same schema. */
+    public void createView(Connection conn, String schemaName, String viewName,
+                        String baseTableName) throws SQLException {
+        createSchemaIfNeeded(conn, schemaName);
+        String viewSql = "CREATE VIEW " + SchemaUtil.getTableName(schemaName, viewName)
+            + " AS SELECT * FROM " + SchemaUtil.getTableName(schemaName, baseTableName);
+        if (isChangeDetectionEnabled) {
+            viewSql = viewSql + " " + PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED + "=TRUE";
+        }
+        execute(conn, viewSql);
+    }
+
+    /** Creates an index on one column of the view schemaName.viewName. */
+    public void createViewIndex(Connection conn, String schemaName, String indexName,
+                                String viewName, String indexColumn) throws SQLException {
+        createSchemaIfNeeded(conn, schemaName);
+        execute(conn, "CREATE INDEX " + indexName + " ON "
+            + SchemaUtil.getTableName(schemaName, viewName) + "(" + indexColumn + ")");
+    }
+
+    public void setChangeDetectionEnabled(boolean isChangeDetectionEnabled) {
+        this.isChangeDetectionEnabled = isChangeDetectionEnabled;
+    }
+
+    /** Issues CREATE SCHEMA IF NOT EXISTS, but only when namespace mapping is on. */
+    private void createSchemaIfNeeded(Connection conn, String schemaName) throws SQLException {
+        if (isNamespaceMapped) {
+            execute(conn, "CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+    }
+
+    private static String appendOption(String options, String option) {
+        return options + (options.isEmpty() ? "" : ", ") + option;
+    }
+
+    /** Runs one statement and always closes it, so repeated DDL does not leak Statements. */
+    private static void execute(Connection conn, String sql) throws SQLException {
+        try (java.sql.Statement statement = conn.createStatement()) {
+            statement.execute(sql);
+        }
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 21f1501..f59a674 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -157,7 +157,7 @@ import com.google.common.collect.Lists;
 
 public class TestUtil {
     private static final Logger LOGGER = LoggerFactory.getLogger(TestUtil.class);
-    
+
     private static final Long ZERO = new Long(0);
     public static final String DEFAULT_SCHEMA_NAME = "S";
     public static final String DEFAULT_DATA_TABLE_NAME = "T";
@@ -166,26 +166,26 @@ public class TestUtil {
     public static final String DEFAULT_INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(DEFAULT_SCHEMA_NAME, "I");
 
     public static final String TEST_TABLE_SCHEMA = "(" +
-    "   varchar_pk VARCHAR NOT NULL, " +
-    "   char_pk CHAR(10) NOT NULL, " +
-    "   int_pk INTEGER NOT NULL, "+ 
-    "   long_pk BIGINT NOT NULL, " +
-    "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
-    "   date_pk DATE NOT NULL, " +
-    "   a.varchar_col1 VARCHAR, " +
-    "   a.char_col1 CHAR(10), " +
-    "   a.int_col1 INTEGER, " +
-    "   a.long_col1 BIGINT, " +
-    "   a.decimal_col1 DECIMAL(31, 10), " +
-    "   a.date1 DATE, " +
-    "   b.varchar_col2 VARCHAR, " +
-    "   b.char_col2 CHAR(10), " +
-    "   b.int_col2 INTEGER, " +
-    "   b.long_col2 BIGINT, " +
-    "   b.decimal_col2 DECIMAL(31, 10), " +
-    "   b.date2 DATE " +
-    "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) ";
-    
+        "   varchar_pk VARCHAR NOT NULL, " +
+        "   char_pk CHAR(10) NOT NULL, " +
+        "   int_pk INTEGER NOT NULL, " +
+        "   long_pk BIGINT NOT NULL, " +
+        "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
+        "   date_pk DATE NOT NULL, " +
+        "   a.varchar_col1 VARCHAR, " +
+        "   a.char_col1 CHAR(10), " +
+        "   a.int_col1 INTEGER, " +
+        "   a.long_col1 BIGINT, " +
+        "   a.decimal_col1 DECIMAL(31, 10), " +
+        "   a.date1 DATE, " +
+        "   b.varchar_col2 VARCHAR, " +
+        "   b.char_col2 CHAR(10), " +
+        "   b.int_col2 INTEGER, " +
+        "   b.long_col2 BIGINT, " +
+        "   b.decimal_col2 DECIMAL(31, 10), " +
+        "   b.date2 DATE " +
+        "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk, date_pk)) ";
+
     private TestUtil() {
     }
 
@@ -214,7 +214,7 @@ public class TestUtil {
     public final static String ROW7 = "00B723122312312";
     public final static String ROW8 = "00B823122312312";
     public final static String ROW9 = "00C923122312312";
-    
+
     public final static String PARENTID1 = "0500x0000000001";
     public final static String PARENTID2 = "0500x0000000002";
     public final static String PARENTID3 = "0500x0000000003";
@@ -224,9 +224,9 @@ public class TestUtil {
     public final static String PARENTID7 = "0500x0000000007";
     public final static String PARENTID8 = "0500x0000000008";
     public final static String PARENTID9 = "0500x0000000009";
-    
+
     public final static List<String> PARENTIDS = Lists.newArrayList(PARENTID1, PARENTID2, PARENTID3, PARENTID4, PARENTID5, PARENTID6, PARENTID7, PARENTID8, PARENTID9);
-    
+
     public final static String ENTITYHISTID1 = "017x00000000001";
     public final static String ENTITYHISTID2 = "017x00000000002";
     public final static String ENTITYHISTID3 = "017x00000000003";
@@ -238,10 +238,10 @@ public class TestUtil {
     public final static String ENTITYHISTID9 = "017x00000000009";
 
     public final static List<String> ENTITYHISTIDS = Lists.newArrayList(ENTITYHISTID1, ENTITYHISTID2, ENTITYHISTID3, ENTITYHISTID4, ENTITYHISTID5, ENTITYHISTID6, ENTITYHISTID7, ENTITYHISTID8, ENTITYHISTID9);
-    
+
     public static final String LOCALHOST = "localhost";
     public static final String PHOENIX_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-    public static final String PHOENIX_CONNECTIONLESS_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS  + JDBC_PROTOCOL_TERMINATOR  + PHOENIX_TEST_DRIVER_URL_PARAM;
+    public static final String PHOENIX_CONNECTIONLESS_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
 
     public static final String TEST_SCHEMA_FILE_NAME = "config" + File.separator + "test-schema.xml";
     public static final String CED_SCHEMA_FILE_NAME = "config" + File.separator + "schema.xml";
@@ -303,10 +303,12 @@ public class TestUtil {
         public String put(Object key, Object value) {
             throw new UnsupportedOperationException();
         }
+
         @Override
         public void clear() {
             throw new UnsupportedOperationException();
         }
+
         @Override
         public Object remove(Object key) {
             throw new UnsupportedOperationException();
@@ -314,12 +316,12 @@ public class TestUtil {
     };
 
     public static byte[][] getSplits(String tenantId) {
-        return new byte[][] {
+        return new byte[][]{
             HConstants.EMPTY_BYTE_ARRAY,
             Bytes.toBytes(tenantId + "00A"),
             Bytes.toBytes(tenantId + "00B"),
             Bytes.toBytes(tenantId + "00C"),
-            };
+        };
     }
 
     public static void assertRoundEquals(BigDecimal bd1, BigDecimal bd2) {
@@ -339,7 +341,7 @@ public class TestUtil {
     }
 
     public static Expression constantComparison(CompareOp op, PColumn c, Object o) {
-        return  new ComparisonExpression(Arrays.<Expression>asList(new KeyValueColumnExpression(c), LiteralExpression.newConstant(o)), op);
+        return new ComparisonExpression(Arrays.<Expression>asList(new KeyValueColumnExpression(c), LiteralExpression.newConstant(o)), op);
     }
 
     public static Expression kvColumn(PColumn c) {
@@ -351,57 +353,57 @@ public class TestUtil {
     }
 
     public static Expression constantComparison(CompareOp op, Expression e, Object o) {
-        return  new ComparisonExpression(Arrays.asList(e, LiteralExpression.newConstant(o)), op);
+        return new ComparisonExpression(Arrays.asList(e, LiteralExpression.newConstant(o)), op);
     }
 
     private static boolean useByteBasedRegex(StatementContext context) {
         return context
-                .getConnection()
-                .getQueryServices()
-                .getProps()
-                .getBoolean(QueryServices.USE_BYTE_BASED_REGEX_ATTRIB,
-                    QueryServicesOptions.DEFAULT_USE_BYTE_BASED_REGEX);
+            .getConnection()
+            .getQueryServices()
+            .getProps()
+            .getBoolean(QueryServices.USE_BYTE_BASED_REGEX_ATTRIB,
+                QueryServicesOptions.DEFAULT_USE_BYTE_BASED_REGEX);
     }
 
     public static Expression like(Expression e, Object o, StatementContext context) {
-        return useByteBasedRegex(context)?
-               ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_SENSITIVE):
-               StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_SENSITIVE);
+        return useByteBasedRegex(context) ?
+            ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_SENSITIVE) :
+            StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_SENSITIVE);
     }
 
     public static Expression ilike(Expression e, Object o, StatementContext context) {
-        return useByteBasedRegex(context)?
-                ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_INSENSITIVE):
-                StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_INSENSITIVE);
+        return useByteBasedRegex(context) ?
+            ByteBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_INSENSITIVE) :
+            StringBasedLikeExpression.create(Arrays.asList(e, LiteralExpression.newConstant(o)), LikeType.CASE_INSENSITIVE);
     }
 
     public static Expression substr(Expression e, Object offset, Object length) {
-        return  new SubstrFunction(Arrays.asList(e, LiteralExpression.newConstant(offset), LiteralExpression.newConstant(length)));
+        return new SubstrFunction(Arrays.asList(e, LiteralExpression.newConstant(offset), LiteralExpression.newConstant(length)));
     }
 
     public static Expression substr2(Expression e, Object offset) {
 
-        return  new SubstrFunction(Arrays.asList(e, LiteralExpression.newConstant(offset), LiteralExpression.newConstant(null)));
+        return new SubstrFunction(Arrays.asList(e, LiteralExpression.newConstant(offset), LiteralExpression.newConstant(null)));
     }
 
     public static Expression columnComparison(CompareOp op, Expression c1, Expression c2) {
-        return  new ComparisonExpression(Arrays.<Expression>asList(c1, c2), op);
+        return new ComparisonExpression(Arrays.<Expression>asList(c1, c2), op);
     }
 
     public static SingleKeyValueComparisonFilter singleKVFilter(Expression e) {
-        return  new SingleCQKeyValueComparisonFilter(e);
+        return new SingleCQKeyValueComparisonFilter(e);
     }
 
     public static RowKeyComparisonFilter rowKeyFilter(Expression e) {
-        return  new RowKeyComparisonFilter(e, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+        return new RowKeyComparisonFilter(e, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
     }
 
     public static MultiKeyValueComparisonFilter multiKVFilter(Expression e) {
-        return  new MultiCQKeyValueComparisonFilter(e, false, ByteUtil.EMPTY_BYTE_ARRAY);
+        return new MultiCQKeyValueComparisonFilter(e, false, ByteUtil.EMPTY_BYTE_ARRAY);
     }
-    
+
     public static MultiEncodedCQKeyValueComparisonFilter multiEncodedKVFilter(Expression e, QualifierEncodingScheme encodingScheme) {
-        return  new MultiEncodedCQKeyValueComparisonFilter(e, encodingScheme, false, null);
+        return new MultiEncodedCQKeyValueComparisonFilter(e, encodingScheme, false, null);
     }
 
     public static Expression and(Expression... expressions) {
@@ -411,7 +413,7 @@ public class TestUtil {
     public static Expression not(Expression expression) {
         return new NotExpression(expression);
     }
-    
+
     public static Expression or(Expression... expressions) {
         return new OrExpression(Arrays.asList(expressions));
     }
@@ -439,12 +441,12 @@ public class TestUtil {
         assertNull(scan.getFilter());
         assertArrayEquals(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStartRow());
         assertArrayEquals(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStopRow());
-        assertEquals(null,scan.getFilter());
+        assertEquals(null, scan.getFilter());
     }
 
     public static void assertNotDegenerate(Scan scan) {
         assertFalse(
-                Bytes.compareTo(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStartRow()) == 0 &&
+            Bytes.compareTo(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStartRow()) == 0 &&
                 Bytes.compareTo(KeyRange.EMPTY_RANGE.getLowerRange(), scan.getStopRow()) == 0);
     }
 
@@ -452,33 +454,34 @@ public class TestUtil {
         assertNull(scan.getFilter());
         assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, scan.getStartRow());
         assertArrayEquals(ByteUtil.EMPTY_BYTE_ARRAY, scan.getStopRow());
-        assertEquals(null,scan.getFilter());
+        assertEquals(null, scan.getFilter());
     }
 
     /**
      * Does a deep comparison of two Results, down to the byte arrays.
+     *
      * @param res1 first result to compare
      * @param res2 second result to compare
      * @throws Exception Every difference is throwing an exception
      */
     public static void compareTuples(Tuple res1, Tuple res2)
         throws Exception {
-      if (res2 == null) {
-        throw new Exception("There wasn't enough rows, we stopped at "
-            + res1);
-      }
-      if (res1.size() != res2.size()) {
-        throw new Exception("This row doesn't have the same number of KVs: "
-            + res1.toString() + " compared to " + res2.toString());
-      }
-      for (int i = 0; i < res1.size(); i++) {
-          Cell ourKV = res1.getValue(i);
-          Cell replicatedKV = res2.getValue(i);
-          if (!ourKV.equals(replicatedKV)) {
-              throw new Exception("This result was different: "
-                  + res1.toString() + " compared to " + res2.toString());
+        if (res2 == null) {
+            throw new Exception("There wasn't enough rows, we stopped at "
+                + res1);
+        }
+        if (res1.size() != res2.size()) {
+            throw new Exception("This row doesn't have the same number of KVs: "
+                + res1.toString() + " compared to " + res2.toString());
+        }
+        for (int i = 0; i < res1.size(); i++) {
+            Cell ourKV = res1.getValue(i);
+            Cell replicatedKV = res2.getValue(i);
+            if (!ourKV.equals(replicatedKV)) {
+                throw new Exception("This result was different: "
+                    + res1.toString() + " compared to " + res2.toString());
+            }
         }
-      }
     }
 
     public static void clearMetaDataCache(Connection conn) throws Throwable {
@@ -490,29 +493,31 @@ public class TestUtil {
                 public ClearCacheResponse call(MetaDataService instance) throws IOException {
                     ServerRpcController controller = new ServerRpcController();
                     BlockingRpcCallback<ClearCacheResponse> rpcCallback =
-                            new BlockingRpcCallback<ClearCacheResponse>();
+                        new BlockingRpcCallback<ClearCacheResponse>();
                     ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
                     instance.clearCache(controller, builder.build(), rpcCallback);
-                    if(controller.getFailedOn() != null) {
+                    if (controller.getFailedOn() != null) {
                         throw controller.getFailedOn();
                     }
-                    return rpcCallback.get(); 
+                    return rpcCallback.get();
                 }
-              });
+            });
     }
 
     public static void closeStatement(Statement stmt) {
         try {
             stmt.close();
-        } catch (Throwable ignore) {}
+        } catch (Throwable ignore) {
+        }
     }
-    
+
     public static void closeConnection(Connection conn) {
         try {
             conn.close();
-        } catch (Throwable ignore) {}
+        } catch (Throwable ignore) {
+        }
     }
-    
+
     public static void closeStmtAndConn(Statement stmt, Connection conn) {
         closeStatement(stmt);
         closeConnection(conn);
@@ -520,19 +525,15 @@ public class TestUtil {
 
     public static void bindParams(PhoenixPreparedStatement stmt, List<Object> binds) throws SQLException {
         for (int i = 0; i < binds.size(); i++) {
-            stmt.setObject(i+1, binds.get(i));
+            stmt.setObject(i + 1, binds.get(i));
         }
     }
-    
+
     /**
-     * @param conn
-     *            connection to be used
-     * @param sortOrder
-     *            sort order of column contain input values
-     * @param id
-     *            id of the row being inserted
-     * @param input
-     *            input to be inserted
+     * @param conn      connection to be used
+     * @param sortOrder sort order of the column containing the input values
+     * @param id        id of the row being inserted
+     * @param input     input to be inserted
      */
     public static void upsertRow(Connection conn, String tableName, String sortOrder, int id, Object input) throws SQLException {
         String dml = String.format("UPSERT INTO " + tableName + "_%s VALUES(?,?)", sortOrder);
@@ -558,15 +559,15 @@ public class TestUtil {
 
     public static void createGroupByTestTable(Connection conn, String tableName) throws SQLException {
         conn.createStatement().execute("create table " + tableName +
-                "   (id varchar not null primary key,\n" +
-                "    uri varchar, appcpu integer)");
+            "   (id varchar not null primary key,\n" +
+            "    uri varchar, appcpu integer)");
     }
-    
+
     private static void createTable(Connection conn, String inputSqlType, String tableName, String sortOrder) throws SQLException {
         String dmlFormat =
             "CREATE TABLE " + tableName + "_%s (id INTEGER NOT NULL, pk %s NOT NULL, " + "kv %s "
                 + "CONSTRAINT PK_CONSTRAINT PRIMARY KEY (id, pk %s))";
-        String ddl = String.format(dmlFormat,sortOrder, inputSqlType, inputSqlType, sortOrder);
+        String ddl = String.format(dmlFormat, sortOrder, inputSqlType, inputSqlType, sortOrder);
         conn.createStatement().execute(ddl);
         conn.commit();
     }
@@ -574,13 +575,10 @@ public class TestUtil {
     /**
      * Creates a table to be used for testing. It contains one id column, one varchar column to be used as input, and
      * one column which will contain null values
-     * 
-     * @param conn
-     *            connection to be used
-     * @param inputSqlType
-     *            sql type of input
-     * @param inputList
-     *            list of values to be inserted into the pk column
+     *
+     * @param conn         connection to be used
+     * @param inputSqlType sql type of input
+     * @param inputList    list of values to be inserted into the pk column
      */
     public static String initTables(Connection conn, String inputSqlType, List<Object> inputList) throws Exception {
         String tableName = generateUniqueName();
@@ -592,22 +590,22 @@ public class TestUtil {
         }
         return tableName;
     }
-    
+
     public static List<KeyRange> getAllSplits(Connection conn, String tableName) throws SQLException {
         return getSplits(conn, tableName, null, null, null, null, null);
     }
-    
+
     public static List<KeyRange> getAllSplits(Connection conn, String tableName, String where, String selectClause) throws SQLException {
         return getSplits(conn, tableName, null, null, null, where, selectClause);
     }
-    
+
     public static List<KeyRange> getSplits(Connection conn, String tableName, String pkCol, byte[] lowerRange, byte[] upperRange, String whereClauseSuffix, String selectClause) throws SQLException {
-        String whereClauseStart = 
-                (lowerRange == null && upperRange == null ? "" : 
-                    " WHERE " + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "") 
-                              + (upperRange != null ? (pkCol + " < ?") : "" )));
+        String whereClauseStart =
+            (lowerRange == null && upperRange == null ? "" :
+                " WHERE " + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "")
+                    + (upperRange != null ? (pkCol + " < ?") : "")));
         String whereClause = whereClauseSuffix == null ? whereClauseStart : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix) : (" AND " + whereClauseSuffix);
-        String query = "SELECT /*+ NO_INDEX */ "+selectClause+" FROM " + tableName + whereClause;
+        String query = "SELECT /*+ NO_INDEX */ " + selectClause + " FROM " + tableName + whereClause;
         PhoenixPreparedStatement pstmt = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
         if (lowerRange != null) {
             pstmt.setBytes(1, lowerRange);
@@ -625,18 +623,18 @@ public class TestUtil {
     }
 
     public static Collection<GuidePostsInfo> getGuidePostsList(Connection conn, String tableName, String where)
-            throws SQLException {
+        throws SQLException {
         return getGuidePostsList(conn, tableName, null, null, null, where);
     }
 
     public static Collection<GuidePostsInfo> getGuidePostsList(Connection conn, String tableName, String pkCol,
-            byte[] lowerRange, byte[] upperRange, String whereClauseSuffix) throws SQLException {
+                                                               byte[] lowerRange, byte[] upperRange, String whereClauseSuffix) throws SQLException {
         String whereClauseStart = (lowerRange == null && upperRange == null ? ""
-                : " WHERE "
-                        + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "") + (upperRange != null ? (pkCol + " < ?")
-                                : "")));
+            : " WHERE "
+            + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "") + (upperRange != null ? (pkCol + " < ?")
+            : "")));
         String whereClause = whereClauseSuffix == null ? whereClauseStart
-                : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix) : (" AND " + whereClauseSuffix);
+            : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix) : (" AND " + whereClauseSuffix);
         String query = "SELECT /*+ NO_INDEX */ COUNT(*) FROM " + tableName + whereClause;
         PhoenixPreparedStatement pstmt = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
         if (lowerRange != null) {
@@ -656,52 +654,52 @@ public class TestUtil {
     public static void analyzeTable(Connection conn, String tableName) throws IOException, SQLException {
         analyzeTable(conn, tableName, false);
     }
-    
+
     public static void analyzeTable(Connection conn, String tableName, boolean transactional) throws IOException, SQLException {
         String query = "UPDATE STATISTICS " + tableName;
         conn.createStatement().execute(query);
         // if the table is transactional burn a txn in order to make sure the next txn read pointer is close to wall clock time
         conn.commit();
     }
-    
+
     public static void analyzeTableIndex(Connection conn, String tableName) throws IOException, SQLException {
-        String query = "UPDATE STATISTICS " + tableName+ " INDEX";
+        String query = "UPDATE STATISTICS " + tableName + " INDEX";
         conn.createStatement().execute(query);
     }
-    
+
     public static void analyzeTableColumns(Connection conn, String tableName) throws IOException, SQLException {
         String query = "UPDATE STATISTICS " + tableName + " COLUMNS";
         conn.createStatement().execute(query);
     }
-    
-   public static void analyzeTable(String url, Properties props, String tableName) throws IOException, SQLException {
+
+    public static void analyzeTable(String url, Properties props, String tableName) throws IOException, SQLException {
         Connection conn = DriverManager.getConnection(url, props);
         analyzeTable(conn, tableName);
         conn.close();
     }
-    
+
     public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException {
         // insert row
         stmt.setString(1, "varchar" + String.valueOf(i));
         stmt.setString(2, "char" + String.valueOf(i));
         stmt.setInt(3, i);
         stmt.setLong(4, i);
-        stmt.setBigDecimal(5, new BigDecimal(i*0.5d));
+        stmt.setBigDecimal(5, new BigDecimal(i * 0.5d));
         Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * MILLIS_IN_DAY);
         stmt.setDate(6, date);
     }
-    
+
     public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
         assertTrue(rs.next());
         assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
         assertEquals(rs.getString(2), "char" + String.valueOf(i));
         assertEquals(rs.getInt(3), i);
         assertEquals(rs.getInt(4), i);
-        assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
+        assertEquals(rs.getBigDecimal(5), new BigDecimal(i * 0.5d));
         Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * MILLIS_IN_DAY);
         assertEquals(rs.getDate(6), date);
     }
-    
+
     public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException {
         try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) {
             PhoenixStatement statement = new PhoenixStatement(pconn);
@@ -712,35 +710,37 @@ public class TestUtil {
                 public PName getName() {
                     return SINGLE_COLUMN_NAME;
                 }
+
                 @Override
                 public PName getFamilyName() {
                     return SINGLE_COLUMN_FAMILY_NAME;
                 }
+
                 @Override
                 public int getPosition() {
                     return 0;
                 }
-                
+
                 @Override
                 public SortOrder getSortOrder() {
                     return SortOrder.getDefault();
                 }
-                
+
                 @Override
                 public Integer getArraySize() {
                     return 0;
                 }
-                
+
                 @Override
                 public byte[] getViewConstant() {
                     return null;
                 }
-                
+
                 @Override
                 public boolean isViewReferenced() {
                     return false;
                 }
-                
+
                 @Override
                 public String getExpressionStr() {
                     return null;
@@ -765,10 +765,12 @@ public class TestUtil {
                 public boolean isRowTimestamp() {
                     return false;
                 }
+
                 @Override
                 public boolean isDynamic() {
                     return false;
                 }
+
                 @Override
                 public byte[] getColumnQualifierBytes() {
                     return SINGLE_COLUMN_NAME.getBytes();
@@ -782,25 +784,25 @@ public class TestUtil {
 
     public static void createMultiCFTestTable(Connection conn, String tableName, String options) throws SQLException {
         String ddl = "create table if not exists " + tableName + "(" +
-                "   varchar_pk VARCHAR NOT NULL, " +
-                "   char_pk CHAR(5) NOT NULL, " +
-                "   int_pk INTEGER NOT NULL, "+ 
-                "   long_pk BIGINT NOT NULL, " +
-                "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
-                "   a.varchar_col1 VARCHAR, " +
-                "   a.char_col1 CHAR(5), " +
-                "   a.int_col1 INTEGER, " +
-                "   a.long_col1 BIGINT, " +
-                "   a.decimal_col1 DECIMAL(31, 10), " +
-                "   b.varchar_col2 VARCHAR, " +
-                "   b.char_col2 CHAR(5), " +
-                "   b.int_col2 INTEGER, " +
-                "   b.long_col2 BIGINT, " +
-                "   b.decimal_col2 DECIMAL, " +
-                "   b.date_col DATE " + 
-                "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk)) "
-                + (options!=null? options : "");
-            conn.createStatement().execute(ddl);
+            "   varchar_pk VARCHAR NOT NULL, " +
+            "   char_pk CHAR(5) NOT NULL, " +
+            "   int_pk INTEGER NOT NULL, " +
+            "   long_pk BIGINT NOT NULL, " +
+            "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
+            "   a.varchar_col1 VARCHAR, " +
+            "   a.char_col1 CHAR(5), " +
+            "   a.int_col1 INTEGER, " +
+            "   a.long_col1 BIGINT, " +
+            "   a.decimal_col1 DECIMAL(31, 10), " +
+            "   b.varchar_col2 VARCHAR, " +
+            "   b.char_col2 CHAR(5), " +
+            "   b.int_col2 INTEGER, " +
+            "   b.long_col2 BIGINT, " +
+            "   b.decimal_col2 DECIMAL, " +
+            "   b.date_col DATE " +
+            "   CONSTRAINT pk PRIMARY KEY (varchar_pk, char_pk, int_pk, long_pk DESC, decimal_pk)) "
+            + (options != null ? options : "");
+        conn.createStatement().execute(ddl);
     }
 
     public static void majorCompact(HBaseTestingUtility utility, TableName table)
@@ -813,7 +815,7 @@ public class TestUtil {
         while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table))
             < compactionRequestedSCN
             || (state = admin.getCompactionState(table)).
-            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
+            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)) {
             Thread.sleep(100);
         }
     }
@@ -824,9 +826,9 @@ public class TestUtil {
      * @param tableName name of the table to be compacted
      */
     public static void doMajorCompaction(Connection conn, String tableName) throws Exception {
-    
+
         tableName = SchemaUtil.normalizeIdentifier(tableName);
-    
+
         // We simply write a marker row, request a major compaction, and then wait until the marker
         // row is gone
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
@@ -838,7 +840,7 @@ public class TestUtil {
         }
         try (Table htable = mutationState.getHTable(table)) {
             byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
-           
+
             Put put = new Put(markerRowKey);
             long timestamp = 0L;
             // We do not want to wait an hour because of PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY
@@ -855,20 +857,20 @@ public class TestUtil {
             if (table.isTransactional()) {
                 mutationState.commit();
             }
-        
+
             HBaseAdmin hbaseAdmin = services.getAdmin();
             hbaseAdmin.flush(tableName);
             hbaseAdmin.majorCompact(tableName);
             hbaseAdmin.close();
-        
+
             boolean compactionDone = false;
             while (!compactionDone) {
                 Thread.sleep(6000L);
                 Scan scan = new Scan();
                 scan.setStartRow(markerRowKey);
-                scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
+                scan.setStopRow(Bytes.add(markerRowKey, new byte[]{0}));
                 scan.setRaw(true);
-        
+
                 try (HTableInterface htableForRawScan = services.getTable(Bytes.toBytes(tableName))) {
                     ResultScanner scanner = htableForRawScan.getScanner(scan);
                     List<Result> results = Lists.newArrayList(scanner);
@@ -877,7 +879,7 @@ public class TestUtil {
                     scanner.close();
                 }
                 LOGGER.info("Compaction done: " + compactionDone);
-                
+
                 // need to run compaction after the next txn snapshot has been written so that compaction can remove deleted rows
                 if (!compactionDone && table.isTransactional()) {
                     hbaseAdmin = services.getAdmin();
@@ -898,7 +900,7 @@ public class TestUtil {
     }
 
     public static void dumpTable(Connection conn, TableName tableName)
-        throws SQLException, IOException{
+        throws SQLException, IOException {
         ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
         HTableInterface table = cqs.getTable(tableName.getName());
         dumpTable(table);
@@ -907,7 +909,8 @@ public class TestUtil {
     public static void dumpTable(HTableInterface table) throws IOException {
         System.out.println("************ dumping " + table + " **************");
         Scan s = new Scan();
-        s.setRaw(true);;
+        s.setRaw(true);
+        ;
         s.setMaxVersions();
         try (ResultScanner scanner = table.getScanner(s)) {
             Result result = null;
@@ -931,7 +934,8 @@ public class TestUtil {
 
     public static int getRowCount(Table table, boolean isRaw) throws IOException {
         Scan s = new Scan();
-        s.setRaw(isRaw);;
+        s.setRaw(isRaw);
+        ;
         s.setMaxVersions();
         int rows = 0;
         try (ResultScanner scanner = table.getScanner(s)) {
@@ -950,7 +954,8 @@ public class TestUtil {
 
     public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
         Scan s = new Scan();
-        s.setRaw(isRaw);;
+        s.setRaw(isRaw);
+        ;
         s.setMaxVersions();
 
         CellCount cellCount = new CellCount();
@@ -971,16 +976,16 @@ public class TestUtil {
     static class CellCount {
         private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();
 
-        void addCell(String key){
-            if (rowCountMap.containsKey(key)){
-                rowCountMap.put(key, rowCountMap.get(key) +1);
+        void addCell(String key) {
+            if (rowCountMap.containsKey(key)) {
+                rowCountMap.put(key, rowCountMap.get(key) + 1);
             } else {
                 rowCountMap.put(key, 1);
             }
         }
 
-        int getCellCount(String key){
-            if (rowCountMap.containsKey(key)){
+        int getCellCount(String key) {
+            if (rowCountMap.containsKey(key)) {
                 return rowCountMap.get(key);
             } else {
                 return 0;
@@ -989,7 +994,7 @@ public class TestUtil {
     }
 
     public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
-        try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { 
+        try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
             System.out.println("************ dumping index status for " + indexName + " **************");
             Scan s = new Scan();
             s.setRaw(true);
@@ -1011,11 +1016,11 @@ public class TestUtil {
                 }
             }
             System.out.println("-----------------------------------------------");
-    }
+        }
     }
 
     public static void printResultSet(ResultSet rs) throws SQLException {
-        while(rs.next()){
+        while (rs.next()) {
             printResult(rs, false);
         }
     }
@@ -1023,13 +1028,13 @@ public class TestUtil {
     public static void printResult(ResultSet rs, boolean multiLine) throws SQLException {
         StringBuilder builder = new StringBuilder();
         int columnCount = rs.getMetaData().getColumnCount();
-        for(int i = 0; i < columnCount; i++) {
-            Object value = rs.getObject(i+1);
+        for (int i = 0; i < columnCount; i++) {
+            Object value = rs.getObject(i + 1);
             String output = value == null ? "null" : value.toString();
             builder.append(output);
-            if(i + 1 < columnCount){
+            if (i + 1 < columnCount) {
                 builder.append(",");
-                if (multiLine){
+                if (multiLine) {
                     builder.append("\n");
                 }
             }
@@ -1042,15 +1047,15 @@ public class TestUtil {
     }
 
     private static class IndexStateCheck {
-    	public final PIndexState indexState;
-    	public final Long indexDisableTimestamp;
-    	public final Boolean success;
-    	
-    	public IndexStateCheck(PIndexState indexState, Long indexDisableTimestamp, Boolean success) {
-    		this.indexState = indexState;
-    		this.indexDisableTimestamp = indexDisableTimestamp;
-    		this.success = success;
-    	}
+        public final PIndexState indexState;
+        public final Long indexDisableTimestamp;
+        public final Boolean success;
+
+        public IndexStateCheck(PIndexState indexState, Long indexDisableTimestamp, Boolean success) {
+            this.indexState = indexState;
+            this.indexDisableTimestamp = indexDisableTimestamp;
+            this.success = success;
+        }
     }
 
     public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
@@ -1061,9 +1066,9 @@ public class TestUtil {
             String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
             Thread.sleep(1000); // sleep 1 sec
             String query = "SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " +
-                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
-                    + ") = (" + "'" + schema + "','" + index + "') "
-                    + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+                + ") = (" + "'" + schema + "','" + index + "') "
+                + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
             ResultSet rs = conn.createStatement().executeQuery(query);
             if (rs.next()) {
                 actualIndexState = PIndexState.fromSerializedValue(rs.getString(1));
@@ -1074,7 +1079,7 @@ public class TestUtil {
             }
         } while (++nTries < maxTries);
         fail("Ran out of time waiting for index state to become " + expectedIndexState + " last seen actual state is " +
-                (actualIndexState == null ? "Unknown" : actualIndexState.toString()));
+            (actualIndexState == null ? "Unknown" : actualIndexState.toString()));
     }
 
     public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
@@ -1093,24 +1098,24 @@ public class TestUtil {
     }
 
     public static boolean checkIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
-        return Boolean.TRUE.equals(checkIndexStateInternal(conn,fullIndexName, expectedIndexState, expectedIndexDisableTimestamp).success);
+        return Boolean.TRUE.equals(checkIndexStateInternal(conn, fullIndexName, expectedIndexState, expectedIndexDisableTimestamp).success);
     }
-    
+
     public static void assertIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
-    	IndexStateCheck state = checkIndexStateInternal(conn,fullIndexName, expectedIndexState, expectedIndexDisableTimestamp);
+        IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, expectedIndexState, expectedIndexDisableTimestamp);
         if (!Boolean.TRUE.equals(state.success)) {
-        	if (expectedIndexState != null) {
-        		assertEquals(expectedIndexState, state.indexState);
-        	}
-        	if (expectedIndexDisableTimestamp != null) {
-        		assertEquals(expectedIndexDisableTimestamp, state.indexDisableTimestamp);
-        	}
+            if (expectedIndexState != null) {
+                assertEquals(expectedIndexState, state.indexState);
+            }
+            if (expectedIndexDisableTimestamp != null) {
+                assertEquals(expectedIndexDisableTimestamp, state.indexDisableTimestamp);
+            }
         }
     }
-    
+
     public static PIndexState getIndexState(Connection conn, String fullIndexName) throws SQLException {
-    	IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, null, null);
-    	return state.indexState;
+        IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, null, null);
+        return state.indexState;
     }
 
     public static long getPendingDisableCount(PhoenixConnection conn, String indexTableName) {
@@ -1120,34 +1125,34 @@ public class TestUtil {
 
         try {
             Result pendingDisableCountResult =
-                    conn.getQueryServices()
-                            .getTable(SchemaUtil.getPhysicalTableName(
-                                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
-                                    conn.getQueryServices().getProps()).getName())
-                            .get(get);
+                conn.getQueryServices()
+                    .getTable(SchemaUtil.getPhysicalTableName(
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                        conn.getQueryServices().getProps()).getName())
+                    .get(get);
             return Bytes.toLong(pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES,
-                    PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES));
+                PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES));
         } catch (Exception e) {
             LOGGER.error("Exception in getPendingDisableCount: " + e);
             return 0;
         }
     }
-    
+
     private static IndexStateCheck checkIndexStateInternal(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws SQLException {
         String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
         String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
         String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT)," + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " +
-                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
-                + ") = (" + "'" + schema + "','" + index + "') "
-                + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+            + ") = (" + "'" + schema + "','" + index + "') "
+            + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
         ResultSet rs = conn.createStatement().executeQuery(query);
         Long actualIndexDisableTimestamp = null;
         PIndexState actualIndexState = null;
         if (rs.next()) {
             actualIndexDisableTimestamp = rs.getLong(1);
             actualIndexState = PIndexState.fromSerializedValue(rs.getString(2));
-            boolean matchesExpected = (expectedIndexDisableTimestamp == null || Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp)) 
-                    && (expectedIndexState == null || actualIndexState == expectedIndexState);
+            boolean matchesExpected = (expectedIndexDisableTimestamp == null || Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp))
+                && (expectedIndexState == null || actualIndexState == expectedIndexState);
             if (matchesExpected) {
                 return new IndexStateCheck(actualIndexState, actualIndexDisableTimestamp, Boolean.TRUE);
             }
@@ -1163,27 +1168,27 @@ public class TestUtil {
         assertTrue(rs.next());
         return rs.getLong(1);
     }
-    
+
     public static void addCoprocessor(Connection conn, String tableName, Class coprocessorClass) throws Exception {
         int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
         ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
         HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
-		if (!descriptor.getCoprocessors().contains(coprocessorClass.getName())) {
-			descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null);
-		}else{
-			return;
-		}
+        if (!descriptor.getCoprocessors().contains(coprocessorClass.getName())) {
+            descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null);
+        } else {
+            return;
+        }
         final int retries = 10;
         int numTries = 10;
         try (HBaseAdmin admin = services.getAdmin()) {
             admin.modifyTable(Bytes.toBytes(tableName), descriptor);
             while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
-                    && numTries > 0) {
+                && numTries > 0) {
                 numTries--;
                 if (numTries == 0) {
                     throw new Exception(
-                            "Failed to add " + coprocessorClass.getName() + " after "
-                                    + retries + " retries.");
+                        "Failed to add " + coprocessorClass.getName() + " after "
+                            + retries + " retries.");
                 }
                 Thread.sleep(1000);
             }
@@ -1195,7 +1200,7 @@ public class TestUtil {
         HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
         if (descriptor.getCoprocessors().contains(coprocessorClass.getName())) {
             descriptor.removeCoprocessor(coprocessorClass.getName());
-        }else{
+        } else {
             return;
         }
         final int retries = 10;
@@ -1203,18 +1208,18 @@ public class TestUtil {
         try (HBaseAdmin admin = services.getAdmin()) {
             admin.modifyTable(Bytes.toBytes(tableName), descriptor);
             while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
-                    && numTries > 0) {
+                && numTries > 0) {
                 numTries--;
                 if (numTries == 0) {
                     throw new Exception(
-                            "Failed to remove " + coprocessorClass.getName() + " after "
-                                    + retries + " retries.");
+                        "Failed to remove " + coprocessorClass.getName() + " after "
+                            + retries + " retries.");
                 }
                 Thread.sleep(1000);
             }
         }
     }
-     
+
     public static boolean compare(CompareOp op, ImmutableBytesWritable lhsOutPtr, ImmutableBytesWritable rhsOutPtr) {
         int compareResult = Bytes.compareTo(lhsOutPtr.get(), lhsOutPtr.getOffset(), lhsOutPtr.getLength(), rhsOutPtr.get(), rhsOutPtr.getOffset(), rhsOutPtr.getLength());
         return ByteUtil.compare(op, compareResult);
@@ -1233,31 +1238,30 @@ public class TestUtil {
         return queryPlan;
     }
 
-    public static void assertResultSet(ResultSet rs,Object[][] rows) throws Exception {
-        for(int rowIndex=0; rowIndex < rows.length; rowIndex++) {
-            assertTrue("rowIndex:["+rowIndex+"] rs.next error!",rs.next());
-            for(int columnIndex = 1; columnIndex <= rows[rowIndex].length; columnIndex++) {
+    public static void assertResultSet(ResultSet rs, Object[][] rows) throws Exception {
+        for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+            assertTrue("rowIndex:[" + rowIndex + "] rs.next error!", rs.next());
+            for (int columnIndex = 1; columnIndex <= rows[rowIndex].length; columnIndex++) {
                 Object realValue = rs.getObject(columnIndex);
-                Object expectedValue = rows[rowIndex][columnIndex-1];
-                if(realValue == null) {
-                    assertNull("rowIndex:["+rowIndex+"],columnIndex:["+columnIndex+"]",expectedValue);
-                }
-                else {
-                    assertEquals("rowIndex:["+rowIndex+"],columnIndex:["+columnIndex+"],realValue:["+
-                            realValue+"],expectedValue:["+expectedValue+"]",
-                            expectedValue,
-                            realValue
-                            );
+                Object expectedValue = rows[rowIndex][columnIndex - 1];
+                if (realValue == null) {
+                    assertNull("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "]", expectedValue);
+                } else {
+                    assertEquals("rowIndex:[" + rowIndex + "],columnIndex:[" + columnIndex + "],realValue:[" +
+                            realValue + "],expectedValue:[" + expectedValue + "]",
+                        expectedValue,
+                        realValue
+                    );
                 }
             }
         }
         assertTrue(!rs.next());
     }
-    
+
     public static Collection<Object[]> filterTxParamData(Collection<Object[]> data, int index) {
         boolean runAllTests = true;
         boolean runNoTests = true;
-        
+
         for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
             runAllTests &= provider.runTests();
             runNoTests &= !provider.runTests();
@@ -1270,7 +1274,7 @@ public class TestUtil {
         }
         List<Object[]> filteredData = Lists.newArrayListWithExpectedSize(data.size());
         for (Object[] params : data) {
-            String provider = (String)params[index];
+            String provider = (String) params[index];
             if (provider == null || TransactionFactory.Provider.valueOf(provider).runTests()) {
                 filteredData.add(params);
             }
@@ -1280,6 +1284,7 @@ public class TestUtil {
 
     /**
      * Find a random free port in localhost for binding.
+     *
      * @return A port number or -1 for failure.
      */
     public static int getRandomPort() {
@@ -1293,9 +1298,9 @@ public class TestUtil {
 
     public static boolean hasFilter(Scan scan, Class<? extends Filter> filterClass) {
         Iterator<Filter> filterIter = ScanUtil.getFilterIterator(scan);
-        while(filterIter.hasNext()) {
+        while (filterIter.hasNext()) {
             Filter filter = filterIter.next();
-            if(filterClass.isInstance(filter)) {
+            if (filterClass.isInstance(filter)) {
                 return true;
             }
         }
@@ -1316,7 +1321,7 @@ public class TestUtil {
         return JoinCompiler.compile(stmt, select, resolver);
     }
 
-    public static void assertSelectStatement(FilterableStatement selectStatement , String sql) {
+    public static void assertSelectStatement(FilterableStatement selectStatement, String sql) {
         assertTrue(selectStatement.toString().trim().equals(sql));
     }
 
@@ -1354,7 +1359,7 @@ public class TestUtil {
 
     public static void assertRawCellCount(Connection conn, TableName tableName,
                                           byte[] row, int expectedCellCount)
-        throws SQLException, IOException{
+        throws SQLException, IOException {
         ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
         Table table = cqs.getTable(tableName.getName());
         CellCount cellCount = getCellCount(table, true);
@@ -1368,10 +1373,10 @@ public class TestUtil {
         Properties props = new Properties();
         ResultSet rs;
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
-        try (Connection conn = DriverManager.getConnection(url, props)){
+        try (Connection conn = DriverManager.getConnection(url, props)) {
             rs = conn.createStatement().executeQuery(sql);
             rowExists = rs.next();
-            if (shouldExist){
+            if (shouldExist) {
                 Assert.assertTrue("Row was not found at time " + scn +
                         " when it should have been",
                     rowExists);
@@ -1388,7 +1393,7 @@ public class TestUtil {
         Properties props = new Properties();
         ResultSet rs;
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
-        try (Connection conn = DriverManager.getConnection(url, props)){
+        try (Connection conn = DriverManager.getConnection(url, props)) {
             rs = conn.createStatement().executeQuery(sql);
             Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next());
             Assert.assertEquals(value, rs.getString(1));
@@ -1396,4 +1401,12 @@ public class TestUtil {
 
     }
 
+    public static String getExplainPlan(Connection conn, String sql) throws SQLException {
+        try (ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + sql)){
+            return QueryUtil.getExplainPlan(rs);
+        }
+    }
+
+
+
 }
diff --git a/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
index d3c204e..515e7e6 100644
--- a/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
+++ b/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -22,4 +22,7 @@ public class HbaseCompatCapabilities {
     public static boolean hasNewMetrics() {
         return false;
     }
+
+    //HBase 1.5+ has preWALAppend() on RegionObserver (HBASE-22623)
+    public static boolean hasPreWALAppend() { return false; }
 }
diff --git a/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java b/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
new file mode 100644
index 0000000..57e4685
--- /dev/null
+++ b/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CompatIndexRegionObserver extends BaseRegionObserver {
+
+    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> c, WALKey key,
+                             WALEdit edit) {
+        //no-op implementation for HBase 1.3 and 1.4, which don't support this co-proc hook.
+    }
+
+    public static void appendToWALKey(WALKey key, String attrKey, byte[] attrValue) {
+        //no-op for HBase 1.3 and 1.4 because we don't have WALKey.addExtendedAttribute(String,
+        // byte[])
+    }
+
+    public static byte[] getAttributeValueFromWALKey(WALKey key, String attrKey) {
+        return null;
+    }
+
+    public static Map<String, byte[]> getAttributeValuesFromWALKey(WALKey key) {
+        return new HashMap<String, byte[]>();
+    }
+}
diff --git a/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
index 80ae804..d00827c 100644
--- a/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
+++ b/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -22,4 +22,8 @@ public class HbaseCompatCapabilities {
     public static boolean hasNewMetrics() {
         return true;
     }
+
+    //HBase 1.5+ has preWALAppend() on RegionObserver (HBASE-22623)
+    public static boolean hasPreWALAppend() { return false; }
+
 }
diff --git a/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java b/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
new file mode 100644
index 0000000..60858a6
--- /dev/null
+++ b/phoenix-hbase-compat-1.4.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CompatIndexRegionObserver extends BaseRegionObserver {
+
+    public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> c, WALKey key,
+                             WALEdit edit) {
+        //no-op implementation for HBase 1.3 and 1.4, which don't support this co-proc hook.
+    }
+
+    public static void appendToWALKey(WALKey key, String attrKey, byte[] attrValue) {
+        //no-op for HBase 1.3 and 1.4 because we don't have WALKey.addExtendedAttribute(String,
+        // byte[])
+    }
+
+    public static byte[] getAttributeValueFromWALKey(WALKey key, String attrKey) {
+        return null;
+    }
+
+    public static Map<String, byte[]> getAttributeValuesFromWALKey(WALKey key) {
+        return new HashMap<String, byte[]>();
+    }
+}
diff --git a/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
index 80ae804..e3df05a 100644
--- a/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
+++ b/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
@@ -22,4 +22,7 @@ public class HbaseCompatCapabilities {
     public static boolean hasNewMetrics() {
         return true;
     }
+
+    //HBase 1.5+ has preWALAppend() on RegionObserver (HBASE-22623)
+    public static boolean hasPreWALAppend() { return true; }
 }
diff --git a/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java b/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
similarity index 53%
copy from phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
copy to phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
index d3c204e..b8eb0a5 100644
--- a/phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/HbaseCompatCapabilities.java
+++ b/phoenix-hbase-compat-1.5.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
@@ -15,11 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.compat.hbase;
+package org.apache.phoenix.compat.hbase.coprocessor;
 
-public class HbaseCompatCapabilities {
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.wal.WALKey;
 
-    public static boolean hasNewMetrics() {
-        return false;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CompatIndexRegionObserver extends BaseRegionObserver {
+
+    public static void appendToWALKey(WALKey key, String attrKey, byte[] attrValue) {
+        key.addExtendedAttribute(attrKey, attrValue);
+    }
+
+    public static byte[] getAttributeValueFromWALKey(WALKey key, String attrKey) {
+        return key.getExtendedAttribute(attrKey);
     }
+
+    public static Map<String, byte[]> getAttributeValuesFromWALKey(WALKey key) {
+        return new HashMap<String, byte[]>(key.getExtendedAttributes());
+    }
+
 }
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 95e90cb..9185a59 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -110,6 +110,7 @@ message PTable {
   optional int64 phoenixTTLHighWaterMark = 43;
   optional bool viewModifiedPhoenixTTL = 44;
   optional int64 lastDDLTimestamp=45;
+  optional bool changeDetectionEnabled=46;
 }
 
 message EncodedCQCounter {
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index 0e37de3..9ce303a 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -65,6 +65,7 @@ message IndexMaintainer {
   optional int32 viewIndexIdType = 22 ;
   optional int32 indexDataColumnCount = 23 [default = -1];
   optional string parentTableType = 24;
+  optional string logicalIndexName = 25;
 }
 
 message AddServerCacheRequest {
diff --git a/pom.xml b/pom.xml
index f14d25e..f65e14d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1171,7 +1171,7 @@
     </profile>
     <!-- See BUILDING.md for profile selection -->
     <profile>
-      <id>phoenix-hbase-compat-1.3.0-default</id>
+      <id>phoenix-hbase-compat-1.5.0-default</id>
       <activation>
         <property>
           <name>!hbase.profile</name>
@@ -1180,13 +1180,13 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.phoenix</groupId>
-          <artifactId>phoenix-hbase-compat-1.3.0</artifactId>
+          <artifactId>phoenix-hbase-compat-1.5.0</artifactId>
         </dependency>
       </dependencies>
       <properties>
-        <hbase.profile>1.3</hbase.profile>
-        <hbase.compat.version>1.3.0</hbase.compat.version>
-        <hbase.version>1.3.5</hbase.version>
+        <hbase.profile>1.5</hbase.profile>
+        <hbase.compat.version>1.5.0</hbase.compat.version>
+        <hbase.version>1.5.0</hbase.version>
       </properties>
     </profile>
     <profile>