Posted to commits@phoenix.apache.org by vj...@apache.org on 2024/03/19 15:28:34 UTC

(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
     new e395780e9d PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)
e395780e9d is described below

commit e395780e9d6fe3c61c3ed3e3be0387f79bb0cd5b
Author: TheNamesRai <sa...@outlook.com>
AuthorDate: Tue Mar 19 20:58:27 2024 +0530

    PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)
---
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java | 655 ++++++++++++++++++++
 .../apache/phoenix/end2end/CDCDefinitionIT.java    | 328 ++++++++++
 .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 449 --------------
 .../org/apache/phoenix/end2end/CDCQueryIT.java     | 667 +++++++++++++++++++++
 .../phoenix/end2end/index/SingleCellIndexIT.java   |  11 +-
 .../phoenix/compile/CreateIndexCompiler.java       |   2 +-
 .../phoenix/compile/IndexStatementRewriter.java    |   8 +-
 .../org/apache/phoenix/compile/QueryCompiler.java  |  78 ++-
 .../apache/phoenix/compile/StatementContext.java   |  20 +-
 .../phoenix/compile/TupleProjectionCompiler.java   |   9 +-
 .../coprocessor/BaseScannerRegionObserver.java     |  10 +-
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 301 ++++++----
 .../coprocessor/GlobalIndexRegionScanner.java      |   2 +-
 .../coprocessor/UncoveredIndexRegionScanner.java   |   2 -
 .../org/apache/phoenix/execute/BaseQueryPlan.java  |  15 +-
 .../org/apache/phoenix/execute/MutationState.java  |  74 +++
 .../org/apache/phoenix/execute/TupleProjector.java |   4 +-
 .../expression/SingleCellColumnExpression.java     |  17 +-
 .../hbase/index/util/ImmutableBytesPtr.java        |  37 +-
 .../org/apache/phoenix/index/CDCTableInfo.java     | 276 +++++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  |  23 +-
 .../phoenix/iterate/RegionScannerFactory.java      |   4 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    | 142 +++--
 .../phoenix/parse/FamilyWildcardParseNode.java     |  12 +-
 .../apache/phoenix/parse/ParseNodeRewriter.java    |  11 +-
 .../phoenix/parse/TableWildcardParseNode.java      |  10 +
 .../apache/phoenix/parse/TerminalParseNode.java    |   8 +
 .../apache/phoenix/parse/WildcardParseNode.java    |  13 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   4 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   6 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   6 +
 .../org/apache/phoenix/schema/KeyValueSchema.java  |  26 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 116 ++--
 .../java/org/apache/phoenix/schema/PTable.java     |   3 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  13 +-
 .../org/apache/phoenix/schema/SaltingUtil.java     |   3 -
 .../org/apache/phoenix/util/CDCChangeBuilder.java  | 151 +++++
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  31 +-
 .../java/org/apache/phoenix/util/IndexUtil.java    |   4 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     | 147 +----
 .../CDCInfo.proto}                                 |  34 +-
 41 files changed, 2830 insertions(+), 902 deletions(-)
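
For context, the new CDCBaseIT/CDCQueryIT tests exercise the CDC read path end to end:
create a table, create a CDC object on it (backed by an uncovered global index), mutate
rows, then scan the CDC object, whose rows expose the change timestamp, the PK columns,
and a JSON change document. A minimal sketch of that flow, assuming a locally reachable
Phoenix cluster (the JDBC URL and the T1/C1 names are illustrative, not from this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class CdcQuerySketch {
        public static void main(String[] args) throws Exception {
            // Assumption: Phoenix reachable via a local ZooKeeper quorum.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                        "CREATE TABLE T1 (K INTEGER PRIMARY KEY, V1 INTEGER, V2 DATE)");
                // INCLUDE controls which images (PRE/POST/CHANGE) the JSON document carries.
                conn.createStatement().execute("CREATE CDC C1 ON T1 INCLUDE (pre, post)");
                conn.createStatement().execute("UPSERT INTO T1 (K, V1) VALUES (1, 100)");
                conn.commit();
                // Scopes can also be chosen per query via the CDC_INCLUDE hint.
                try (ResultSet rs = conn.createStatement().executeQuery(
                        "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM C1")) {
                    while (rs.next()) {
                        // Column 1 is the change timestamp; the JSON follows the PK columns.
                        System.out.println(rs.getDate(1) + " -> " + rs.getString(3));
                    }
                }
            }
        }
    }

Note that the tests below additionally run IndexTool and wait for the backing CDC index
to reach ACTIVE before querying; a real client may need to allow for the same.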

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
new file mode 100644
index 0000000000..8f4bcf17a2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.ToNumberPolicy;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableProperty;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+import static org.apache.phoenix.util.MetaDataUtil.getViewIndexPhysicalName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CDCBaseIT extends ParallelStatsDisabledIT {
+    static final HashSet<PTable.CDCChangeScope> CHANGE_IMG =
+            new HashSet<>(Arrays.asList(PTable.CDCChangeScope.CHANGE));
+    static final HashSet<PTable.CDCChangeScope> PRE_POST_IMG = new HashSet<>(
+            Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST));
+
+    protected ManualEnvironmentEdge injectEdge;
+    protected Gson gson = new GsonBuilder()
+            .setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
+            .create();
+    protected Calendar cal = Calendar.getInstance();
+
+    protected void createTable(Connection conn, String table_sql)
+            throws Exception {
+        createTable(conn, table_sql, null, false, null, false, null);
+    }
+
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, false, null, false, null);
+    }
+
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, multitenant, null, false, null);
+    }
+
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+                               Integer nSaltBuckets, boolean immutable, PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, multitenant, nSaltBuckets, null, immutable, immutableStorageScheme);
+    }
+
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+                               Integer nSaltBuckets, PTable.IndexType indexType, boolean immutable,
+                               PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        createTable(conn, table_sql, new HashMap<String, Object>() {{
+            put(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName(), encodingScheme != null ?
+                    Byte.valueOf(encodingScheme.getSerializedMetadataValue()) : null);
+            put(TableProperty.MULTI_TENANT.getPropertyName(), multitenant);
+            put(TableProperty.SALT_BUCKETS.getPropertyName(), nSaltBuckets);
+            put(TableProperty.INDEX_TYPE.getPropertyName(), indexType);
+            put(TableProperty.IMMUTABLE_ROWS.getPropertyName(), immutable);
+            put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(), immutableStorageScheme != null ?
+                    immutableStorageScheme.name() : null);
+        }});
+    }
+
+    protected void createTable(Connection conn, String table_sql,
+                               Map<String,Object> tableProps) throws Exception {
+        List<String> props = new ArrayList<>();
+        Byte encodingScheme = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+        if (encodingScheme != null && encodingScheme !=
+                QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES) {
+            props.add(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName() + "=" + encodingScheme);
+        }
+        Boolean multitenant = (Boolean) TableProperty.MULTI_TENANT.getValue(tableProps);
+        if (multitenant != null && multitenant) {
+            props.add(TableProperty.MULTI_TENANT.getPropertyName() + "=" + multitenant);
+        }
+        Integer nSaltBuckets = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
+        if (nSaltBuckets != null) {
+            props.add(TableProperty.SALT_BUCKETS.getPropertyName() + "=" + nSaltBuckets);
+        }
+        PTable.IndexType indexType = (PTable.IndexType) TableProperty.INDEX_TYPE.getValue(
+                tableProps);
+        if (indexType != null) {
+            props.add(TableProperty.INDEX_TYPE.getPropertyName() + "=" +
+                    (indexType == PTable.IndexType.LOCAL ? "l" : "g"));
+        }
+        Boolean immutableTable = (Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
+        if (Boolean.TRUE.equals(immutableTable)) {
+            props.add(TableProperty.IMMUTABLE_ROWS.getPropertyName() + "=true");
+        }
+        PTable.ImmutableStorageScheme immutableStorageScheme =
+                (PTable.ImmutableStorageScheme) TableProperty
+                        .IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
+        if (immutableStorageScheme != null) {
+            props.add(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName() + "="
+                    + immutableStorageScheme.name());
+        }
+        table_sql += " " + String.join(", ", props);
+        conn.createStatement().execute(table_sql);
+    }
+
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql) throws Exception {
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, null);
+    }
+
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                  String cdc_sql, PTable.IndexType indexType) throws Exception{
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, indexType);
+    }
+
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
+                                    Integer nSaltBuckets) throws Exception {
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, nSaltBuckets, null);
+    }
+
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
+                                    Integer nSaltBuckets, PTable.IndexType indexType) throws Exception {
+        // For CDC, multitenancy gets derived automatically via the parent table.
+        createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, indexType, false, null);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
+        tableName = SchemaUtil.getTableNameFromFullName(tableName);
+        IndexToolIT.runIndexTool(false, schemaName, tableName,
+                "\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
+        String indexFullName = SchemaUtil.getTableName(schemaName,
+                CDCUtil.getCDCIndexName(cdcName));
+        TestUtil.waitForIndexState(conn, indexFullName, PIndexState.ACTIVE);
+    }
+
+    protected void assertCDCState(Connection conn, String cdcName, String expInclude,
+                                  int idxType) throws SQLException {
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertTrue(rs.next());
+            assertEquals(expInclude, rs.getString(1));
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertTrue(rs.next());
+            assertEquals(idxType, rs.getInt(1));
+        }
+    }
+
+    protected void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
+                                String tableName, String datatableName)
+            throws SQLException {
+        Properties props = new Properties();
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        PTable cdcTable = PhoenixRuntime.getTable(conn, cdcFullName);
+        assertEquals(expIncludeScopes, cdcTable.getCDCIncludeScopes());
+        assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(cdcTable));
+        assertNull(cdcTable.getIndexState()); // Index state should be null for CDC.
+        assertNull(cdcTable.getIndexType()); // This is not an index.
+        assertEquals(tableName, cdcTable.getParentName().getString());
+        String indexFullName = SchemaUtil.getTableName(schemaName,
+                CDCUtil.getCDCIndexName(cdcName));
+        assertEquals(cdcTable.getPhysicalName().getString(), tableName.equals(datatableName) ?
+                indexFullName : getViewIndexPhysicalName(datatableName));
+    }
+
+    protected void assertSaltBuckets(Connection conn, String tableName, Integer nbuckets)
+            throws SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        assertSaltBuckets(table, nbuckets);
+    }
+
+    protected void assertSaltBuckets(PTable table, Integer nbuckets) {
+        if (nbuckets == null || nbuckets == 0) {
+            assertNull(table.getBucketNum());
+        } else {
+            assertEquals(nbuckets, table.getBucketNum());
+        }
+    }
+
+    protected void assertNoResults(Connection conn, String cdcName) throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("select * from " + cdcName);
+            assertFalse(rs.next());
+        }
+    }
+
+    protected Connection newConnection() throws SQLException {
+        return newConnection(null);
+    }
+
+    protected Connection newConnection(String tenantId) throws SQLException {
+        Properties props = new Properties();
+        // Uncomment these only while debugging.
+        //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        //props.put("hbase.client.scanner.timeout.period", "6000000");
+        //props.put("phoenix.query.timeoutMs", "6000000");
+        //props.put("zookeeper.session.timeout", "6000000");
+        //props.put("hbase.rpc.timeout", "6000000");
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    private Map<String, Object> addChange(Connection conn, Map preImage,
+                                          long changeTS, String changeType, String tableName,
+                                          Map<String, Object> pks, Map<String, Object> values)
+                                          throws SQLException {
+        if (conn != null) {
+            String sql;
+            if (CDC_DELETE_EVENT_TYPE.equals(changeType)) {
+                String predicates = pks.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).
+                        collect(Collectors.joining(", "));
+                sql = "DELETE FROM " + tableName + " WHERE " + predicates;
+            } else {
+                String columnList = Stream.concat(pks.keySet().stream(),
+                        values.keySet().stream()).collect(Collectors.joining(", "));
+                String valueList =
+                        Stream.concat(pks.values().stream(), values.values().stream())
+                                .map(v -> String.valueOf(v)).collect(Collectors.joining(", "));
+                sql = "UPSERT INTO " + tableName + " (" + columnList + ") VALUES (" + valueList + ")";
+            }
+            cal.setTimeInMillis(changeTS);
+            injectEdge.setValue(changeTS);
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(sql);
+            }
+        }
+        Map<String, Object> cdcChange = new HashMap<>();
+        cdcChange.put(CDC_EVENT_TYPE, changeType);
+        cdcChange.put(CDC_PRE_IMAGE, preImage);
+        if (CDC_UPSERT_EVENT_TYPE.equals(changeType)) {
+            Map<String, Object> changeImage = new HashMap<>();
+            changeImage.putAll(values);
+            cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
+            Map<String, Object> postImage = new HashMap<>();
+            postImage.putAll(preImage);
+            postImage.putAll(changeImage);
+            cdcChange.put(CDC_POST_IMAGE, postImage);
+        }
+        return cdcChange;
+    }
+
+    // FIXME: Add the following with consecutive upserts on the same PK (no delete in between):
+    //  - with different values
+    //  - with a null
+    //  - missing columns
+    protected List<ChangeRow> generateChanges(long startTS, String[] tenantids, String tableName,
+                                              String datatableNameForDDL, CommitAdapter committer)
+                            throws Exception {
+        List<ChangeRow> changes = new ArrayList<>();
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        injectEdge.setValue(startTS);
+        boolean dropV3Done = false;
+        committer.init();
+        Map<String, Object> pk1 = new HashMap() {{ put("K", 1); }};
+        Map<String, Object> pk2 = new HashMap() {{ put("K", 2); }};
+        Map<String, Object> pk3 = new HashMap() {{ put("K", 3); }};
+        Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
+        for (String tid: tenantids) {
+            try (Connection conn = committer.getConnection(tid)) {
+                c1 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
+                            put("V1", 100L);
+                            put("V2", 1000L);
+                            put("B.VB", 10000L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c1));
+                c2 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
+                            put("V1", 200L);
+                            put("V2", 2000L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk2, c2));
+                committer.commit(conn);
+
+                c3 = addChange(conn, new HashMap(), startTS +=100,
+                        CDC_UPSERT_EVENT_TYPE,
+                        tableName, pk3, new TreeMap<String, Object>() {{
+                            put("V1", 300L);
+                            put("V2", null);
+                            put("B.VB", null);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk3, c3));
+                committer.commit(conn);
+
+                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE),
+                        startTS +=100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 101L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c4));
+                committer.commit(conn);
+            }
+            if (datatableNameForDDL != null && !dropV3Done) {
+                try (Connection conn = newConnection()) {
+                    conn.createStatement().execute("ALTER TABLE " + datatableNameForDDL +
+                            " DROP COLUMN v3");
+                }
+                dropV3Done = true;
+            }
+            try (Connection conn = newConnection(tid)) {
+                c5 = addChange(conn, (Map) c4.get(CDC_POST_IMAGE), startTS +=100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c5));
+                committer.commit(conn);
+
+                c6 = addChange(conn, new HashMap(),
+                        startTS +=100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 102L);
+                            put("V2", 1002L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c6));
+                committer.commit(conn);
+
+                c7 = addChange(conn, (Map) c6.get(CDC_POST_IMAGE), startTS +=100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c7));
+                committer.commit(conn);
+
+                c8 = addChange(conn, (Map) c2.get(CDC_POST_IMAGE),
+                        startTS +=100, CDC_UPSERT_EVENT_TYPE, tableName, pk2,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 201L);
+                            put("V2", null);
+                            put("B.VB", 20001L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk2, c8));
+                committer.commit(conn);
+
+                c9 = addChange(conn, new HashMap(),
+                        startTS +=100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 103L);
+                            put("V2", 1003L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c9));
+                committer.commit(conn);
+
+                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS +=100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c10));
+                committer.commit(conn);
+
+                c11 = addChange(conn, new HashMap(),
+                        startTS +=100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 104L);
+                            put("V2", 1004L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c11));
+                committer.commit(conn);
+
+                c12 = addChange(conn, (Map) c11.get(CDC_POST_IMAGE), startTS +=100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c12));
+                committer.commit(conn);
+            }
+        }
+        committer.reset();
+        return changes;
+    }
+
+    private void _copyScopeIfRelevant(Set<PTable.CDCChangeScope> changeScopes,
+                                      PTable.CDCChangeScope changeScope,
+                                      Map<String, Object> change, Map<String, Object> expChange,
+                                      String scopeKeyName) {
+        if (changeScopes.contains(changeScope) && change.containsKey(scopeKeyName)) {
+            expChange.put(scopeKeyName, change.get(scopeKeyName));
+        }
+    }
+
+    protected void verifyChanges(String tenantId, ResultSet rs, List<ChangeRow> changes,
+                                 Set<PTable.CDCChangeScope> changeScopes,
+                                 boolean mutableTable) throws Exception {
+        for (int i = 0, changenr = 0; i < changes.size(); ++i) {
+            ChangeRow changeRow = changes.get(i);
+            if (!Objects.equals(changeRow.getTenantID(), tenantId)) {
+                continue;
+            }
+            Map<String, Object> expChange = new HashMap<>();
+            Map<String, Object> change = changeRow.change;
+            expChange.put(CDC_EVENT_TYPE, change.get(CDC_EVENT_TYPE));
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.PRE, change, expChange,
+                    CDC_PRE_IMAGE);
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.POST, change, expChange,
+                    CDC_POST_IMAGE);
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.CHANGE, change, expChange,
+                    CDC_CHANGE_IMAGE);
+            String changeDesc = "Change " + (changenr+1) + ": " + changeRow;
+            assertTrue(changeDesc, rs.next());
+            Map cdcObj = gson.fromJson(rs.getString(3), HashMap.class);
+            // This is needed because, for immutable tables, CDC can't distinguish a null
+            // value from a missing cell.
+            if (!mutableTable) {
+                _purgeNulls(changeRow.getPreImage());
+                _purgeNulls(changeRow.getChangeImage());
+                _purgeNulls(changeRow.getPostImage());
+            }
+            assertEquals(changeDesc, changeRow.getChangeTimestamp(),
+                    rs.getDate(1).getTime());
+            for (Map.Entry<String, Object> pk: changeRow.getPrimaryKeys().entrySet()) {
+                assertEquals(changeDesc, pk.getValue(), rs.getObject(pk.getKey()));
+            }
+            assertEquals(changeDesc, expChange, cdcObj);
+            ++changenr;
+        }
+        assertFalse(rs.next());
+    }
+
+    private Object _ifIntConvertToLong(Object val) {
+        return (val instanceof Integer) ? Long.valueOf(((Integer) val).longValue()) : val;
+    }
+
+    private void _purgeNulls(Map image) {
+        if (image == null) {
+            return;
+        }
+        for (Iterator<Map.Entry> it = image.entrySet().iterator(); it.hasNext(); ) {
+            if (it.next().getValue() == null) {
+                it.remove();
+            }
+        }
+    }
+
+    protected List<ChangeRow> generateChangesImmutableTable(long startTS, String[] tenantids,
+                                                            String tableName, CommitAdapter committer)
+            throws Exception {
+        List<ChangeRow> changes = new ArrayList<>();
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        injectEdge.setValue(startTS);
+        committer.init();
+        Map<String, Object> pk1 = new HashMap() {{ put("K", 1); }};
+        Map<String, Object> pk2 = new HashMap() {{ put("K", 2); }};
+        Map<String, Object> pk3 = new HashMap() {{ put("K", 3); }};
+        Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                c1 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
+                            put("V1", 100L);
+                            put("V2", 1000L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c1));
+                c2 = addChange(conn, new HashMap(), startTS += 100,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
+                            put("V1", 200L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk2, c2));
+                c3 = addChange(conn, new HashMap(), startTS += 100,
+                        CDC_UPSERT_EVENT_TYPE,
+                        tableName, pk3, new TreeMap<String, Object>() {{
+                            put("V1", 300L);
+                            put("V2", null);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk3, c3));
+                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c4));
+                c5 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 102L);
+                            put("V2", 1002L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c5));
+                c6 = addChange(conn, (Map) c5.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c6));
+                c7 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 103L);
+                            put("V2", 1003L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c7));
+                c8 = addChange(conn, (Map) c7.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c8));
+                c9 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 104L);
+                            put("V2", 1004L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c9));
+                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c10));
+            }
+        }
+        committer.reset();
+        return changes;
+    }
+
+    protected class ChangeRow {
+        private final String tenantid;
+        private final long changeTS;
+        private final Map<String, Object> pks;
+
+        public String getTenantID() {
+            return tenantid;
+        }
+
+        public Map<String, Object> getPreImage() {
+            return (Map<String, Object>) change.get(CDC_PRE_IMAGE);
+        }
+
+        public Map<String, Object> getChangeImage() {
+            return (Map<String, Object>) change.get(CDC_CHANGE_IMAGE);
+        }
+
+        public Map<String, Object> getPostImage() {
+            return (Map<String, Object>) change.get(CDC_POST_IMAGE);
+        }
+
+        private final Map<String, Object> change;
+
+        ChangeRow(String tenantid, long changeTS, Map<String, Object> pks, Map<String, Object> change) {
+            this.tenantid = tenantid;
+            this.changeTS = changeTS;
+            this.pks = pks;
+            this.change = change;
+        }
+
+        public String toString() {
+            return gson.toJson(this);
+        }
+
+        public Map<String, Object> getPrimaryKeys() {
+            return pks;
+        }
+
+        public long getChangeTimestamp() {
+            return changeTS;
+        }
+    }
+
+    protected abstract class CommitAdapter {
+        abstract void commit(Connection conn) throws SQLException;
+
+        void init() {
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+        }
+
+        public void reset() {
+            EnvironmentEdgeManager.reset();
+        }
+
+        public Connection getConnection(String tid) throws SQLException {
+            return newConnection(tid);
+        }
+    }
+
+    protected final CommitAdapter COMMIT_SUCCESS = new CommitAdapter() {
+        @Override
+        public void commit(Connection conn) throws SQLException {
+            conn.commit();
+        }
+    };
+
+    protected final CommitAdapter COMMIT_FAILURE_EXPECTED = new CommitAdapter() {
+        @Override
+        public void commit(Connection conn) throws SQLException {
+            try {
+                conn.commit();
+                // If the commit didn't fail, the test configuration is broken.
+                fail("Commit expected to fail");
+            } catch (SQLException e) {
+                // this is expected
+            }
+        }
+
+        @Override
+        void init() {
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+        }
+
+        @Override
+        public void reset() {
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+        }
+    };
+}
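
To show how the scaffolding above composes, here is a hedged sketch of a derived test
(the names are hypothetical; generateUniqueName() and getUrl() come from the Phoenix IT
base classes, and injectEdge is initialized here the way the concrete subclasses do):

    @Test
    public void testChangesRoundTrip() throws Exception {
        String tableName = generateUniqueName();
        String cdcName = generateUniqueName();
        injectEdge = new ManualEnvironmentEdge();  // assumed setup, as in subclasses
        try (Connection conn = newConnection()) {
            createTable(conn, "CREATE TABLE " + tableName +
                    " (K INTEGER PRIMARY KEY, V1 INTEGER, V2 INTEGER, B.VB INTEGER)");
            createCDCAndWait(conn, tableName, cdcName,
                    "CREATE CDC " + cdcName + " ON " + tableName);
            // Script a series of upserts/deletes at controlled timestamps...
            List<ChangeRow> changes = generateChanges(System.currentTimeMillis(),
                    new String[] { null }, tableName, null, COMMIT_SUCCESS);
            // ...then replay the CDC stream and compare it to the expectation.
            try (ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName)) {
                verifyChanges(null, rs, changes, PRE_POST_IMG, true);
            }
        }
    }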
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
new file mode 100644
index 0000000000..62001ebaac
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+@Category(ParallelStatsDisabledTest.class)
+public class CDCDefinitionIT extends CDCBaseIT {
+    private final boolean forView;
+
+    public CDCDefinitionIT(boolean forView) {
+        this.forView = forView;
+    }
+
+    @Parameterized.Parameters(name = "forView={0}")
+    public static synchronized Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                { false}, { true }
+        });
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        String datatableName = tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        if (forView) {
+            String viewName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+            tableName = viewName;
+        }
+        String cdcName = generateUniqueName();
+        String cdc_sql;
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON NON_EXISTENT_TABLE");
+            fail("Expected to fail due to non-existent table");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+        }
+
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, null);
+        assertCDCState(conn, cdcName, null, 3);
+        assertNoResults(conn, cdcName);
+
+        try {
+            conn.createStatement().execute(cdc_sql);
+            fail("Expected to fail due to duplicate index");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+
+        conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
+                " INCLUDE (pre, post) INDEX_TYPE=g");
+
+        cdcName = generateUniqueName();
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (pre, post)";
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, PTable.IndexType.UNCOVERED_GLOBAL);
+        assertCDCState(conn, cdcName, "PRE,POST", 3);
+        assertPTable(cdcName, new HashSet<>(
+                Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName,
+                datatableName);
+        assertNoResults(conn, cdcName);
+
+        cdcName = generateUniqueName();
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, PTable.IndexType.LOCAL);
+        assertCDCState(conn, cdcName, null, 2);
+        assertPTable(cdcName, null, tableName, datatableName);
+        assertNoResults(conn, cdcName);
+
+        conn.close();
+    }
+
+    @Test
+    public void testCreateWithSalt() throws Exception {
+        // Indexes on views don't support salt buckets; the setting is currently silently ignored.
+        if (forView) {
+            return;
+        }
+
+        // {data table bucket count, CDC bucket count}
+        Integer[][] saltingConfigs = new Integer[][] {
+                new Integer[]{null, 2},
+                new Integer[]{0, 2},
+                new Integer[]{4, null},
+                new Integer[]{4, 1},
+                new Integer[]{4, 0},
+                new Integer[]{4, 2}
+        };
+
+        for (Integer[] saltingConfig: saltingConfigs) {
+            try (Connection conn = newConnection()) {
+                String tableName = generateUniqueName();
+                createTable(conn, "CREATE TABLE  " + tableName +
+                                " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)",
+                                null, false, saltingConfig[0], false, null);
+                assertSaltBuckets(conn, tableName, saltingConfig[0]);
+
+                String cdcName = generateUniqueName();
+                String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, null,
+                        saltingConfig[1], null);
+                try {
+                    assertCDCState(conn, cdcName, null, 3);
+                    // Index inherits table salt buckets.
+                    assertSaltBuckets(conn, cdcName, null);
+                    assertSaltBuckets(conn, CDCUtil.getCDCIndexName(cdcName),
+                            saltingConfig[1] != null ? saltingConfig[1] : saltingConfig[0]);
+                    assertNoResults(conn, cdcName);
+                } catch (Exception error) {
+                    throw new AssertionError("{tableSaltBuckets=" + saltingConfig[0] + ", " +
+                            "cdcSaltBuckets=" + saltingConfig[1] + "} " + error.getMessage(),
+                            error);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCreateWithSchemaName() throws Exception {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String schemaName = generateUniqueName();
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String datatableName = tableName;
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," +
+                        " v1 INTEGER, v2 DATE)");
+        if (forView) {
+            String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+            conn.createStatement().execute(
+                    "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+            tableName = viewName;
+        }
+        String cdcName = generateUniqueName();
+        String cdc_sql;
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON NON_EXISTENT_TABLE");
+            fail("Expected to fail due to non-existent table");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+        }
+
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+        assertPTable(cdcName, null, tableName, datatableName);
+    }
+
+    @Test
+    public void testCreateCDCMultitenant() throws Exception {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute("CREATE TABLE  " + tableName +
+                " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
+                "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
+        String cdcName = generateUniqueName();
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+
+        PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
+        assertTrue(indexTable.isMultiTenant());
+        List<PColumn> idxPkColumns = indexTable.getPKColumns();
+        assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
+        assertEquals(": PHOENIX_ROW_TIMESTAMP()", idxPkColumns.get(1).getName().getString());
+        assertEquals(":K", idxPkColumns.get(2).getName().getString());
+
+        PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
+        assertTrue(cdcTable.isMultiTenant());
+        List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
+        assertEquals("TENANTID", cdcPkColumns.get(0).getName().getString());
+        assertEquals("K", cdcPkColumns.get(1).getName().getString());
+    }
+
+    @Test
+    public void testCreateWithNonDefaultColumnEncoding() throws Exception {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        if (forView) {
+            String viewName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+            tableName = viewName;
+        }
+        String cdcName = generateUniqueName();
+
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
+                " COLUMN_ENCODED_BYTES=" +
+                String.valueOf(NON_ENCODED_QUALIFIERS.getSerializedMetadataValue()));
+        PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
+        assertEquals(NON_ENCODED_QUALIFIERS, indexTable.getEncodingScheme());
+    }
+
+    @Test
+    public void testDropCDC() throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(drop_cdc_sql);
+
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertFalse(rs.next());
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertFalse(rs.next());
+        }
+
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+    }
+
+    @Test
+    public void testDropCDCIndex() throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+        String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_index_sql);
+            fail("Expected to fail as the CDC index can't be dropped directly");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
+        }
+    }
+
+    @Test
+    public void testSelectCDCBadIncludeSpec() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        conn.createStatement().execute("CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," +
+                " v1 INTEGER)");
+        if (forView) {
+            String viewName = generateUniqueName();
+            conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
+                    tableName);
+            tableName = viewName;
+        }
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC  " + cdcName + " ON " + tableName;
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+        try {
+            conn.createStatement().executeQuery("SELECT " +
+                    "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
+            fail("Expected to fail due to invalid CDC INCLUDE hint");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+                    e.getErrorCode());
+            assertTrue(e.getMessage().endsWith("DUMMY"));
+        }
+    }
+}
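
Two conventions the definition tests above rely on are worth calling out: the CDC object's
own metadata row in SYSTEM.CATALOG is keyed by the CDC name, while the backing index row is
keyed by CDCUtil.getCDCIndexName(cdcName). As a standalone illustration (cdcName is
hypothetical and conn is an open Phoenix connection), the same inspection looks like:

    // Hedged sketch mirroring assertCDCState() in CDCBaseIT above.
    String cdcName = "MY_CDC";  // hypothetical
    try (ResultSet rs = conn.createStatement().executeQuery(
            "SELECT CDC_INCLUDE FROM SYSTEM.CATALOG WHERE TABLE_NAME = '" + cdcName +
            "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) {
        if (rs.next()) {
            System.out.println("INCLUDE scopes: " + rs.getString(1));  // e.g. "PRE,POST"
        }
    }
    // The physical CDC index name is derived the same way the tests derive it:
    String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);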
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
deleted file mode 100644
index c8cff782d7..0000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import com.google.gson.Gson;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableProperty;
-import org.apache.phoenix.util.CDCUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@RunWith(Parameterized.class)
-@Category(ParallelStatsDisabledTest.class)
-public class CDCMiscIT extends ParallelStatsDisabledIT {
-    private final boolean forView;
-
-    public CDCMiscIT(boolean forView) {
-        this.forView = forView;
-    }
-
-    @Parameterized.Parameters(name = "forVieiw={0}")
-    public static synchronized Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {
-                { false}, { true }
-        });
-    }
-
-    private void assertCDCState(Connection conn, String cdcName, String expInclude,
-                                int idxType) throws SQLException {
-        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
-                "system.catalog WHERE table_name = '" + cdcName +
-                "' AND column_name IS NULL and column_family IS NULL")) {
-            assertEquals(true, rs.next());
-            assertEquals(expInclude, rs.getString(1));
-        }
-        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
-                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
-                "' AND column_name IS NULL and column_family IS NULL")) {
-                assertEquals(true, rs.next());
-            assertEquals(idxType, rs.getInt(1));
-        }
-    }
-
-    private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
-                              String datatableName)
-            throws SQLException {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        PTable table = PhoenixRuntime.getTable(conn, cdcName);
-        assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
-        assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
-        assertNull(table.getIndexState()); // Index state should be null for CDC.
-        assertNull(table.getIndexType()); // This is not an index.
-        assertEquals(datatableName, table.getParentName().getString());
-        assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
-    }
-
-    private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
-        assertEquals(nbuckets, cdcTable.getBucketNum());
-        PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
-        assertEquals(nbuckets, indexTable.getBucketNum());
-    }
-
-    private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql)
-            throws Exception {
-        conn.createStatement().execute(cdc_sql);
-        IndexToolIT.runIndexTool(false, null, tableName,
-                "\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
-        TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE);
-    }
-
-    @Test
-    public void testCreate() throws Exception {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
-                        + " v2 DATE)");
-        if (forView) {
-            String viewName = generateUniqueName();
-            conn.createStatement().execute(
-                    "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
-            tableName = viewName;
-        }
-        String cdcName = generateUniqueName();
-
-        try {
-            conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON NON_EXISTENT_TABLE");
-            fail("Expected to fail due to non-existent table");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
-        }
-
-        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        createAndWait(conn, tableName, cdcName, cdc_sql);
-        assertCDCState(conn, cdcName, null, 3);
-
-        try {
-            conn.createStatement().execute(cdc_sql);
-            fail("Expected to fail due to duplicate index");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(cdcName));
-        }
-
-        conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
-                " INCLUDE (pre, post) INDEX_TYPE=g");
-
-        cdcName = generateUniqueName();
-        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName +
-                " INCLUDE (pre, post) INDEX_TYPE=g";
-        createAndWait(conn, tableName, cdcName, cdc_sql);
-        assertCDCState(conn, cdcName, "PRE,POST", 3);
-        assertPTable(cdcName, new HashSet<>(
-                Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
-
-        cdcName = generateUniqueName();
-        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
-        createAndWait(conn, tableName, cdcName, cdc_sql);
-        assertCDCState(conn, cdcName, null, 2);
-        assertPTable(cdcName, null, tableName);
-
-        // Indexes on views don't support salt buckets, which are currently silently ignored.
-        if (! forView) {
-            cdcName = generateUniqueName();
-            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4";
-            createAndWait(conn, tableName, cdcName, cdc_sql);
-            assertSaltBuckets(cdcName, 4);
-        }
-
-        conn.close();
-    }
-
-    @Test
-    public void testCreateCDCMultitenant() throws Exception {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
-        conn.createStatement().execute("CREATE TABLE  " + tableName +
-                " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
-                "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
-        String cdcName = generateUniqueName();
-        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
-
-        PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
-        List<PColumn> idxPkColumns = indexTable.getPKColumns();
-        assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
-        assertEquals(": PHOENIX_ROW_TIMESTAMP()", idxPkColumns.get(1).getName().getString());
-        assertEquals(":K", idxPkColumns.get(2).getName().getString());
-
-        PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
-        List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
-        assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
-        assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
-        assertEquals("K", cdcPkColumns.get(2).getName().getString());
-    }
-
-    @Test
-    public void testDropCDC() throws SQLException {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
-                        + " v2 DATE)");
-        String cdcName = generateUniqueName();
-
-        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
-        conn.createStatement().execute(drop_cdc_sql);
-
-        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
-                "system.catalog WHERE table_name = '" + cdcName +
-                "' AND column_name IS NULL and column_family IS NULL")) {
-            assertEquals(false, rs.next());
-        }
-        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
-                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
-                "' AND column_name IS NULL and column_family IS NULL")) {
-            assertEquals(false, rs.next());
-        }
-
-        try {
-            conn.createStatement().execute(drop_cdc_sql);
-            fail("Expected to fail as cdc table doesn't exist");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(cdcName));
-        }
-    }
-
-    @Test
-    public void testDropCDCIndex() throws SQLException {
-        Properties props = new Properties();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
-                        + " v2 DATE)");
-        String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
-        conn.createStatement().execute(cdc_sql);
-        assertCDCState(conn, cdcName, null, 3);
-        String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
-        try {
-            conn.createStatement().execute(drop_cdc_index_sql);
-            fail("Expected to fail as the CDC index cannot be dropped directly");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
-        }
-    }
-
-    private void assertResultSet(ResultSet rs) throws Exception {
-        Gson gson = new Gson();
-        assertEquals(true, rs.next());
-        assertEquals(1, rs.getInt(2));
-        assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3),
-                HashMap.class));
-        assertEquals(true, rs.next());
-        assertEquals(2, rs.getInt(2));
-        assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3),
-                HashMap.class));
-        assertEquals(true, rs.next());
-        assertEquals(1, rs.getInt(2));
-        assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3),
-                HashMap.class));
-        assertEquals(false, rs.next());
-        rs.close();
-    }
-
-    private Connection newConnection() throws SQLException {
-        Properties props = new Properties();
-        // Use these only for debugging.
-        //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
-        //props.put("hbase.client.scanner.timeout.period", "6000000");
-        //props.put("phoenix.query.timeoutMs", "6000000");
-        //props.put("zookeeper.session.timeout", "6000000");
-        //props.put("hbase.rpc.timeout", "6000000");
-        return DriverManager.getConnection(getUrl(), props);
-    }
-
-    @Test
-    public void testSelectCDC() throws Exception {
-        Connection conn = newConnection();
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
-        conn.commit();
-        Thread.sleep(10);
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
-        conn.commit();
-        String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName;
-        createAndWait(conn, tableName, cdcName, cdc_sql);
-        assertCDCState(conn, cdcName, null, 3);
-        // NOTE: To debug the query execution, add the below condition where you need a breakpoint.
-        //      if (<table>.getTableName().getString().equals("N000002") ||
-        //                 <table>.getTableName().getString().equals("__CDC__N000002")) {
-        //          "".isEmpty();
-        //      }
-        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
-        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
-                " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
-        assertResultSet(conn.createStatement().executeQuery("SELECT " +
-                "/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName));
-        assertResultSet(conn.createStatement().executeQuery("SELECT " +
-                "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
-
-        HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
-            put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
-            put("SELECT * FROM " + cdcName +
-                    " ORDER BY k ASC", new int [] {1, 1, 2});
-            put("SELECT * FROM " + cdcName +
-                    " ORDER BY k DESC", new int [] {2, 1, 1});
-            put("SELECT * FROM " + cdcName +
-                    " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
-        }};
-        for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
-            try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
-                for (int k:  testQuery.getValue()) {
-                    assertEquals(true, rs.next());
-                    assertEquals(k, rs.getInt(2));
-                }
-                assertEquals(false, rs.next());
-            }
-        }
-
-        try (ResultSet rs = conn.createStatement().executeQuery(
-                "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
-            assertEquals(false, rs.next());
-        }
-        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
-            assertEquals(true, rs.next());
-            assertEquals("abc", rs.getString(1));
-            assertEquals(true, rs.next());
-            assertEquals("abc", rs.getString(1));
-            assertEquals(false, rs.next());
-        }
-    }
-
-    @Test
-    public void testSelectCDCBadIncludeSpec() throws Exception {
-        Connection conn = newConnection();
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
-        String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC  " + cdcName
-                + " ON " + tableName;
-        conn.createStatement().execute(cdc_sql);
-        try {
-            conn.createStatement().executeQuery("SELECT " +
-                    "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
-            fail("Expected to fail due to invalid CDC INCLUDE hint");
-        }
-        catch (SQLException e) {
-            assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
-                    e.getErrorCode());
-            assertTrue(e.getMessage().endsWith("DUMMY"));
-        }
-    }
-
-    @Test
-    public void testSelectTimeRangeQueries() throws Exception {
-        Connection conn = newConnection();
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
-        String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName;
-        conn.createStatement().execute(cdc_sql);
-        Timestamp ts1 = new Timestamp(System.currentTimeMillis());
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
-        conn.commit();
-        Thread.sleep(10);
-        Timestamp ts2 = new Timestamp(System.currentTimeMillis());
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
-        conn.commit();
-        Thread.sleep(10);
-        Timestamp ts3 = new Timestamp(System.currentTimeMillis());
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
-        conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
-        Timestamp ts4 = new Timestamp(System.currentTimeMillis());
-
-        String sel_sql = "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND " +
-                "PHOENIX_ROW_TIMESTAMP() <= ?";
-        Object[] testDataSets = new Object[] {
-                new Object[] {ts1, ts2, new int[] {1, 2}}/*,
-                new Object[] {ts2, ts3, new int[] {1, 3}},
-                new Object[] {ts3, ts4, new int[] {1}}*/
-        };
-        PreparedStatement stmt = conn.prepareStatement(sel_sql);
-        for (int i = 0; i < testDataSets.length; ++i) {
-            Object[] testData = (Object[]) testDataSets[i];
-            stmt.setTimestamp(1, (Timestamp) testData[0]);
-            stmt.setTimestamp(2, (Timestamp) testData[1]);
-            try (ResultSet rs = stmt.executeQuery()) {
-                for (int k:  (int[]) testData[2]) {
-                    assertEquals(true, rs.next());
-                    assertEquals(k, rs.getInt(2));
-                }
-                assertEquals(false, rs.next());
-            }
-        }
-    }
-
-    // Temporary test case used as a reference for debugging and comparing against the CDC query.
-    @Test
-    public void testSelectUncoveredIndex() throws Exception {
-        Connection conn = newConnection();
-        String tableName = generateUniqueName();
-        conn.createStatement().execute(
-                "CREATE TABLE  " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
-                " (1, 100)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
-                " (2, 200)");
-        conn.commit();
-        String indexName = generateUniqueName();
-        String index_sql = "CREATE UNCOVERED INDEX " + indexName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
-        conn.createStatement().execute(index_sql);
-        //ResultSet rs =
-        //        conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
-        //                " " + indexName + ") */ * FROM " + tableName);
-        ResultSet rs =
-                conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
-                        " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName);
-        assertEquals(true, rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertEquals(100, rs.getInt(2));
-        assertEquals(true, rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals(200, rs.getInt(2));
-        assertEquals(false, rs.next());
-    }
-}
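
For orientation, the CDC surface exercised by the removed tests above and by the new
CDCQueryIT below reduces to a few statements. Below is a minimal sketch, assuming a
local Phoenix cluster; the JDBC URL, table name and CDC name are illustrative and not
part of this commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CdcSketch {
        public static void main(String[] args) throws Exception {
            // URL is an assumption; point it at any running Phoenix cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE T (K INTEGER PRIMARY KEY, V1 INTEGER)");
                // Per the tests above, CREATE CDC also creates an index (named via
                // CDCUtil.getCDCIndexName()) keyed on PHOENIX_ROW_TIMESTAMP().
                stmt.execute("CREATE CDC CDC_T ON T");
                stmt.execute("UPSERT INTO T (K, V1) VALUES (1, 100)");
                conn.commit();
                // Without a CDC_INCLUDE hint the JSON payload carries only the event
                // type; /*+ CDC_INCLUDE(PRE, POST) */ and friends add the row images.
                try (ResultSet rs = stmt.executeQuery("SELECT * FROM CDC_T")) {
                    while (rs.next()) {
                        // Columns: PHOENIX_ROW_TIMESTAMP(), the PK column(s), "CDC JSON".
                        System.out.println(rs.getTimestamp(1) + " K=" + rs.getInt(2)
                                + " " + rs.getString(3));
                    }
                }
            }
        }
    }
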
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
new file mode 100644
index 0000000000..09f3af45a4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+// NOTE: To debug the query execution, add the below condition or the equivalent where you need a
+// breakpoint.
+//      if (<table>.getTableName().getString().equals("N000002") ||
+//                 <table>.getTableName().getString().equals("__CDC__N000002")) {
+//          "".isEmpty();
+//      }
+@RunWith(Parameterized.class)
+@Category(ParallelStatsDisabledTest.class)
+public class CDCQueryIT extends CDCBaseIT {
+    // Dimensions of the parameterized test matrix below.
+    private final boolean forView;
+    private final boolean dataBeforeCDC;
+    private final PTable.QualifierEncodingScheme encodingScheme;
+    private final boolean multitenant;
+    private final Integer indexSaltBuckets;
+    private final Integer tableSaltBuckets;
+    private final boolean withSchemaName;
+
+    public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC,
+                      PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+                      Integer indexSaltBuckets, Integer tableSaltBuckets, boolean withSchemaName) {
+        this.forView = forView;
+        this.dataBeforeCDC = dataBeforeCDC;
+        this.encodingScheme = encodingScheme;
+        this.multitenant = multitenant;
+        this.indexSaltBuckets = indexSaltBuckets;
+        this.tableSaltBuckets = tableSaltBuckets;
+        this.withSchemaName = withSchemaName;
+    }
+
+    @Parameterized.Parameters(name = "forView={0}, dataBeforeCDC={1}, encodingScheme={2}, " +
+            "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5}, withSchemaName={6}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.FALSE },
+                { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.TRUE },
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1,
+                        Boolean.FALSE },
+                // Once PHOENIX-7239 is fixed, change this to use different salt buckets for data and index.
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1,
+                        Boolean.TRUE },
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null,
+                        Boolean.FALSE },
+                { Boolean.TRUE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.FALSE },
+        });
+    }
+
+    @Before
+    public void beforeTest() {
+        EnvironmentEdgeManager.reset();
+        injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+    }
+
+    @Test
+    public void testSelectCDC() throws Exception {
+        String cdcName, cdc_sql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String datatableName = tableName;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE  " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, "
+                    + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " : "(k)")
+                    + ")", encodingScheme, multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = {tenantId};
+        if (multitenant) {
+            tenantids = new String[] {tenantId, "2000"};
+        }
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChanges(startTS, tenantids, tableName, null,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        //SingleCellIndexIT.dumpTable(tableName);
+        //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+
+            // Existence of CDC shouldn't cause the regular query path to fail.
+            String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " +
+                    CDCUtil.getCDCIndexName(cdcName) + ") */ k, v1 FROM " + tableName;
+            try (ResultSet rs = conn.createStatement().executeQuery(uncovered_sql)) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals(300, rs.getInt(2));
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals(201, rs.getInt(2));
+                assertFalse(rs.next());
+            }
+
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName), changes,
+                    CHANGE_IMG, true);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K," +
+                                    "\"CDC JSON\" FROM " + cdcFullName), changes,
+                    CHANGE_IMG, true);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName),
+                    changes, PRE_POST_IMG, true);
+            verifyChanges(tenantId, conn.createStatement().executeQuery("SELECT * FROM " + cdcFullName),
+                    changes, new HashSet<>(), true);
+
+            HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
+                put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName,
+                        new int[]{1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
+                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName +
+                        " ORDER BY k ASC", new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3});
+                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName +
+                        " ORDER BY k DESC", new int[]{3, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1});
+                put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName +
+                        " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC",
+                        new int[]{1, 1, 1, 1, 2, 1, 1, 1, 1, 3, 2, 1});
+            }};
+            Map dummyChange = new HashMap() {{
+                put(CDC_EVENT_TYPE, "dummy");
+            }};
+            for (Map.Entry<String, int[]> testQuery : testQueries.entrySet()) {
+                try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
+                    for (int i = 0; i < testQuery.getValue().length; ++i) {
+                        int k = testQuery.getValue()[i];
+                        assertEquals(true, rs.next());
+                        assertEquals("Index: " + i + " for query: " + testQuery.getKey(),
+                                k, rs.getInt(2));
+                        Map change = gson.fromJson(rs.getString(3), HashMap.class);
+                        change.put(CDC_EVENT_TYPE, "dummy");
+                        // Verify that we are getting nothing but the event type as we specified
+                        // no change scopes.
+                        assertEquals(dummyChange, change);
+                    }
+                    assertEquals(false, rs.next());
+                }
+            }
+        }
+    }
+
+    private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        String cdcName, cdc_sql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String datatableName = tableName;
+        try (Connection conn = newConnection()) {
+           createTable(conn, "CREATE TABLE  " + tableName + " (" +
+                            (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
+                            "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, CONSTRAINT PK PRIMARY KEY " +
+                            (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant,
+                    tableSaltBuckets, true, immutableStorageScheme);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = {tenantId};
+        if (multitenant) {
+            tenantids = new String[] {tenantId, "2000"};
+        }
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChangesImmutableTable(startTS, tenantids, tableName,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+            // For debug: uncomment to see the exact results logged to console.
+            //try (Statement stmt = conn.createStatement()) {
+            //    try (ResultSet rs = stmt.executeQuery(
+            //            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," +
+            //                    "\"CDC JSON\" FROM " + cdcFullName)) {
+            //        while (rs.next()) {
+            //            System.out.println("----- " + rs.getString(1) + " " +
+            //                    rs.getInt(2) + " " + rs.getString(3));
+            //        }
+            //    }
+            //}
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName),
+                            changes, PRE_POST_IMG, false);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                            "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName), changes,
+                    CHANGE_IMG, false);
+            verifyChanges(tenantId, conn.createStatement().executeQuery("SELECT /*+ CDC_INCLUDE(CHANGE) */ " +
+                    "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcFullName),
+                    changes, CHANGE_IMG, false);
+        }
+    }
+
+    @Test
+    public void testSelectCDCImmutableOneCellPerColumn() throws Exception {
+        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+    }
+
+    @Test
+    public void testSelectCDCImmutableSingleCell() throws Exception {
+        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
+    }
+
+    @Test
+    public void testSelectTimeRangeQueries() throws Exception {
+        String cdcName, cdc_sql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE  " + tableName + " (" +
+                    (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
+                    "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY " +
+                    (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant,
+                    tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = {tenantId};
+        if (multitenant) {
+            tenantids = new String[] {tenantId, "2000"};
+        }
+
+        Timestamp ts1 = new Timestamp(System.currentTimeMillis());
+        cal.setTimeInMillis(ts1.getTime());
+        injectEdge.setValue(ts1.getTime());
+
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+                conn.commit();
+            }
+        }
+
+        injectEdge.incrementValue(100);
+
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+                conn.commit();
+            }
+        }
+
+        injectEdge.incrementValue(100);
+        cal.add(Calendar.MILLISECOND, 200);
+        Timestamp ts2 = new Timestamp(cal.getTime().getTime());
+        injectEdge.incrementValue(100);
+
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+                conn.commit();
+                injectEdge.incrementValue(100);
+                conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
+                conn.commit();
+            }
+        }
+
+        injectEdge.incrementValue(100);
+        cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
+        Timestamp ts3 = new Timestamp(cal.getTime().getTime());
+        injectEdge.incrementValue(100);
+
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+                conn.commit();
+                injectEdge.incrementValue(100);
+                conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
+                conn.commit();
+            }
+        }
+
+        injectEdge.incrementValue(100);
+        cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
+        Timestamp ts4 = new Timestamp(cal.getTime().getTime());
+        EnvironmentEdgeManager.reset();
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName));
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+            String sel_sql =
+                    "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\" FROM " + cdcFullName +
+                            " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() <= ?";
+            Object[] testDataSets = new Object[] {
+                    new Object[] {ts1, ts2, new int[] {1, 2}},
+                    new Object[] {ts2, ts3, new int[] {1, 3}},
+                    new Object[] {ts3, ts4, new int[] {1, 2}},
+                    new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}},
+            };
+            PreparedStatement stmt = conn.prepareStatement(sel_sql);
+            // For debug: uncomment to see the exact results logged to console.
+            //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3: " + ts3 + " ts4: " +
+            //        ts4);
+            //for (int i = 0; i < testDataSets.length; ++i) {
+            //    Object[] testData = (Object[]) testDataSets[i];
+            //    stmt.setTimestamp(1, (Timestamp) testData[0]);
+            //    stmt.setTimestamp(2, (Timestamp) testData[1]);
+            //    try (ResultSet rs = stmt.executeQuery()) {
+            //        System.out.println("----- Test data set: " + i);
+            //        while (rs.next()) {
+            //            System.out.println("----- " + rs.getString(1) + " " +
+            //                    rs.getInt(2) + " "  + rs.getString(3));
+            //        }
+            //    }
+            //}
+            for (int i = 0; i < testDataSets.length; ++i) {
+                Object[] testData = (Object[]) testDataSets[i];
+                stmt.setTimestamp(1, (Timestamp) testData[0]);
+                stmt.setTimestamp(2, (Timestamp) testData[1]);
+                try (ResultSet rs = stmt.executeQuery()) {
+                    for (int j = 0; j < ((int[]) testData[2]).length; ++j) {
+                        int k = ((int[]) testData[2])[j];
+                        assertEquals(" Index: " + j + " Test data set: " + i,
+                                true, rs.next());
+                        assertEquals(" Index: " + j + " Test data set: " + i,
+                                k, rs.getInt(2));
+                    }
+                    assertEquals("Test data set: " + i, false, rs.next());
+                }
+            }
+
+            PreparedStatement pstmt = conn.prepareStatement(
+                    "SELECT * FROM " + cdcFullName + " WHERE PHOENIX_ROW_TIMESTAMP() > ?");
+            pstmt.setTimestamp(1, ts4);
+            try (ResultSet rs = pstmt.executeQuery()) {
+                assertEquals(false, rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testSelectCDCWithDDL() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String datatableName = tableName;
+        String cdcName, cdc_sql;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE  " + tableName + " (" +
+                    (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
+                    "k INTEGER NOT NULL, v0 INTEGER, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, B.vb INTEGER, " +
+                    "v3 INTEGER, CONSTRAINT PK PRIMARY KEY " +
+                    (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant,
+                    tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            conn.createStatement().execute("ALTER TABLE " + datatableName + " DROP COLUMN v0");
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = {tenantId};
+        if (multitenant) {
+            tenantids = new String[] {tenantId, "2000"};
+        }
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChanges(startTS, tenantids, tableName, datatableName,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(datatableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        try (Connection conn = newConnection(tenantId)) {
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + SchemaUtil.getTableName(
+                            schemaName, cdcName)),
+                    changes, CHANGE_IMG, true);
+        }
+    }
+
+    private void assertCDCBinaryAndDateColumn(ResultSet rs,
+                                              List<byte []> byteColumnValues,
+                                              List<Date> dateColumnValues,
+                                              Timestamp timestamp) throws Exception {
+        assertEquals(true, rs.next());
+        assertEquals(1, rs.getInt(2));
+
+        Map<String, Object> row1 = new HashMap<String, Object>(){{
+            put(CDC_EVENT_TYPE, CDC_UPSERT_EVENT_TYPE);
+        }};
+        Map<String, Object> postImage = new HashMap<>();
+        postImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        postImage.put("D", dateColumnValues.get(0).toString());
+        postImage.put("T", timestamp.toString());
+        row1.put(CDC_POST_IMAGE, postImage);
+        Map<String, Object> changeImage = new HashMap<>();
+        changeImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        changeImage.put("D", dateColumnValues.get(0).toString());
+        changeImage.put("T", timestamp.toString());
+        row1.put(CDC_CHANGE_IMAGE, changeImage);
+        row1.put(CDC_PRE_IMAGE, new HashMap<String, String>() {{
+        }});
+        assertEquals(row1, gson.fromJson(rs.getString(3),
+                HashMap.class));
+
+        assertEquals(true, rs.next());
+        assertEquals(2, rs.getInt(2));
+        HashMap<String, Object> row2Json = gson.fromJson(rs.getString(3),
+                HashMap.class);
+        String row2BinaryColStr = (String) ((Map) row2Json.get(CDC_CHANGE_IMAGE)).get("A_BINARY");
+        byte[] row2BinaryCol = Base64.getDecoder().decode(row2BinaryColStr);
+
+        assertEquals(0, DescVarLengthFastByteComparisons.compareTo(byteColumnValues.get(1),
+                0, byteColumnValues.get(1).length, row2BinaryCol, 0, row2BinaryCol.length));
+    }
+
+    @Test
+    public void testCDCBinaryAndDateColumn() throws Exception {
+        List<byte []> byteColumnValues = new ArrayList<>();
+        byteColumnValues.add(new byte[] {0,0,0,0,0,0,0,0,0,1});
+        byteColumnValues.add(new byte[] {0,0,0,0,0,0,0,0,0,2});
+        List<Date> dateColumnValues = new ArrayList<>();
+        dateColumnValues.add(Date.valueOf("2024-02-01"));
+        dateColumnValues.add(Date.valueOf("2024-01-31"));
+        Timestamp timestampColumnValue = Timestamp.valueOf("2024-01-31 12:12:14");
+        String cdcName, cdc_sql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE  " + tableName + " (" +
+                    (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
+                    "k INTEGER NOT NULL, a_binary binary(10), d Date, t TIMESTAMP, " +
+                    "CONSTRAINT PK PRIMARY KEY " +
+                    (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant,
+                    tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        try (Connection conn = newConnection(tenantId)) {
+            String upsertQuery = "UPSERT INTO " + tableName + " (k, a_binary, d, t) VALUES (?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertQuery);
+            stmt.setInt(1, 1);
+            stmt.setBytes(2, byteColumnValues.get(0));
+            stmt.setDate(3, dateColumnValues.get(0));
+            stmt.setTimestamp(4, timestampColumnValue);
+            stmt.execute();
+            conn.commit();
+            stmt.setInt(1, 2);
+            stmt.setBytes(2, byteColumnValues.get(1));
+            stmt.setDate(3, dateColumnValues.get(1));
+            stmt.setTimestamp(4, timestampColumnValue);
+            stmt.execute();
+            conn.commit();
+        }
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        try (Connection conn = newConnection(tenantId)) {
+            assertCDCBinaryAndDateColumn(conn.createStatement().executeQuery
+                    ("SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * FROM " +
+                    SchemaUtil.getTableName(schemaName, cdcName)),
+                    byteColumnValues, dateColumnValues, timestampColumnValue);
+        }
+    }
+
+    @Test
+    public void testSelectCDCFailDataTableUpdate() throws Exception {
+        if (dataBeforeCDC) {
+            // In this case, the index will not exist at the time of upsert, so we can't
+            // simulate the index failure.
+            return;
+        }
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String cdcName, cdc_sql;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE  " + tableName + " (" +
+                            (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") +
+                            "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, " +
+                            "CONSTRAINT PK PRIMARY KEY " +
+                            (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant,
+                    tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, indexSaltBuckets, null);
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = {tenantId};
+        if (multitenant) {
+            tenantids = new String[] {tenantId, "2000"};
+        }
+
+        long startTS = System.currentTimeMillis();
+        generateChanges(startTS, tenantids, tableName, null,
+                COMMIT_FAILURE_EXPECTED);
+
+        try (Connection conn = newConnection(tenantId)) {
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " +
+                    SchemaUtil.getTableName(schemaName, cdcName));
+            assertEquals(false, rs.next());
+        }
+    }
+}
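
The JSON assertions in CDCQueryIT above imply the consumer-side shape of a change row.
Below is a hedged sketch of unpacking it; the Gson usage and column positions mirror
the tests, while the class, method and cdcFullName parameter are illustrative.

    import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
    import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
    import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;

    import com.google.gson.Gson;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.util.Map;

    class CdcJsonReader {
        private final Gson gson = new Gson();

        // Prints each change with its images; cdcFullName is the schema-qualified
        // CDC name, as built with SchemaUtil.getTableName in the tests above.
        void readChanges(Connection conn, String cdcFullName) throws Exception {
            try (ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName)) {
                while (rs.next()) {
                    Map<?, ?> change = gson.fromJson(rs.getString(3), Map.class);
                    Object eventType = change.get(CDC_EVENT_TYPE); // upsert or delete
                    Object pre = change.get(CDC_PRE_IMAGE);   // row image before the change
                    Object post = change.get(CDC_POST_IMAGE); // row image after the change
                    System.out.println(eventType + " pre=" + pre + " post=" + post);
                }
            }
        }
    }
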
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index d21d00ad2f..2dbbabe76b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -48,11 +48,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Properties;
+import java.util.TimeZone;
 
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
@@ -500,12 +503,18 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
                     hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
             Scan scan = new Scan();
             scan.setRaw(true);
+            scan.readAllVersions();
             LOGGER.info("***** Table Name : " + tableName);
             ResultScanner scanner = hTable.getScanner(scan);
+            // This is the default format of to_char(timestamp)
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+            sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
             for (Result result = scanner.next(); result != null; result = scanner.next()) {
                 for (Cell cell : result.rawCells()) {
                     String cellString = cell.toString();
-                    LOGGER.info(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                    LOGGER.info(cellString + " ****** timestamp : " +
+                            sdf.format(new Date(cell.getTimestamp())) + " ****** value : " +
+                            Bytes.toStringBinary(CellUtil.cloneValue(cell)));
                 }
             }
         }
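
Note that this dump helper is what the commented-out SingleCellIndexIT.dumpTable(...)
calls in CDCQueryIT rely on: setRaw(true) together with the added readAllVersions()
surfaces every cell version (including delete markers), and the GMT-based
SimpleDateFormat renders each cell timestamp in the same yyyy-MM-dd HH:mm:ss.SSS form
that to_char(timestamp) produces, making logged cells easy to line up against CDC
query output.
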
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 520c6d9ad1..2d0429281f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -260,7 +260,7 @@ public class CreateIndexCompiler {
         return new BaseMutationPlan(context, operation) {
             @Override
             public MutationState execute() throws SQLException {
-                return client.createIndex(create, splits, null);
+                return client.createIndex(create, splits);
             }
 
             @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
index 1985d0f822..90fb9cb192 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
@@ -84,8 +84,12 @@ public class IndexStatementRewriter extends ParseNodeRewriter {
      * @return new select statement or the same one if nothing was rewritten.
      * @throws SQLException 
      */
-    public static SelectStatement translate(SelectStatement statement, ColumnResolver dataResolver, Map<TableRef, TableRef> multiTableRewriteMap) throws SQLException {
-        return rewrite(statement, new IndexStatementRewriter(dataResolver, multiTableRewriteMap, false));
+    public static SelectStatement translate(SelectStatement statement,
+                                            ColumnResolver dataResolver,
+                                            Map<TableRef, TableRef> multiTableRewriteMap)
+                                            throws SQLException {
+        return rewrite(statement, new IndexStatementRewriter(dataResolver, multiTableRewriteMap,
+                false));
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9981e31219..abd8626f89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -28,9 +28,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
 import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.TerminalParseNode;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
@@ -240,8 +243,41 @@ public class QueryCompiler {
         return plan;
     }
 
+    private QueryPlan getExistingDataPlanForCDC() {
+        if (dataPlans != null) {
+            for (QueryPlan plan : dataPlans.values()) {
+                if (plan.getTableRef().getTable().getType() == PTableType.CDC) {
+                    return plan;
+                }
+            }
+        }
+        return null;
+    }
+
     public QueryPlan compileSelect(SelectStatement select) throws SQLException{
         StatementContext context = new StatementContext(statement, resolver, bindManager, scan, sequenceManager);
+        QueryPlan dataPlanForCDC = getExistingDataPlanForCDC();
+        if (dataPlanForCDC != null) {
+            TableRef cdcTableRef = dataPlanForCDC.getTableRef();
+            PTable cdcTable = cdcTableRef.getTable();
+            NamedTableNode cdcDataTableName = NODE_FACTORY.namedTable(null,
+                    NODE_FACTORY.table(cdcTable.getSchemaName().getString(),
+                            cdcTable.getParentTableName().getString()),
+                    select.getTableSamplingRate());
+            ColumnResolver dataTableResolver = FromCompiler.getResolver(cdcDataTableName,
+                    statement.getConnection());
+            TableRef cdcDataTableRef = dataTableResolver.getTables().get(0);
+            Set<PTable.CDCChangeScope> cdcIncludeScopes =
+                    cdcTable.getCDCIncludeScopes();
+            String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
+            if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
+                cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
+                        cdcHint.length() - 1));
+            }
+            context.setCDCDataTableRef(cdcDataTableRef);
+            context.setCDCTableRef(cdcTableRef);
+            context.setCDCIncludeScopes(cdcIncludeScopes);
+        }
         if (select.isJoin()) {
             JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
             return compileJoinQuery(context, joinTable, false, false, null);
@@ -702,20 +738,44 @@ public class QueryCompiler {
             if (projectedTable != null) {
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
             }
-
-            if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
-                // This will get the data column added to the context so that projection can get
-                // serialized..
-                context.getDataColumnPosition(
-                        context.getCurrentTable().getTable().getColumnForColumnName(
-                                QueryConstants.CDC_JSON_COL_NAME));
-            }
         }
         
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
         PTable table = tableRef.getTable();
 
+        if (table.getType() == PTableType.CDC) {
+            List<AliasedNode> selectNodes = select.getSelect();
+            // For CDC queries, if a single wildcard projection is used, automatically insert
+            // PHOENIX_ROW_TIMESTAMP() as a projection at the beginning.
+            ParseNode selectNode = selectNodes.size() == 1 ? selectNodes.get(0).getNode() : null;
+            if (selectNode instanceof TerminalParseNode
+                    && ((TerminalParseNode) selectNode).isWildcardNode()) {
+                List<AliasedNode> tmpSelectNodes = Lists.newArrayListWithExpectedSize(
+                        selectNodes.size() + 1);
+                tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
+                        NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
+                                Collections.emptyList())));
+                tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
+                        ((TerminalParseNode) selectNode).getRewritten()));
+                selectNodes = tmpSelectNodes;
+            }
+            List<OrderByNode> orderByNodes = select.getOrderBy();
+            // For CDC queries, if no ORDER BY is specified, add default ordering.
+            if (orderByNodes.size() == 0) {
+                orderByNodes = Lists.newArrayListWithExpectedSize(1);
+                orderByNodes.add(NODE_FACTORY.orderBy(
+                        NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
+                                Collections.emptyList()),
+                        false, SortOrder.getDefault() == SortOrder.ASC));
+            }
+            select = NODE_FACTORY.select(select.getFrom(),
+                    select.getHint(), select.isDistinct(), selectNodes, select.getWhere(),
+                    select.getGroupBy(), select.getHaving(), orderByNodes, select.getLimit(),
+                    select.getOffset(), select.getBindCount(), select.isAggregate(),
+                    select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+        }
+
         ParseNode viewWhere = null;
         if (table.getViewStatement() != null) {
             viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
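
To illustrate the compileSelect() changes above: a wildcard query against a CDC
object is rewritten so that PHOENIX_ROW_TIMESTAMP() is projected first and,
absent an explicit ORDER BY, rows are ordered by it. Below is a minimal JDBC
sketch of the observable behavior, assuming a hypothetical CDC object
CDC_MY_TABLE and a local connection URL:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.sql.Timestamp;

    public class CdcWildcardRewriteSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 // The user issues a plain wildcard query against the CDC object...
                 ResultSet rs = stmt.executeQuery("SELECT * FROM CDC_MY_TABLE")) {
                // ...which is compiled as if it had been written:
                //   SELECT PHOENIX_ROW_TIMESTAMP(), * FROM CDC_MY_TABLE
                //   ORDER BY PHOENIX_ROW_TIMESTAMP() ASC
                while (rs.next()) {
                    Timestamp changeTime = rs.getTimestamp(1); // the injected projection
                    System.out.println("change at " + changeTime);
                }
            }
        }
    }
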
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index c12cd62d56..5d2ec718e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -85,7 +85,9 @@ public class StatementContext {
     private boolean isClientSideUpsertSelect;
     private boolean isUncoveredIndex;
     private String cdcIncludeScopes;
-    
+    private TableRef cdcTableRef;
+    private TableRef cdcDataTableRef;
+
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
     }
@@ -386,4 +388,20 @@ public class StatementContext {
     public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
         this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
     }
+
+    public TableRef getCDCDataTableRef() {
+        return cdcDataTableRef;
+    }
+
+    public void setCDCDataTableRef(TableRef cdcDataTableRef) {
+        this.cdcDataTableRef = cdcDataTableRef;
+    }
+
+    public TableRef getCDCTableRef() {
+        return cdcTableRef;
+    }
+
+    public void setCDCTableRef(TableRef cdcTableRef) {
+        this.cdcTableRef = cdcTableRef;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index faa940a5e7..c99672e624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 package org.apache.phoenix.compile;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
-import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 
@@ -57,7 +55,6 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
@@ -92,8 +89,10 @@ public class TupleProjectionCompiler {
             if (node instanceof WildcardParseNode) {
                 if (((WildcardParseNode) node).isRewrite()) {
                     TableRef parentTableRef = FromCompiler.getResolver(
-                            NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
-                                    table.getParentTableName().getString())), context.getConnection()).resolveTable(
+                            NODE_FACTORY.namedTable(null,
+                                    TableName.create(table.getSchemaName().getString(),
+                                    table.getParentTableName().getString())),
+                                    context.getConnection()).resolveTable(
                             table.getSchemaName().getString(),
                             table.getParentTableName().getString());
                     for (PColumn column : parentTableRef.getTable().getColumns()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d76046d3b4..18cfebdff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -150,11 +150,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
     public static final String INDEX_ROW_KEY = "_IndexRowKey";
     public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
-    public static final String CDC_DATA_TABLE_NAME = "_CdcDataTableName";
-    public static final String CDC_JSON_COL_QUALIFIER = "_CdcJsonColumn_Qualifier";
-    public static final String CDC_INCLUDE_SCOPES = "_CdcIncludeScopes";
-    public static final String DATA_COL_QUALIFIER_TO_NAME_MAP = "_DataColQualToNameMap";
-    public static final String DATA_COL_QUALIFIER_TO_TYPE_MAP = "_DataColQualToTypeMap";
+    public static final String CDC_DATA_TABLE_DEF = "_CdcDataTableDef";
 
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
@@ -237,7 +233,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
             throw new DoNotRetryIOException(cause.getMessage(), cause);
         }
-        if(isLocalIndex) {
+        if (isLocalIndex) {
             ScanUtil.setupLocalIndexScan(scan);
         }
     }
@@ -414,7 +410,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             // If the exception is NotServingRegionException, rethrow it as
             // StaleRegionBoundaryCacheException so the Phoenix client handles it;
             // otherwise the HBase client may recreate scans with wrong region boundaries.
-            if(t instanceof NotServingRegionException) {
+            if (t instanceof NotServingRegionException) {
                 Exception cause = new StaleRegionBoundaryCacheException(c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
                 throw new DoNotRetryIOException(cause.getMessage(), cause);
             }
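
The five per-field CDC scan attributes above are collapsed into a single
protobuf-serialized CDC_DATA_TABLE_DEF blob. A minimal sketch of the round
trip, assuming a compiled StatementContext with the CDC table refs populated
(as set up in QueryCompiler above); in the real code path the client half of
this happens during scan preparation:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.StatementContext;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
    import org.apache.phoenix.index.CDCTableInfo;

    public class CdcScanAttributeSketch {
        static void roundTrip(Scan scan, StatementContext context) throws Exception {
            // Client side: serialize the whole CDC table definition as one blob.
            scan.setAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_DEF,
                    CDCTableInfo.toProto(context).toByteArray());
            // Server side (CDCGlobalIndexRegionScanner constructor): parse it back.
            CDCTableInfo info = CDCTableInfo.createFromProto(
                    CDCInfoProtos.CDCTableDef.parseFrom(
                            scan.getAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_DEF)));
            System.out.println(info.getColumnInfoList().size() + " columns");
        }
    }
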
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 178bb1d705..f9f5b4a4a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -18,63 +18,50 @@
 package org.apache.phoenix.coprocessor;
 
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilder;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
 import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCChangeBuilder;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.sql.Types;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner {
     private static final Logger LOGGER =
             LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
-
-    private Map<ImmutableBytesPtr, String> dataColQualNameMap;
-    private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap;
-    // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>>
-    private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> dataRowChanges =
-            new HashMap<>();
+    private CDCTableInfo cdcDataTableInfo;
+    private CDCChangeBuilder changeBuilder;
 
     public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
                                        final Region region,
@@ -90,130 +77,152 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        changeBuilder = new CDCChangeBuilder(cdcDataTableInfo);
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        // TODO: Derive the time range from the start and end rows of the index scan
+        // and set it on the data table scan.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            // firstIndexCell: pick the earliest cell in the index row so that the
+            // timestamp of the cell and that of the row are the same.
+            Cell firstIndexCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(firstIndexCell);
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long changeTS = firstIndexCell.getTimestamp();
+            TupleProjector dataTableProjector = cdcDataTableInfo.getDataTableProjector();
+            Expression[] expressions = dataTableProjector != null ?
+                    dataTableProjector.getExpressions() : null;
+            boolean isSingleCell = dataTableProjector != null;
+            byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(
+                    cdcDataTableInfo.getQualifierEncodingScheme()).getFirst();
+            changeBuilder.initChange(changeTS);
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
+                if (dataRow != null) {
+                    int curColumnNum = 0;
+                    List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                            this.cdcDataTableInfo.getColumnInfoList();
+                    cellLoop:
+                    for (Cell cell : dataRow.rawCells()) {
+                        if (! changeBuilder.isChangeRelevant(cell)) {
+                            continue;
+                        }
+                        byte[] cellFam = ImmutableBytesPtr.cloneCellFamilyIfNecessary(cell);
+                        byte[] cellQual = ImmutableBytesPtr.cloneCellQualifierIfNecessary(cell);
+                        if (cell.getType() == Cell.Type.DeleteFamily) {
+                            if (changeTS == cell.getTimestamp()) {
+                                changeBuilder.markAsDeletionEvent();
+                            } else if (changeTS > cell.getTimestamp()
+                                    && changeBuilder.getLastDeletedTimestamp() == 0L) {
+                                // Cells with a timestamp older than this DeleteFamily
+                                // marker cannot be part of the pre-image, since the
+                                // row was deleted at that point.
+                                changeBuilder.setLastDeletedTimestamp(cell.getTimestamp());
                             }
-                            if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
+                        } else if ((cell.getType() == Cell.Type.DeleteColumn
+                                || cell.getType() == Cell.Type.Put)
+                                && !Arrays.equals(cellQual, emptyCQ)) {
+                            if (! changeBuilder.isChangeRelevant(cell)) {
+                                // We don't need to build the change image, just skip it.
                                 continue;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            // With single-cell storage, this one cell carries the
+                            // entire row, so extract each column's value from it.
+                            if (isSingleCell) {
+                                while (curColumnNum < cdcColumnInfoList.size()) {
+                                    boolean hasValue = dataTableProjector.getSchema().extractValue(
+                                            cell, (SingleCellColumnExpression)
+                                                    expressions[curColumnNum], ptr);
+                                    if (hasValue) {
+                                        Object cellValue = getColumnValue(ptr.get(),
+                                                ptr.getOffset(), ptr.getLength(),
+                                                cdcColumnInfoList.get(curColumnNum).getColumnType());
+                                        changeBuilder.registerChange(cell, curColumnNum, cellValue);
+                                    }
+                                    ++curColumnNum;
+                                }
+                                break cellLoop;
                             }
-                        }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
-                        }
-                        List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                            while (true) {
+                                CDCTableInfo.CDCColumnInfo currentColumnInfo =
+                                        cdcColumnInfoList.get(curColumnNum);
+                                int columnComparisonResult = CDCUtil.compareCellFamilyAndQualifier(
+                                                cellFam, cellQual,
+                                                currentColumnInfo.getColumnFamily(),
+                                                currentColumnInfo.getColumnQualifier());
+                                if (columnComparisonResult > 0) {
+                                    if (++curColumnNum >= cdcColumnInfoList.size()) {
+                                        // Have no more column definitions, so the rest of the cells
+                                        // must be for dropped columns and so can be ignored.
+                                        break cellLoop;
+                                    }
+                                    // Continue looking for the right column definition
+                                    // for this cell.
+                                    continue;
+                                } else if (columnComparisonResult < 0) {
+                                    // We didn't find a column definition for this cell, ignore the
+                                    // current cell but continue working on the rest of the cells.
+                                    continue cellLoop;
                                 }
+
+                                // else, found the column definition.
+                                Object cellValue = cell.getType() == Cell.Type.DeleteColumn ? null
+                                        : getColumnValue(cell, cdcColumnInfoList.get(curColumnNum)
+                                                .getColumnType());
+                                changeBuilder.registerChange(cell, curColumnNum, cellValue);
+                                // Done processing the current cell, check the next cell.
+                                break;
                             }
-                            Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap();
-                            rowOfCells.putAll(rollingRow);
-                            changeTimeline.put(ts, rowOfCells);
                         }
                     }
-
-                    Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs);
-                    if (mapOfCells != null) {
-                        Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size());
-                        for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) {
-                            String colName = dataColQualNameMap.get(entry.getKey());
-                            Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject(
-                                    entry.getValue().getValueArray());
-                            rowValueMap.put(colName, colVal);
+                    if (changeBuilder.isNonEmptyEvent()) {
+                        Result cdcRow = getCDCImage(indexRowKey, firstIndexCell);
+                        if (cdcRow != null && tupleProjector != null) {
+                            if (firstIndexCell.getType() == Cell.Type.DeleteFamily) {
+                                // For queries with an ORDER BY clause, result is an
+                                // EncodedColumnQualiferCellsList, which fails when a
+                                // DeleteFamily cell is added to it because such cells
+                                // have no column qualifier bytes. Add an empty Put
+                                // cell as a placeholder instead.
+                                result.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+                                        .setRow(indexRowKey)
+                                        .setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(
+                                                firstIndexCell))
+                                        .setQualifier(indexMaintainer.getEmptyKeyValueQualifier())
+                                        .setTimestamp(firstIndexCell.getTimestamp())
+                                        .setType(Cell.Type.Put)
+                                        .setValue(EMPTY_BYTE_ARRAY).build());
+                            } else {
+                                result.add(firstIndexCell);
+                            }
+                            IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow),
+                                    tupleProjector, ptr);
+                        } else {
+                            result.clear();
                         }
-                        byte[] value =
-                                new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
-                        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-                        ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(),
-                                firstCell.getFamilyOffset(), firstCell.getFamilyLength());
-                        dataRow = Result.create(Arrays.asList(builder.
-                                setRow(dataRowKey.copyBytesIfNecessary()).
-                                setFamily(family.copyBytesIfNecessary()).
-                                setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
-                                setTimestamp(firstCell.getTimestamp()).
-                                setValue(value).
-                                setType(Cell.Type.Put).
-                                build()));
+                    } else {
+                        result.clear();
                     }
-                }
-                if (dataRow != null && tupleProjector != null) {
-                    IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
-                            tupleProjector, ptr);
-                }
-                else {
+                } else {
                     result.clear();
                 }
+
                 return true;
             } catch (Throwable e) {
                 LOGGER.error("Exception in UncoveredIndexRegionScanner for region "
@@ -223,4 +232,44 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
         }
         return false;
     }
+
+    private Result getCDCImage(byte[] indexRowKey, Cell firstCell) {
+        Gson gson = new GsonBuilder().serializeNulls().create();
+        byte[] value = gson.toJson(changeBuilder.buildCDCEvent()).getBytes(StandardCharsets.UTF_8);
+        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+        Result cdcRow = Result.create(Arrays.asList(builder
+                .setRow(indexRowKey)
+                .setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell))
+                .setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes())
+                .setTimestamp(changeBuilder.getChangeTimestamp())
+                .setValue(value)
+                .setType(Cell.Type.Put)
+                .build()));
+        return cdcRow;
+    }
+
+    private Object getColumnValue(Cell cell, PDataType dataType) {
+        return getColumnValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                dataType);
+    }
+
+    private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
+        if (dataType.getSqlType() == Types.BINARY) {
+            // Unfortunately, Base64.Encoder has no option to specify an offset and
+            // length, so we cannot avoid copying the bytes.
+            return Base64.getEncoder().encodeToString(
+                    ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length));
+        } else {
+            Object value = dataType.toObject(cellValue, offset, length);
+            if (dataType.getSqlType() == Types.DATE
+                    || dataType.getSqlType() == Types.TIMESTAMP
+                    || dataType.getSqlType() == Types.TIME
+                    || dataType.getSqlType() == Types.TIME_WITH_TIMEZONE
+                    || dataType.getSqlType() == Types.TIMESTAMP_WITH_TIMEZONE) {
+                value = value.toString();
+            }
+            return value;
+        }
+    }
+
 }
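
The cellLoop above is a merge join: both the data row's cells and
cdcColumnInfoList are sorted by (column family, qualifier), so a single forward
pass pairs each cell with its column definition via
CDCUtil.compareCellFamilyAndQualifier(). A self-contained sketch of the same
control flow over plain strings (hypothetical "fam:qual" keys, not the
production types):

    public class CdcMergeJoinSketch {
        public static void main(String[] args) {
            String[] cells = {"0:A", "0:C", "0:D"};   // sorted cell (fam:qual) keys
            String[] columns = {"0:A", "0:B", "0:C"}; // sorted column definitions
            int cur = 0;
            cellLoop:
            for (String cell : cells) {
                while (true) {
                    int cmp = cell.compareTo(columns[cur]);
                    if (cmp > 0) {                // cell is past this column def
                        if (++cur >= columns.length) {
                            break cellLoop;       // remaining cells: dropped columns
                        }
                        continue;                 // try the next column def
                    } else if (cmp < 0) {
                        continue cellLoop;        // cell has no column def; skip it
                    }
                    System.out.println("register change for " + cell);
                    break;                        // matched; move to the next cell
                }
            }
        }
    }
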
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 874b6669c1..cbe79cef1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1433,7 +1433,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         return indexMutations.size();
     }
 
-    static boolean adjustScanFilter(Scan scan) {
+    public static boolean adjustScanFilter(Scan scan) {
         // For rebuilds we use count(*) as the query for regular tables, which ends up setting FirstKeyOnlyFilter on the scan.
         // That filter doesn't give us all columns and skips to the next row as soon as it finds one column.
         // For rebuilds we need all columns and all versions.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index d010c33dff..a302f84e19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -59,8 +59,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ca0259203a..7987a616a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -48,10 +47,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -77,10 +74,8 @@ import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.trace.TracingIterator;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -316,7 +311,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         ScanUtil.setCustomAnnotations(scan,
                 customAnnotations == null ? null : customAnnotations.getBytes());
         // Set index related scan attributes.
-        if (table.getType() == PTableType.INDEX || table.getType() == PTableType.CDC) {
+        if (table.getType() == PTableType.INDEX) {
             if (table.getIndexType() == IndexType.LOCAL) {
                 ScanUtil.setLocalIndex(scan);
             } else if (context.isUncoveredIndex()) {
@@ -334,8 +329,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
                 // Set data columns to be join back from data table.
                 PTable parentTable = context.getCurrentTable().getTable();
                 String parentSchemaName = parentTable.getParentSchemaName().getString();
-                if (parentTable.getType() == PTableType.CDC) {
-                    dataTable = parentTable;
+                if (context.getCDCTableRef() != null) {
+                    dataTable = context.getCDCTableRef().getTable();
                 }
                 else {
                     String parentTableName = parentTable.getParentTableName().getString();
@@ -386,7 +381,9 @@ public abstract class BaseQueryPlan implements QueryPlan {
         PName name = context.getCurrentTable().getTable().getName();
         List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
         for (PTable index : dataTable.getIndexes()) {
-            if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) {
+            if (index.getName().equals(name) && (
+                    index.getIndexType() == IndexType.LOCAL
+                            || dataTable.getType() == PTableType.CDC)) {
                 indexes.add(index);
                 break;
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1cf23d7e0f..91503cf863 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
@@ -68,7 +69,10 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -112,6 +116,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
@@ -595,6 +600,61 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
 
+    private List<Mutation> getCDCDeleteMutations(PTable table, PTable index,
+                                                 Long mutationTimestamp,
+                                                 List<Mutation> mutationList) throws
+            SQLException {
+        final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(mutationList.size());
+        for (final Mutation mutation : mutationList) {
+            // Only generate extra row mutations for DELETE
+            if (mutation instanceof Delete) {
+                ptr.set(mutation.getRow());
+                ValueGetter getter = new AbstractValueGetter() {
+                    @Override
+                    public byte[] getRowKey() {
+                        return mutation.getRow();
+                    }
+                    @Override
+                    public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
+                        // Always return null for the empty key value; this makes the
+                        // index maintainer treat this mutation as a new row.
+                        if (IndexUtil.isEmptyKeyValue(table, ref)) {
+                            return null;
+                        }
+                        byte[] family = ref.getFamily();
+                        byte[] qualifier = ref.getQualifier();
+                        Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
+                        List<Cell> kvs = familyMap.get(family);
+                        if (kvs == null) {
+                            return null;
+                        }
+                        for (Cell kv : kvs) {
+                            if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(),
+                                    kv.getFamilyLength(), family, 0, family.length) == 0
+                                    && Bytes.compareTo(kv.getQualifierArray(),
+                                    kv.getQualifierOffset(), kv.getQualifierLength(),
+                                    qualifier, 0, qualifier.length) == 0) {
+                                ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                                connection.getKeyValueBuilder().getValueAsPtr(kv, ptr);
+                                return ptr;
+                            }
+                        }
+                        return null;
+                    }
+                };
+                ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey(
+                        getter, ptr, null, null, mutationTimestamp));
+                PRow row = table.newRow(
+                        connection.getKeyValueBuilder(), mutationTimestamp, key, false);
+                row.delete();
+                indexMutations.addAll(row.toRowMutations());
+            }
+        }
+        return indexMutations;
+    }
+
     private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final TableRef tableRef,
             final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp,
             boolean includeAllIndexes, final boolean sendAll) {
@@ -628,6 +688,7 @@ public class MutationState implements SQLCloseable {
 
                 List<Mutation> indexMutations = null;
                 try {
+
                     if (!mutationsPertainingToIndex.isEmpty()) {
                         if (table.isTransactional()) {
                             if (indexMutationsMap == null) {
@@ -671,6 +732,19 @@ public class MutationState implements SQLCloseable {
                             }
                         }
                     }
+
+                    if (CDCUtil.isCDCIndex(index)) {
+                        List<Mutation> cdcMutations = getCDCDeleteMutations(
+                                table, index, mutationTimestamp, mutationList);
+                        if (cdcMutations.size() > 0) {
+                            if (indexMutations == null) {
+                                indexMutations = cdcMutations;
+                            } else {
+                                indexMutations.addAll(cdcMutations);
+                            }
+                        }
+                    }
+
                 } catch (SQLException | IOException e) {
                     throw new IllegalDataException(e);
                 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index 302d5e2ffb..de15a8b1be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -139,7 +139,7 @@ public class TupleProjector {
      * @param projector projector to serialize
      * @return byte array
      */
-    private static byte[] serializeProjectorIntoBytes(TupleProjector projector) {
+    public static byte[] serializeProjectorIntoBytes(TupleProjector projector) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             DataOutputStream output = new DataOutputStream(stream);
@@ -172,7 +172,7 @@ public class TupleProjector {
      * @param proj byte array to deserialize
      * @return projector
      */
-    private static TupleProjector deserializeProjectorFromBytes(byte[] proj) {
+    public static TupleProjector deserializeProjectorFromBytes(byte[] proj) {
         if (proj == null) {
             return null;
         }
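
serializeProjectorIntoBytes() and deserializeProjectorFromBytes() are made
public so that CDCTableInfo.toProto() can ship a data-table projector to the
server for SINGLE_CELL_ARRAY_WITH_OFFSETS tables. A round-trip sketch, assuming
projectedDataTable is a PTable built by
TupleProjectionCompiler.createProjectedTable() as in CDCTableInfo.toProto():

    import org.apache.phoenix.execute.TupleProjector;
    import org.apache.phoenix.schema.PTable;

    public class ProjectorRoundTripSketch {
        static TupleProjector roundTrip(PTable projectedDataTable) {
            TupleProjector projector = new TupleProjector(projectedDataTable);
            // The serialized form travels inside the CDCTableDef protobuf...
            byte[] wire = TupleProjector.serializeProjectorIntoBytes(projector);
            // ...and is rebuilt on the region server.
            return TupleProjector.deserializeProjectorFromBytes(wire);
        }
    }
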
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
index 2c152978e8..115e4e6d6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
@@ -88,23 +88,20 @@ public class SingleCellColumnExpression extends KeyValueColumnExpression {
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
     	if (!super.evaluate(tuple, ptr)) {
             return false;
-        } else if (ptr.getLength() == 0) { 
-        	return true; 
         }
-        // the first position is reserved and we offset maxEncodedColumnQualifier by
-        // ENCODED_CQ_COUNTER_INITIAL_VALUE (which is the minimum encoded column qualifier)
-        int index = decodedColumnQualifier - QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE + 1;
-        // Given a ptr to the entire array, set ptr to point to a particular element
-        // within that array
-    	ColumnValueDecoder encoderDecoder = immutableStorageScheme.getDecoder();
-    	return encoderDecoder.decode(ptr, index);
+        return evaluate(ptr);
     }
 
     @Override
     public boolean evaluateUnsafe(Tuple tuple, ImmutableBytesWritable ptr) {
         if (!super.evaluateUnsafe(tuple, ptr)) {
             return false;
-        } else if (ptr.getLength() == 0) {
+        }
+        return evaluate(ptr);
+    }
+
+    public boolean evaluate(ImmutableBytesWritable ptr) {
+        if (ptr.getLength() == 0) {
             return true;
         }
         // the first position is reserved and we offset maxEncodedColumnQualifier by
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
index 9825c77bab..946fa2e861 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.hbase.index.util;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.util.Arrays;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -102,10 +104,35 @@ public class ImmutableBytesPtr extends ImmutableBytesWritable {
     return copyBytesIfNecessary(this);
     }
 
-  public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
-    if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
-      return ptr.get();
+    public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
+        return copyBytesIfNecessary(ptr.get(), ptr.getOffset(), ptr.getLength());
+    }
+
+    public static byte[] copyBytesIfNecessary(byte[] bytes, int offset, int length) {
+        if (offset == 0 && length == bytes.length) {
+            return bytes;
+        }
+        return Arrays.copyOfRange(bytes, offset, offset + length);
+    }
+
+    public static byte[] cloneCellRowIfNecessary(Cell cell) {
+        return copyBytesIfNecessary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+    }
+
+    public static byte[] cloneCellFamilyIfNecessary(Cell cell) {
+        return copyBytesIfNecessary(cell.getFamilyArray(), cell.getFamilyOffset(),
+                cell.getFamilyLength());
+    }
+
+    public static byte[] cloneCellQualifierIfNecessary(Cell cell) {
+        return ImmutableBytesPtr.copyBytesIfNecessary(
+                cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength());
+    }
+
+    public static byte[] cloneCellValueIfNecessary(Cell cell) {
+        return ImmutableBytesPtr.copyBytesIfNecessary(
+                cell.getValueArray(), cell.getValueOffset(),
+                cell.getValueLength());
     }
-    return ptr.copyBytes();
-  }
 }
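
The new cloneCell*IfNecessary() helpers above all funnel into
copyBytesIfNecessary(byte[], int, int), whose contract is: return the backing
array itself when the slice spans it exactly, otherwise copy the slice. A tiny
standalone sketch of that contract:

    import java.util.Arrays;

    public class CopyIfNecessarySketch {
        static byte[] copyBytesIfNecessary(byte[] bytes, int offset, int length) {
            if (offset == 0 && length == bytes.length) {
                return bytes;                                  // zero-copy fast path
            }
            return Arrays.copyOfRange(bytes, offset, offset + length);
        }

        public static void main(String[] args) {
            byte[] whole = {1, 2, 3};
            System.out.println(copyBytesIfNecessary(whole, 0, 3) == whole); // true
            System.out.println(copyBytesIfNecessary(whole, 1, 2) == whole); // false
        }
    }
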
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
new file mode 100644
index 0000000000..4d80f3e9d2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.index;
+
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+
+/**
+ * Definition of a CDC-enabled data table (columns, include scopes, encoding) that is
+ * serialized into the scan so the server-side CDC scanner can build change images.
+ */
+public class CDCTableInfo {
+    private List<CDCColumnInfo> columnInfoList;
+    private byte[] defaultColumnFamily;
+    private final Set<PTable.CDCChangeScope> includeScopes;
+    private PTable.QualifierEncodingScheme qualifierEncodingScheme;
+    private final byte[] cdcJsonColQualBytes;
+    private final TupleProjector dataTableProjector;
+
+    private CDCTableInfo(List<CDCColumnInfo> columnInfoList,
+                         Set<PTable.CDCChangeScope> includeScopes, byte[] cdcJsonColQualBytes,
+                         TupleProjector dataTableProjector) {
+        Collections.sort(columnInfoList);
+        this.columnInfoList = columnInfoList;
+        this.includeScopes = includeScopes;
+        this.cdcJsonColQualBytes = cdcJsonColQualBytes;
+        this.dataTableProjector = dataTableProjector;
+    }
+
+    public CDCTableInfo(byte[] defaultColumnFamily, List<CDCColumnInfo> columnInfoList,
+                        Set<PTable.CDCChangeScope> includeScopes,
+                        PTable.QualifierEncodingScheme qualifierEncodingScheme,
+                        byte[] cdcJsonColQualBytes, TupleProjector dataTableProjector) {
+        this(columnInfoList, includeScopes, cdcJsonColQualBytes, dataTableProjector);
+        this.defaultColumnFamily = defaultColumnFamily;
+        this.qualifierEncodingScheme = qualifierEncodingScheme;
+    }
+
+    public List<CDCColumnInfo> getColumnInfoList() {
+        return columnInfoList;
+    }
+
+    public byte[] getDefaultColumnFamily() {
+        return defaultColumnFamily;
+    }
+
+    public PTable.QualifierEncodingScheme getQualifierEncodingScheme() {
+        return qualifierEncodingScheme;
+    }
+
+    public Set<PTable.CDCChangeScope> getIncludeScopes() {
+        return includeScopes;
+    }
+
+    public byte[] getCdcJsonColQualBytes() {
+        return cdcJsonColQualBytes;
+    }
+
+    public TupleProjector getDataTableProjector() {
+        return dataTableProjector;
+    }
+
+    public static CDCTableInfo createFromProto(CDCInfoProtos.CDCTableDef table) {
+        byte[] defaultColumnFamily = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+        if (table.hasDefaultFamilyName()) {
+            defaultColumnFamily = table.getDefaultFamilyName().toByteArray();
+        }
+        // For backward compatibility. Clients older than 4.10 will always have
+        // non-encoded qualifiers.
+        PTable.QualifierEncodingScheme qualifierEncodingScheme
+                = PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+        if (table.hasQualifierEncodingScheme()) {
+            qualifierEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue(
+                    table.getQualifierEncodingScheme().toByteArray()[0]);
+        }
+        List<CDCColumnInfo> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
+        for (CDCInfoProtos.CDCColumnDef curColumnProto : table.getColumnsList()) {
+            columns.add(CDCColumnInfo.createFromProto(curColumnProto));
+        }
+        String includeScopesStr = table.getCdcIncludeScopes();
+        Set<PTable.CDCChangeScope> changeScopeSet;
+        try {
+            changeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(includeScopesStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        TupleProjector dataTableProjector = null;
+        if (table.hasDataTableProjectorBytes()) {
+            dataTableProjector = TupleProjector.deserializeProjectorFromBytes(
+                    table.getDataTableProjectorBytes().toByteArray());
+        }
+        return new CDCTableInfo(defaultColumnFamily, columns, changeScopeSet,
+                qualifierEncodingScheme, table.getCdcJsonColQualBytes().toByteArray(),
+                dataTableProjector);
+    }
+
+    public static CDCInfoProtos.CDCTableDef toProto(StatementContext context)
+            throws SQLException {
+        PTable cdcTable = context.getCDCTableRef().getTable();
+        PTable dataTable = context.getCDCDataTableRef().getTable();
+        CDCInfoProtos.CDCTableDef.Builder builder = CDCInfoProtos.CDCTableDef.newBuilder();
+        if (dataTable.getDefaultFamilyName() != null) {
+            builder.setDefaultFamilyName(
+                    ByteStringer.wrap(dataTable.getDefaultFamilyName().getBytes()));
+        }
+        String cdcIncludeScopes = context.getEncodedCdcIncludeScopes();
+        if (cdcIncludeScopes != null) {
+            builder.setCdcIncludeScopes(cdcIncludeScopes);
+        }
+        if (dataTable.getEncodingScheme() != null) {
+            builder.setQualifierEncodingScheme(ByteStringer.wrap(
+                    new byte[] { dataTable.getEncodingScheme().getSerializedMetadataValue() }));
+        }
+        for (PColumn column : dataTable.getColumns()) {
+            if (column.getFamilyName() == null) {
+                continue;
+            }
+            builder.addColumns(CDCColumnInfo.toProto(column));
+        }
+        PColumn cdcJsonCol = cdcTable.getColumnForColumnName(CDC_JSON_COL_NAME);
+        builder.setCdcJsonColQualBytes(ByteStringer.wrap(cdcJsonCol.getColumnQualifierBytes()));
+
+        TableRef cdcDataTableRef = context.getCDCDataTableRef();
+        if (cdcDataTableRef.getTable().isImmutableRows() &&
+                cdcDataTableRef.getTable().getImmutableStorageScheme() ==
+                        PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+
+            List<ColumnRef> dataColumns = new ArrayList<ColumnRef>();
+            PTable table = cdcDataTableRef.getTable();
+            for (PColumn column : table.getColumns()) {
+                if (!SchemaUtil.isPKColumn(column)) {
+                    dataColumns.add(new ColumnRef(cdcDataTableRef, column.getPosition()));
+                }
+            }
+
+            PTable projectedDataTable = TupleProjectionCompiler.createProjectedTable(
+                    cdcDataTableRef, dataColumns, false);
+            TupleProjector dataTableProjector = new TupleProjector(projectedDataTable);
+            builder.setDataTableProjectorBytes(ByteStringer.wrap(
+                    TupleProjector.serializeProjectorIntoBytes(dataTableProjector)));
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Per-column metadata carried in the serialized CDC table definition.
+     */
+    public static class CDCColumnInfo implements Comparable<CDCColumnInfo> {
+
+        private final byte[] columnFamily;
+        private final byte[] columnQualifier;
+        private final String columnName;
+        private final PDataType columnType;
+        private final String columnFamilyName;
+        private String columnDisplayName;
+
+        public CDCColumnInfo(byte[] columnFamily, byte[] columnQualifier,
+                             String columnName, PDataType columnType,
+                             String columnFamilyName) {
+            this.columnFamily = columnFamily;
+            this.columnQualifier = columnQualifier;
+            this.columnName = columnName;
+            this.columnType = columnType;
+            this.columnFamilyName = columnFamilyName;
+        }
+
+        public byte[] getColumnFamily() {
+            return columnFamily;
+        }
+
+        public byte[] getColumnQualifier() {
+            return columnQualifier;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public PDataType getColumnType() {
+            return columnType;
+        }
+
+        public String getColumnFamilyName() {
+            return columnFamilyName;
+        }
+
+        @Override
+        public int compareTo(CDCColumnInfo columnInfo) {
+            return CDCUtil.compareCellFamilyAndQualifier(this.getColumnFamily(),
+                    this.getColumnQualifier(),
+                    columnInfo.getColumnFamily(),
+                    columnInfo.getColumnQualifier());
+        }
+
+        public static CDCColumnInfo createFromProto(CDCInfoProtos.CDCColumnDef column) {
+            String columnName = column.getColumnName();
+            byte[] familyNameBytes = column.getFamilyNameBytes().toByteArray();
+            PDataType dataType = PDataType.fromSqlTypeName(column.getDataType());
+            byte[] columnQualifierBytes = column.getColumnQualifierBytes().toByteArray();
+            String columnFamilyName = StandardCharsets.UTF_8
+                    .decode(ByteBuffer.wrap(familyNameBytes)).toString();
+            return new CDCColumnInfo(familyNameBytes,
+                    columnQualifierBytes, columnName, dataType, columnFamilyName);
+        }
+
+        public static CDCInfoProtos.CDCColumnDef toProto(PColumn column) {
+            CDCInfoProtos.CDCColumnDef.Builder builder = CDCInfoProtos.CDCColumnDef.newBuilder();
+            builder.setColumnName(column.getName().toString());
+            if (column.getFamilyName() != null) {
+                builder.setFamilyNameBytes(ByteStringer.wrap(column.getFamilyName().getBytes()));
+            }
+            if (column.getDataType() != null) {
+                builder.setDataType(column.getDataType().getSqlTypeName());
+            }
+            if (column.getColumnQualifierBytes() != null) {
+                builder.setColumnQualifierBytes(
+                        ByteStringer.wrap(column.getColumnQualifierBytes()));
+            }
+            return builder.build();
+        }
+
+        public String getColumnDisplayName(CDCTableInfo tableInfo) {
+            if (columnDisplayName == null) {
+                // Don't include the column family if it is the default column family.
+                if (Arrays.equals(getColumnFamily(), tableInfo.getDefaultColumnFamily())) {
+                    columnDisplayName = getColumnName();
+                } else {
+                    columnDisplayName = getColumnFamilyName()
+                            + NAME_SEPARATOR + getColumnName();
+                }
+            }
+            return columnDisplayName;
+        }
+    }
+}
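
The CDCTableDef protobuf above is the hand-off between client and server: the
client serializes it into a scan attribute, and the server-side scanner decodes
it back. A minimal round-trip sketch, assuming a compiled StatementContext for
a CDC query (variable names are illustrative; exception handling elided):

    // Client side: attach the serialized CDC table definition to the scan.
    Scan scan = new Scan();
    scan.setAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_DEF,
            CDCTableInfo.toProto(context).toByteArray());

    // Server side: the attribute's presence routes the scan to
    // CDCGlobalIndexRegionScanner, which rebuilds the CDCTableInfo.
    CDCTableInfo tableInfo = CDCTableInfo.createFromProto(
            CDCInfoProtos.CDCTableDef.parseFrom(
                    scan.getAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_DEF)));
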
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 86f49208f4..0ff50c82fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -144,13 +144,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private static final int EXPRESSION_NOT_PRESENT = -1;
     private static final int ESTIMATED_EXPRESSION_SIZE = 8;
-    
+
     public static IndexMaintainer create(PTable dataTable, PTable index,
+                                         PhoenixConnection connection) throws SQLException {
+        return create(dataTable, null, index, connection);
+    }
+
+    public static IndexMaintainer create(PTable dataTable, PTable cdcTable, PTable index,
             PhoenixConnection connection) throws SQLException {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
         }
-        IndexMaintainer maintainer = new IndexMaintainer(dataTable, index, connection);
+        IndexMaintainer maintainer = new IndexMaintainer(dataTable, cdcTable, index, connection);
         return maintainer;
     }
     
@@ -444,8 +449,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
-    
+
     private IndexMaintainer(final PTable dataTable, final PTable index,
+                            PhoenixConnection connection) throws SQLException {
+        this(dataTable, null, index, connection);
+    }
+
+    private IndexMaintainer(final PTable dataTable, final PTable cdcTable, final PTable index,
             PhoenixConnection connection) throws SQLException {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
@@ -455,7 +465,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
         this.isUncovered = index.getIndexType() == IndexType.UNCOVERED_GLOBAL;
         this.encodingScheme = index.getEncodingScheme();
-      
+        this.isCDCIndex = CDCUtil.isCDCIndex(index);
+
         // null check for b/w compatibility
         this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
         this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
@@ -499,7 +510,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.indexDataColumnCount = dataPKColumns.size();
         PTable parentTable = dataTable;
         // We need to get the PK column for the table on which the index is created
-        if (!dataTable.getName().equals(index.getParentName())) {
+        if (!dataTable.getName().equals(cdcTable != null
+                ? cdcTable.getParentName() : index.getParentName())) {
             try {
                 String tenantId = (index.getTenantId() != null) ? 
                         index.getTenantId().getString() : null;
@@ -676,7 +688,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             this.indexWhere = index.getIndexWhereExpression(connection);
             this.indexWhereColumns = index.getIndexWhereColumns(connection);
         }
-        this.isCDCIndex = CDCUtil.isCDCIndex(index);
 
         initCachedState();
     }
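
For callers, the only visible change in IndexMaintainer is the optional CDC
table in the factory method; the two-argument form keeps its old behavior by
delegating with null. A hedged usage sketch (the PTable variables are assumed
to be already resolved):

    // CDC path: pass the CDC table so the parent check follows
    // cdcTable.getParentName() instead of index.getParentName().
    IndexMaintainer cdcMaintainer =
            IndexMaintainer.create(dataTable, cdcTable, cdcIndex, connection);

    // Non-CDC path, unchanged:
    IndexMaintainer maintainer = IndexMaintainer.create(dataTable, index, connection);
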
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index a536c6d016..189f7196c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
@@ -191,7 +191,7 @@ public abstract class RegionScannerFactory {
                               dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
                               pageSizeMs, offset, actualStartKey, extraLimit);
                   } else {
-                      if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+                      if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) {
                           s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
                                   dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
                                   pageSizeMs, extraLimit);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 48a5734149..e1716fd20c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -19,7 +19,6 @@
 package org.apache.phoenix.optimize;
 
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -69,6 +68,7 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableRef;
@@ -82,6 +82,8 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+
 public class QueryOptimizer {
     private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
 
@@ -219,16 +221,36 @@ public class QueryOptimizer {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
 
+        ColumnResolver indexResolver = null;
+        boolean forCDC = false;
         PTable table = dataPlan.getTableRef().getTable();
         if (table.getType() == PTableType.CDC) {
-            Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
-            String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
-            if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
-                cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
-                        cdcHint.length() - 1));
+            NamedTableNode indexTableNode = FACTORY.namedTable(null,
+                    FACTORY.table(table.getSchemaName().getString(),
+                            CDCUtil.getCDCIndexName(table.getTableName().getString())),
+                    select.getTableSamplingRate());
+            indexResolver = FromCompiler.getResolver(indexTableNode,
+                    statement.getConnection());
+            TableRef indexTableRef = indexResolver.getTables().get(0);
+            PTable cdcIndex = indexTableRef.getTable();
+            PTableImpl.Builder indexBuilder = PTableImpl.builderFromExisting(cdcIndex);
+            List<PColumn> idxColumns = cdcIndex.getColumns();
+            if (cdcIndex.getBucketNum() != null) {
+                // If salted, it will get added by the builder, so avoid duplication.
+                idxColumns = idxColumns.subList(1, idxColumns.size());
             }
-            dataPlan.getContext().setCDCIncludeScopes(cdcIncludeScopes);
-            return Arrays.asList(dataPlan);
+            indexBuilder.setColumns(idxColumns);
+            indexBuilder.setParentName(table.getName());
+            indexBuilder.setParentTableName(table.getTableName());
+            cdcIndex = indexBuilder.build();
+            indexTableRef.setTable(cdcIndex);
+
+            PTableImpl.Builder cdcBuilder = PTableImpl.builderFromExisting(table);
+            cdcBuilder.setColumns(table.getColumns());
+            cdcBuilder.setIndexes(Collections.singletonList(cdcIndex));
+            table = cdcBuilder.build();
+            dataPlan.getTableRef().setTable(table);
+            forCDC = true;
         }
 
         List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
@@ -251,21 +273,29 @@ public class QueryOptimizer {
             targetColumns = targetDatums;
         }
         
-        List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
-        SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
-        plans.add(dataPlan);
-        QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
-        if (hintedPlan != null) {
-            PTable index = hintedPlan.getTableRef().getTable();
-            if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
-                    || isPartialIndexUsable(select, dataPlan, index))) {
-                return Collections.singletonList(hintedPlan);
+        List<QueryPlan> plans = Lists.newArrayListWithExpectedSize((forCDC ? 0 : 1)
+                + indexes.size());
+        SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(
+                select, FromCompiler.getResolver(dataPlan.getTableRef()));
+        QueryPlan hintedPlan = null;
+        // Hints are not supported for CDC queries, so skip looking for hinted plans.
+        if (!forCDC) {
+            plans.add(dataPlan);
+            hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes,
+                    targetColumns, parallelIteratorFactory, plans);
+            if (hintedPlan != null) {
+                PTable index = hintedPlan.getTableRef().getTable();
+                if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
+                        || isPartialIndexUsable(select, dataPlan, index))) {
+                    return Collections.singletonList(hintedPlan);
+                }
+                plans.add(0, hintedPlan);
             }
-            plans.add(0, hintedPlan);
         }
         
         for (PTable index : indexes) {
-            QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, dataPlan, false);
+            QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns,
+                    parallelIteratorFactory, dataPlan, false, indexResolver);
             if (plan != null &&
                     (index.getIndexWhere() == null
                             || isPartialIndexUsable(select, dataPlan, index))) {
@@ -330,7 +360,8 @@ public class QueryOptimizer {
                     // Hinted index is applicable, so return its index
                     PTable index = indexes.get(indexPos);
                     indexes.remove(indexPos);
-                    QueryPlan plan = addPlan(statement, select, index, targetColumns, parallelIteratorFactory, dataPlan, true);
+                    QueryPlan plan = addPlan(statement, select, index, targetColumns,
+                            parallelIteratorFactory, dataPlan, true, null);
                     if (plan != null) {
                         return plan;
                     }
@@ -350,17 +381,33 @@ public class QueryOptimizer {
         return -1;
     }
     
-    private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException {
-        int nColumns = dataPlan.getProjector().getColumnCount();
+    private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index,
+                              List<? extends PDatum> targetColumns,
+                              ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan,
+                              boolean isHinted, ColumnResolver indexResolver)
+                              throws SQLException {
         String tableAlias = dataPlan.getTableRef().getTableAlias();
-		String alias = tableAlias==null ? null : '"' + tableAlias + '"'; // double quote in case it's case sensitive
+        String alias = tableAlias == null ? null
+                : '"' + tableAlias + '"'; // double quote in case it's case sensitive
         String schemaName = index.getParentSchemaName().getString();
-        schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
+        schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"';
 
         String tableName = '"' + index.getTableName().getString() + '"';
-        TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName),select.getTableSamplingRate());
+        TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName),
+                select.getTableSamplingRate());
         SelectStatement indexSelect = FACTORY.select(select, table);
-        ColumnResolver resolver = FromCompiler.getResolverForQuery(indexSelect, statement.getConnection());
+        ColumnResolver resolver = indexResolver != null ? indexResolver
+                : FromCompiler.getResolverForQuery(indexSelect, statement.getConnection());
+        return addPlan(statement, select, index, targetColumns, parallelIteratorFactory, dataPlan,
+                isHinted, indexSelect, resolver);
+    }
+
+    private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index,
+                              List<? extends PDatum> targetColumns,
+                              ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan,
+                              boolean isHinted, SelectStatement indexSelect,
+                              ColumnResolver resolver) throws SQLException {
+        int nColumns = dataPlan.getProjector().getColumnCount();
         // We will or will not do tuple projection according to the data plan.
         boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED;
         // Check index state of now potentially updated index table to make sure it's active
@@ -384,7 +431,8 @@ public class QueryOptimizer {
                             : index.getTableName().getString();
                     throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, "*");
                 }
-            	// translate nodes that match expressions that are indexed to the associated column parse node
+                // translate nodes that match expressions that are indexed to the
+                // associated column parse node
                 SelectStatement rewrittenIndexSelect = ParseNodeRewriter.rewrite(indexSelect, new  IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
                 QueryCompiler compiler = new QueryCompiler(statement, rewrittenIndexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans);
 
@@ -396,30 +444,40 @@ public class QueryOptimizer {
                     plan.getContext().setUncoveredIndex(true);
                     PhoenixConnection connection = statement.getConnection();
                     IndexMaintainer maintainer;
-                    PTable dataTable;
+                    PTable newIndexTable;
+                    String dataTableName;
                     if (indexTable.getViewIndexId() != null
                             && indexTable.getName().getString().contains(
-                                    QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                            QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
                         // MetaDataClient modifies the index table name for view indexes if the
                         // parent view of an index has a child view. We need to recreate a PTable
                         // object with the correct table name to get the index maintainer
                         int lastIndexOf = indexTable.getName().getString().lastIndexOf(
                                 QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
                         String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
-                        PTable newIndexTable = PhoenixRuntime.getTable(connection, indexName);
-                        dataTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(
+                        newIndexTable = PhoenixRuntime.getTable(connection, indexName);
+                        dataTableName = SchemaUtil.getTableName(
                                 newIndexTable.getParentSchemaName().getString(),
-                                indexTable.getParentTableName().getString()));
-                        maintainer = newIndexTable.getIndexMaintainer(dataTable,
-                                statement.getConnection());
+                                indexTable.getParentTableName().getString());
                     } else {
-                        dataTable = PhoenixRuntime.getTable(connection,
-                                SchemaUtil.getTableName(indexTable.getParentSchemaName().getString(),
-                                        indexTable.getParentTableName().getString()));
-                        maintainer = indexTable.getIndexMaintainer(dataTable, connection);
+                        newIndexTable = indexTable;
+                        dataTableName = SchemaUtil.getTableName(
+                                indexTable.getParentSchemaName().getString(),
+                                indexTable.getParentTableName().getString());
                     }
+                    PTable dataTableFromDataPlan = dataPlan.getTableRef().getTable();
+                    PTable cdcTable = null;
+                    if (dataTableFromDataPlan.getType() == PTableType.CDC) {
+                        cdcTable = dataTableFromDataPlan;
+                        dataTableName = SchemaUtil.getTableName(
+                                indexTable.getParentSchemaName().getString(),
+                                dataTableFromDataPlan.getParentTableName().getString());
+                    }
+                    PTable dataTable = PhoenixRuntime.getTable(connection, dataTableName);
+                    maintainer = newIndexTable.getIndexMaintainer(dataTable, cdcTable, connection);
                     Set<org.apache.hadoop.hbase.util.Pair<String, String>> indexedColumns =
                             maintainer.getIndexedColumnInfo();
+                    // TODO: Why is PHOENIX_ROW_TIMESTAMP() not showing up?
                     for (org.apache.hadoop.hbase.util.Pair<String, String> pair : indexedColumns) {
                         // The first member of the pair is the column family. For the data table PK columns, the column
                         // family is set to null. The data PK columns should not be added to the set of data columns
@@ -430,6 +488,11 @@ public class QueryOptimizer {
                             plan.getContext().getDataColumnPosition(pColumn);
                         }
                     }
+                    if (dataTableFromDataPlan.getType() == PTableType.CDC) {
+                        PColumn cdcJsonCol = dataTableFromDataPlan.getColumnForColumnName(
+                                CDC_JSON_COL_NAME);
+                        plan.getContext().getDataColumnPosition(cdcJsonCol);
+                    }
                 }
                 indexTableRef = plan.getTableRef();
                 indexTable = indexTableRef.getTable();
@@ -727,7 +790,8 @@ public class QueryOptimizer {
             return select;
         }
 
-        SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select, newFrom), resolver, replacement);
+        SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select,
+                newFrom), resolver, replacement);
         for (TableRef indexTableRef : replacement.values()) {
             // replace expressions with corresponding matching columns for functional indexes
             indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), connection, indexSelect.getUdfParseNodes()));
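
The net effect of these optimizer changes is that a SELECT against a CDC object
is never answered by the data plan or a hinted plan; it always goes through the
single plan built on the hidden PHOENIX_ROW_TIMESTAMP() index. A hedged
end-to-end sketch (schema and object names are hypothetical; java.sql imports
assumed):

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT * FROM MY_SCHEMA.EVENTS_CDC")) {
        while (rs.next()) {
            // SELECT * over a CDC object projects the data table's PK columns
            // followed by the "CDC JSON" change document as the last column.
            int last = rs.getMetaData().getColumnCount();
            System.out.println(rs.getString(last));
        }
    }
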
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
index 80a08bfc9a..59f0954288 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
@@ -79,5 +80,14 @@ public class FamilyWildcardParseNode extends NamedParseNode {
         toSQL(buf);
         buf.append(".*");
     }
-}
 
+	@Override
+	public boolean isWildcardNode() {
+		return true;
+	}
+
+	@Override
+	public FamilyWildcardParseNode getRewritten() {
+		return new FamilyWildcardParseNode(getName(), true);
+	}
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index bfb5782d8f..a8a7fd3820 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -18,22 +18,23 @@
 package org.apache.phoenix.parse;
 
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.util.SchemaUtil;
 
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * 
@@ -74,12 +75,14 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
                  new ParseNodeRewriter(columnResolver, selectStament.getSelect().size());
          return ParseNodeRewriter.rewrite(selectStament, parseNodeRewriter);
     }
+
     /**
      * Rewrite the select statement by switching any constants to the right hand side
      * of the expression.
+     *
      * @param statement the select statement
      * @return new select statement
-     * @throws SQLException 
+     * @throws SQLException
      */
     public static SelectStatement rewrite(SelectStatement statement, ParseNodeRewriter rewriter) throws SQLException {
         Map<String,ParseNode> aliasMap = rewriter.getAliasMap();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
index 7c7f4160fd..3ff5972fb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
@@ -82,5 +82,15 @@ public class TableWildcardParseNode extends NamedParseNode {
         toSQL(buf);
         buf.append(".*");
     }
+
+	@Override
+	public boolean isWildcardNode() {
+		return true;
+	}
+
+	@Override
+	public TableWildcardParseNode getRewritten() {
+		return new TableWildcardParseNode(tableName, true);
+	}
 }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
index 6c2679b18b..78224e68d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
@@ -32,4 +32,12 @@ public abstract class TerminalParseNode extends ParseNode {
     public final List<ParseNode> getChildren() {
         return Collections.emptyList();
     }
+
+    public boolean isWildcardNode() {
+        return false;
+    }
+
+    public TerminalParseNode getRewritten() {
+        return null;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
index 9922c3f669..70cdbd2892 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
@@ -77,6 +77,15 @@ public class WildcardParseNode extends TerminalParseNode {
         buf.append(' ');
         buf.append(NAME);
         buf.append(' ');
-    }    
-    
+    }
+
+    @Override
+    public boolean isWildcardNode() {
+        return true;
+    }
+
+    @Override
+    public WildcardParseNode getRewritten() {
+        return REWRITE_INSTANCE;
+    }
 }
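
Taken together, the isWildcardNode()/getRewritten() overrides on the three
wildcard terminals give rewriters a uniform way to detect and clone wildcards
without an instanceof check per concrete type. A hedged sketch of the intended
call pattern (the helper itself is hypothetical):

    // Wildcard terminals report true and supply a pre-built rewritten form;
    // other terminals return false/null and are kept as-is.
    ParseNode rewriteTerminal(TerminalParseNode node) {
        return node.isWildcardNode() ? node.getRewritten() : node;
    }
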
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bf9352caad..06557803ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -776,7 +776,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
                 return locations;
             } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
-                throw new TableNotFoundException(table.getNameAsString());
+                TableNotFoundException ex = new TableNotFoundException(table.getNameAsString());
+                ex.initCause(e);
+                throw ex;
             } catch (IOException e) {
                 LOGGER.error("Exception encountered in getAllTableRegions for "
                         + "table: {}, retryCount: {}", table.getNameAsString(), retryCount, e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ba9f3904a2..975631aa17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -307,6 +307,12 @@ public interface QueryConstants {
     byte VIEW_MODIFIED_PROPERTY_TAG_TYPE = (byte) 70;
 
     String CDC_JSON_COL_NAME = "CDC JSON";
+    String CDC_EVENT_TYPE = "event_type";
+    String CDC_PRE_IMAGE = "pre_image";
+    String CDC_POST_IMAGE = "post_image";
+    String CDC_CHANGE_IMAGE = "change_image";
+    String CDC_UPSERT_EVENT_TYPE = "upsert";
+    String CDC_DELETE_EVENT_TYPE = "delete";
 
     /**
      * We mark counter values 0 to 10 as reserved. Value 0 is used by
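
These keys form the top level of the emitted "CDC JSON" document. As a hedged
illustration (column name invented), an upsert that changes V1 from 100 to 200
with all scopes included would serialize roughly as:

    {"event_type": "upsert",
     "pre_image": {"V1": 100},
     "change_image": {"V1": 200},
     "post_image": {"V1": 200}}

A delete carries "event_type": "delete" and, when the PRE scope is included,
the last pre-image; which image keys appear is governed by the CDC object's
INCLUDE scopes.
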
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index d1ad19fce2..8f0ff72879 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -211,6 +211,12 @@ public class DelegateTable implements PTable {
         return delegate.getIndexMaintainer(dataTable, connection);
     }
 
+    @Override
+    public IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
+                                              PhoenixConnection connection) throws SQLException {
+        return delegate.getIndexMaintainer(dataTable, cdcTable, connection);
+    }
+
     @Override
     public TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
         return delegate.getTransformMaintainer(oldTable, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
index 10906916f5..a8d6c7dce1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
@@ -21,11 +21,13 @@ import java.util.List;
 
 import net.jcip.annotations.Immutable;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
@@ -143,6 +145,30 @@ public class KeyValueSchema extends ValueSchema {
         }
     }
 
+    /**
+     * Extract value out of a cell encoded with {@link
+     * org.apache.phoenix.schema.PTable.ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS}
+     *
+     * @param cell The cell, expected to hold a SINGLE_CELL_ARRAY_WITH_OFFSETS encoded value.
+     * @param expression The single-cell column expression identifying the column to extract.
+     * @param ptr The pointer in which the extracted value can be found, if successful.
+     * @return {@code true} on success.
+     */
+    public boolean extractValue(Cell cell, SingleCellColumnExpression expression,
+                                ImmutableBytesWritable ptr) {
+        ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+        List<Field> fields = getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            Field field = fields.get(i);
+            for (int j = 0; j < field.getCount(); j++) {
+                if (expression.evaluate(ptr) && ptr.getLength() > 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     private int getVarLengthBytes(int length) {
         return length + WritableUtils.getVIntSize(length);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9717430a11..5b3172a682 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NA
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
+import static org.apache.phoenix.schema.PTableType.CDC;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
@@ -1435,11 +1436,10 @@ public class MetaDataClient {
      *    listed as an index column.
      * @param statement
      * @param splits
-     * @param indexPKExpresionsType If non-{@code null}, all PK expressions should be of this specific type.
      * @return MutationState from population of index table from data table
      * @throws SQLException
      */
-    public MutationState createIndex(CreateIndexStatement statement, byte[][] splits, PDataType indexPKExpresionsType) throws SQLException {
+    public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
         IndexKeyConstraint ik = statement.getIndexConstraint();
         TableName indexTableName = statement.getIndexTableName();
 
@@ -1554,9 +1554,6 @@ public class MetaDataClient {
                 if (expression.isStateless()) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
-                if (indexPKExpresionsType != null && expression.getDataType() != indexPKExpresionsType) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION).build().buildException();
-                }
                 unusedPkColumns.remove(expression);
 
                 // Go through parse node to get string as otherwise we
@@ -1741,29 +1738,27 @@ public class MetaDataClient {
                 statement.getProps().size() + 1);
         populatePropertyMaps(statement.getProps(), tableProps, commonFamilyProps, PTableType.CDC);
 
-        NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName(
-                statement.getCdcObjName().getName()));
-        IndexKeyConstraint indexKeyConstraint =
-                FACTORY.indexKey(Arrays.asList(new Pair[]{Pair.newPair(
-                        FACTORY.function(PhoenixRowTimestampFunction.NAME, Collections.emptyList()),
-                        SortOrder.getDefault())}));
         IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps);
-        ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create();
-        if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
-            indexProps.put(QueryConstants.ALL_FAMILY_PROPERTIES_KEY, new Pair<>(
-                    TableProperty.SALT_BUCKETS.getPropertyName(),
-                    TableProperty.SALT_BUCKETS.getValue(tableProps)));
-        }
-        CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null,
-                        statement.getDataTable(), (Double) null), indexKeyConstraint, null, null,
-                        indexProps, statement.isIfNotExists(), indexType, true, 0,
-                        new HashMap<>(), null);
-        MutationState indexMutationState;
+        PhoenixStatement pstmt = new PhoenixStatement(connection);
+        String dataTableFullName = SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
+                statement.getDataTable().getTableName());
+        String createIndexSql = "CREATE " +
+                (indexType == IndexType.LOCAL ? "LOCAL " : "UNCOVERED ") +
+                "INDEX " + (statement.isIfNotExists() ? "IF NOT EXISTS " : "") +
+                "\"" + CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()) + "\"" +
+                " ON " + dataTableFullName + " (" + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+        List<String> indexProps = new ArrayList<>();
+        Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
+        if (saltBucketNum != null) {
+            indexProps.add("SALT_BUCKETS=" + saltBucketNum);
+        }
+        Object columnEncodedBytes = TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+        if (columnEncodedBytes != null) {
+            indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
+        }
+        createIndexSql = createIndexSql + " " + String.join(", ", indexProps);
         try {
-            // TODO: Should we also allow PTimestamp here, in fact PTimestamp is the right type,
-            // but we are forced to support PDate because of incorrect type for
-            // PHOENIX_ROW_TIMESTAMP (see PHOENIX-6807)?
-            indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE);
+            pstmt.execute(createIndexSql);
         } catch (SQLException e) {
             if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
                 throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
@@ -1776,12 +1771,9 @@ public class MetaDataClient {
         List<PColumn> pkColumns = dataTable.getPKColumns();
         List<ColumnDef> columnDefs = new ArrayList<>();
         List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
-        ColumnName timeIdxCol = FACTORY.columnName(PhoenixRowTimestampFunction.NAME + "()");
-        columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
-                PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
-                SortOrder.getDefault(), "", null, false));
-        pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
-        for (PColumn pcol : pkColumns) {
+        int pkOffset = dataTable.getBucketNum() != null ? 1 : 0;
+        for (int i = pkOffset; i < pkColumns.size(); ++i) {
+            PColumn pcol = pkColumns.get(i);
             columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
                     pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
                     pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
@@ -1791,15 +1783,24 @@ public class MetaDataClient {
         columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
                 PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
                 null, false, SortOrder.getDefault(), "", null, false));
+        tableProps = new HashMap<>();
+        if (dataTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+            // CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so override it.
+            tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
+                    ONE_CELL_PER_COLUMN.name());
+        }
+        if (dataTable.isMultiTenant()) {
+            tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), Boolean.TRUE);
+        }
         CreateTableStatement tableStatement = FACTORY.createTable(
                 FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()),
-                statement.getProps(), columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
+                null, columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
                 Collections.emptyList(), PTableType.CDC, statement.isIfNotExists(), null, null,
                 statement.getBindCount(), null);
         createTableInternal(tableStatement, null, dataTable, null, null, null,
                 null, null, false, null,
                 null, statement.getIncludeScopes(), tableProps, commonFamilyProps);
-        return indexMutationState;
+        return new MutationState(0, 0, connection);
     }
 
     /**
@@ -2024,7 +2025,7 @@ public class MetaDataClient {
         }
         return false;
     }
-    
+
     /**
      * While adding or dropping columns we write a cell to the SYSTEM.MUTEX table with the rowkey of the
     * physical table to prevent conflicting concurrent modifications. For example, two clients adding a column
     * with the same name but different datatypes, or one client dropping a column on which a view has an index.
             }
 
             // Can't set any of these on views or shared indexes on views
-            if (tableType != PTableType.VIEW && !allocateIndexId) {
+            if (tableType != PTableType.VIEW && tableType != PTableType.CDC && !allocateIndexId) {
                 saltBucketNum = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
                 if (saltBucketNum != null) {
                     if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
@@ -2412,8 +2413,8 @@ public class MetaDataClient {
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
             }
-            if (TableProperty.TTL.getValue(commonFamilyProps) != null 
-                    && transactionProvider != null 
+            if (TableProperty.TTL.getValue(commonFamilyProps) != null
+                    && transactionProvider != null
                     && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
                 throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
                 .setMessage(transactionProvider.name())
@@ -2543,7 +2544,7 @@ public class MetaDataClient {
                     }
                     pkColumns = newLinkedHashSet(parent.getPKColumns());
 
-                    // Add row linking view to its parent 
+                    // Add row linking view to its parent
                     try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK)) {
                         linkStatement.setString(1, tenantIdStr);
                         linkStatement.setString(2, schemaName);
@@ -2569,7 +2570,20 @@ public class MetaDataClient {
                 columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
                 pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted
             }
-            
+
+            if (tableType == PTableType.CDC) {
+                if (parent.getType() == VIEW) {
+                    physicalNames = Collections.singletonList(
+                            PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(
+                                    parent.getBaseTableLogicalName(), isNamespaceMapped)));
+                }
+                else {
+                    physicalNames = Collections.singletonList(
+                            PNameFactory.newName(SchemaUtil.getTableName(schemaName,
+                                    CDCUtil.getCDCIndexName(tableName))));
+                }
+            }
+
             // Don't add link for mapped view, as it just points back to itself and causes the drop to
             // fail because it looks like there's always a view associated with it.
             if (!physicalNames.isEmpty()) {
@@ -2630,7 +2644,7 @@ public class MetaDataClient {
                 /*
                  * We can't control what column qualifiers are used in HTable mapped to Phoenix views. So we are not
                  * able to encode column names.
-                 */  
+                 */
                 if (viewType != MAPPED) {
                     /*
                      * For regular phoenix views, use the storage scheme of the physical table since they all share the
@@ -2648,14 +2662,14 @@ public class MetaDataClient {
             // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
             else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
                 /*
-                 * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to 
-                 * create tables with encoded column names. 
-                 * 
-                 * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases, 
-                 * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are 
+                 * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
+                 * create tables with encoded column names.
+                 *
+                 * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases,
+                 * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are
                  * partitioned by the virtue of indexId present in the row key. As such, different shared indexes can use
                  * potentially overlapping column qualifiers.
-                 * 
+                 *
                  */
                 if (parent != null) {
                     Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
@@ -2689,7 +2703,9 @@ public class MetaDataClient {
                         }
                     }
 
-                    if (parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS && immutableStorageScheme == ONE_CELL_PER_COLUMN) {
+                    if (tableType != CDC &&
+                            parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS &&
+                            immutableStorageScheme == ONE_CELL_PER_COLUMN) {
                         throw new SQLExceptionInfo.Builder(
                                 SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
                                 .setSchemaName(schemaName).setTableName(tableName).build()
@@ -2903,7 +2919,7 @@ public class MetaDataClient {
                         column.getFamilyName());
                 }
             }
-            
+
             // We need a PK definition for a TABLE or mapped VIEW
             if (!wasPKDefined && pkColumnsNames.isEmpty() && tableType != PTableType.VIEW && viewType != ViewType.MAPPED) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
@@ -2995,7 +3011,7 @@ public class MetaDataClient {
                         .build();
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
-            
+
             // Update column qualifier counters
             if (EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme)) {
                 // Store the encoded column counter for phoenix entities that have their own hbase
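
For reference, createCDC above now issues plain DDL instead of hand-built parse
nodes. A hedged example of the statement it assembles for a CDC object
EVENTS_CDC on MY_SCHEMA.EVENTS created with SALT_BUCKETS=4 (names invented; the
quoted index name is whatever CDCUtil.getCDCIndexName("EVENTS_CDC") returns):

    CREATE UNCOVERED INDEX IF NOT EXISTS "<cdc index name>"
        ON MY_SCHEMA.EVENTS (PHOENIX_ROW_TIMESTAMP()) ASYNC SALT_BUCKETS=4
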
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index c64de2ef16..ec80113b2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -870,11 +870,12 @@ public interface PTable extends PMetaDataEntity {
     PName getPhysicalName(boolean returnColValueFromSyscat);
 
     boolean isImmutableRows();
-
     boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection)
             throws SQLException;
     IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection)
             throws SQLException;
+    IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
+                                       PhoenixConnection connection) throws SQLException;
     TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection);
     PName getDefaultFamilyName();
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 0b3beb885b..6e6fbc6d00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1763,9 +1763,16 @@ public class PTableImpl implements PTable {
 
     @Override
     public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable,
+                                                           PhoenixConnection connection)
+            throws SQLException {
+        return getIndexMaintainer(dataTable, null, connection);
+    }
+
+    @Override
+    public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
             PhoenixConnection connection) throws SQLException {
         if (indexMaintainer == null) {
-            indexMaintainer = IndexMaintainer.create(dataTable, this, connection);
+            indexMaintainer = IndexMaintainer.create(dataTable, cdcTable, this, connection);
         }
         return indexMaintainer;
     }
@@ -1795,9 +1802,7 @@ public class PTableImpl implements PTable {
                 return SchemaUtil.getPhysicalHBaseTableName(schemaName,
                         physicalTableNameColumnInSyscat, isNamespaceMapped);
             }
-            return SchemaUtil.getPhysicalHBaseTableName(schemaName, getType() == PTableType.CDC ?
-                    PNameFactory.newName(CDCUtil.getCDCIndexName(tableName.getString())) :
-                    tableName, isNamespaceMapped);
+            return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped);
         } else {
             return PNameFactory.newName(physicalNames.get(0).getBytes());
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 60633877d4..cfa8fe2bfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -41,9 +41,6 @@ public class SaltingUtil {
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
             PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null,
         HConstants.LATEST_TIMESTAMP);
-    public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
-        .addField(SALTING_COLUMN, false, SortOrder.getDefault())
-        .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();
 
     public static List<KeyRange> generateAllSaltingRanges(int bucketNum) {
         List<KeyRange> allRanges = Lists.newArrayListWithExpectedSize(bucketNum);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
new file mode 100644
index 0000000000..e947ed7003
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.index.CDCTableInfo;
+import org.apache.phoenix.schema.PTable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+
+public class CDCChangeBuilder {
+    private final boolean isChangeImageInScope;
+    private final boolean isPreImageInScope;
+    private final boolean isPostImageInScope;
+    private final CDCTableInfo cdcDataTableInfo;
+    private String changeType;
+    private long lastDeletedTimestamp;
+    private long changeTimestamp;
+    private Map<String, Object> preImage = null;
+    private Map<String, Object> changeImage = null;
+
+    public CDCChangeBuilder(CDCTableInfo cdcDataTableInfo) {
+        this.cdcDataTableInfo = cdcDataTableInfo;
+        Set<PTable.CDCChangeScope> changeScopes = cdcDataTableInfo.getIncludeScopes();
+        isChangeImageInScope = changeScopes.contains(PTable.CDCChangeScope.CHANGE);
+        isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE);
+        isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST);
+    }
+
+    public void initChange(long ts) {
+        changeTimestamp = ts;
+        changeType = null;
+        lastDeletedTimestamp = 0L;
+        if (isPreImageInScope || isPostImageInScope) {
+            preImage = new HashMap<>();
+        }
+        if (isChangeImageInScope || isPostImageInScope) {
+            changeImage = new HashMap<>();
+        }
+    }
+
+    public long getChangeTimestamp() {
+        return changeTimestamp;
+    }
+
+    public boolean isDeletionEvent() {
+        return CDC_DELETE_EVENT_TYPE.equals(changeType);
+    }
+
+    public boolean isNonEmptyEvent() {
+        return changeType != null;
+    }
+
+    public void markAsDeletionEvent() {
+        changeType = CDC_DELETE_EVENT_TYPE;
+    }
+
+    public long getLastDeletedTimestamp() {
+        return lastDeletedTimestamp;
+    }
+
+    public void setLastDeletedTimestamp(long lastDeletedTimestamp) {
+        this.lastDeletedTimestamp = lastDeletedTimestamp;
+    }
+
+    public boolean isChangeRelevant(Cell cell) {
+        if (cell.getTimestamp() > changeTimestamp) {
+            return false;
+        }
+        if (cell.getType() != Cell.Type.DeleteFamily && !isOlderThanChange(cell) &&
+                isDeletionEvent()) {
+            // We don't need to build the change image in this case.
+            return false;
+        }
+        return true;
+    }
+
+    public void registerChange(Cell cell, int columnNum, Object value) {
+        if (!isChangeRelevant(cell)) {
+            return;
+        }
+        CDCTableInfo.CDCColumnInfo columnInfo =
+                cdcDataTableInfo.getColumnInfoList().get(columnNum);
+        String cdcColumnName = columnInfo.getColumnDisplayName(cdcDataTableInfo);
+        if (isOlderThanChange(cell)) {
+            if ((isPreImageInScope || isPostImageInScope) &&
+                    !preImage.containsKey(cdcColumnName)) {
+                preImage.put(cdcColumnName, value);
+            }
+        } else if (cell.getTimestamp() == changeTimestamp) {
+            assert !isDeletionEvent() : "Not expected to find a change for delete event";
+            changeType = CDC_UPSERT_EVENT_TYPE;
+            if (isChangeImageInScope || isPostImageInScope) {
+                changeImage.put(cdcColumnName, value);
+            }
+        }
+    }
+
+    public Map<String, Object> buildCDCEvent() {
+        assert (changeType != null) : "Not expected when no event was detected";
+        Map<String, Object> cdcChange = new HashMap<>();
+        if (isPreImageInScope) {
+            cdcChange.put(CDC_PRE_IMAGE, preImage);
+        }
+        if (CDC_UPSERT_EVENT_TYPE.equals(changeType)) {
+            if (isChangeImageInScope) {
+                cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
+            }
+            if (isPostImageInScope) {
+                Map<String, Object> postImage = new HashMap<>();
+                if (!isDeletionEvent()) {
+                    postImage.putAll(preImage);
+                    postImage.putAll(changeImage);
+                }
+                cdcChange.put(CDC_POST_IMAGE, postImage);
+            }
+        }
+        cdcChange.put(CDC_EVENT_TYPE, changeType);
+        return cdcChange;
+    }
+
+    public boolean isOlderThanChange(Cell cell) {
+        return cell.getTimestamp() < changeTimestamp &&
+                cell.getTimestamp() > lastDeletedTimestamp;
+    }
+}
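The builder above is driven once per change timestamp: initChange() resets the state, registerChange() folds each relevant data-table cell into the pre/change images, and buildCDCEvent() emits the map that becomes the CDC JSON column. A minimal sketch of that call sequence, assuming a populated CDCTableInfo; the columnNum and decode functions are hypothetical stand-ins for the scanner's column lookup and value decoding, and the real logic lives in CDCGlobalIndexRegionScanner:

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.util.CDCChangeBuilder;

    public class CDCChangeBuilderSketch {
        // Hedged driver for one change timestamp; not the committed scanner code.
        static Map<String, Object> buildEvent(CDCChangeBuilder builder, long changeTs,
                List<Cell> dataRowCells, Function<Cell, Integer> columnNum,
                Function<Cell, Object> decode) {
            builder.initChange(changeTs);
            for (Cell cell : dataRowCells) {
                if (!builder.isChangeRelevant(cell)) {
                    continue; // newer than the change, or masked for a delete event
                }
                if (cell.getType() == Cell.Type.DeleteFamily
                        && cell.getTimestamp() == changeTs) {
                    builder.markAsDeletionEvent();
                } else {
                    builder.registerChange(cell, columnNum.apply(cell), decode.apply(cell));
                }
            }
            // buildCDCEvent() asserts a non-null change type, so guard the call.
            return builder.isNonEmptyEvent() ? builder.buildCDCEvent() : null;
        }
    }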
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 002da0a9c5..438638c1dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -25,13 +25,16 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.util.StringUtils;
-
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.schema.PTable;
 
+import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
+
 public class CDCUtil {
     public static final String CDC_INDEX_PREFIX = "__CDC__";
     public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -80,12 +83,7 @@ public class CDCUtil {
     }
 
     public static String getCDCIndexName(String cdcName) {
-        return CDC_INDEX_PREFIX + cdcName;
-    }
-
-    public static String getCDCNameFromIndexName(String indexName) {
-        assert(indexName.startsWith(CDC_INDEX_PREFIX));
-        return indexName.substring(CDC_INDEX_PREFIX.length());
+        return CDC_INDEX_PREFIX + SchemaUtil.getTableNameFromFullName(cdcName.toUpperCase());
     }
 
     public static boolean isCDCIndex(String indexName) {
@@ -102,12 +100,21 @@ public class CDCUtil {
         scan.setCacheBlocks(false);
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         if (! familyMap.isEmpty()) {
-            familyMap.keySet().stream().forEach(fQual -> {
-                if (familyMap.get(fQual) != null) {
-                    familyMap.get(fQual).clear();
-                }
-            });
+            familyMap.clear();
         }
         return scan;
     }
+
+    public static int compareCellFamilyAndQualifier(byte[] columnFamily1,
+                                                     byte[] columnQual1,
+                                                     byte[] columnFamily2,
+                                                     byte[] columnQual2) {
+        int familyNameComparison = DescVarLengthFastByteComparisons.compareTo(columnFamily1,
+                0, columnFamily1.length, columnFamily2, 0, columnFamily2.length);
+        if (familyNameComparison != 0) {
+            return familyNameComparison;
+        }
+        return DescVarLengthFastByteComparisons.compareTo(columnQual1,
+                0, columnQual1.length, columnQual2, 0, columnQual2.length);
+    }
 }
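Two behavioral notes on the util changes: getCDCIndexName() now upper-cases its argument and strips any schema prefix, so a CDC object named s.my_cdc maps to the index name __CDC__MY_CDC, and the new compareCellFamilyAndQualifier() orders cell coordinates by family first, breaking ties on the qualifier. A small illustration of the comparator contract; the byte literals are arbitrary, and only zero/non-zero outcomes are asserted since the sign convention follows DescVarLengthFastByteComparisons:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.CDCUtil;

    public class CompareSketch {
        public static void main(String[] args) {
            byte[] fam = Bytes.toBytes("0");
            byte[] qualA = Bytes.toBytes("11");
            byte[] qualB = Bytes.toBytes("12");
            // Identical coordinates compare as equal.
            assert CDCUtil.compareCellFamilyAndQualifier(fam, qualA, fam, qualA) == 0;
            // Equal families: the qualifier comparison decides.
            assert CDCUtil.compareCellFamilyAndQualifier(fam, qualA, fam, qualB) != 0;
            // Different families decide before qualifiers are consulted.
            assert CDCUtil.compareCellFamilyAndQualifier(Bytes.toBytes("A"), qualA,
                    Bytes.toBytes("B"), qualA) != 0;
        }
    }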
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5a06b6287d..a2e7b72215 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -299,7 +299,7 @@ public class IndexUtil {
     }
     
 
-    private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
+    public static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
         byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(table);
         byte[] emptyKeyValueQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
         return (Bytes.compareTo(emptyKeyValueCF, 0, emptyKeyValueCF.length, ref.getFamilyWritable()
@@ -586,7 +586,7 @@ public class IndexUtil {
                         tupleProjector.getValueBitSet(), ptr);
         Cell firstCell = result.get(0);
         Cell keyValue =
-                PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(),
+                PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(), // FIXME: This does DEEP_COPY of cell, do we need that?
                         firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY,
                         VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
         result.add(keyValue);
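isEmptyKeyValue() is made public here, presumably so callers outside IndexUtil (such as the CDC scan path) can recognize and skip the empty column cell. A hedged usage sketch, assuming the data table PTable and a cell are already in hand:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.IndexUtil;

    public class EmptyKVSketch {
        static boolean isEmptyColumnCell(PTable dataTable, Cell cell) {
            // ColumnReference wraps the cell's family/qualifier for the check.
            return IndexUtil.isEmptyKeyValue(dataTable,
                    new ColumnReference(CellUtil.cloneFamily(cell),
                            CellUtil.cloneQualifier(cell)));
        }
    }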
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index f2b6b911fb..a52213aea2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -19,32 +19,21 @@ package org.apache.phoenix.util;
 
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -68,11 +57,11 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -89,6 +78,7 @@ import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -323,14 +313,16 @@ public class ScanUtil {
         Filter filter = scan.getFilter();
         if (filter == null) {
             scan.setFilter(andWithFilter); 
-        } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+        } else if (filter instanceof FilterList && ((FilterList)filter).getOperator()
+                == FilterList.Operator.MUST_PASS_ALL) {
             FilterList filterList = (FilterList)filter;
             List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
             allFilters.addAll(filterList.getFilters());
             allFilters.add(andWithFilter);
             scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
         } else {
-            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+                    Arrays.asList(filter, andWithFilter)));
         }
     }
     
@@ -1136,7 +1128,7 @@ public class ScanUtil {
     }
 
     public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table,
-            PhoenixConnection phoenixConnection) throws SQLException {
+                                                           PhoenixConnection phoenixConnection, StatementContext context) throws SQLException {
         boolean isTransforming = (table.getTransformingNewTable() != null);
         PTable indexTable = table;
         // Transforming index table can be repaired in regular path via globalindexchecker coproc on it.
@@ -1180,25 +1172,22 @@ public class ScanUtil {
             scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
             scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
         } else {
-            if (table.getType() != PTableType.CDC && (table.getType() != PTableType.INDEX ||
-                    !IndexUtil.isGlobalIndex(indexTable))) {
+            if (table.getType() != PTableType.INDEX || !IndexUtil.isGlobalIndex(indexTable)) {
                 return;
             }
             if (table.isTransactional() && table.getIndexType() == IndexType.UNCOVERED_GLOBAL) {
                 return;
             }
-            PTable dataTable = ScanUtil.getDataTable(indexTable, phoenixConnection);
+            PTable dataTable = context.getCDCDataTableRef() != null ?
+                    context.getCDCDataTableRef().getTable() :
+                    ScanUtil.getDataTable(indexTable, phoenixConnection);
             if (dataTable == null) {
                 // This index table must be being deleted. No need to set the scan attributes
                 return;
             }
             // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
             // view. Thus, we need to recreate a PTable object with the correct table name for the rest of this code to work
-            if (table.getType() == PTableType.CDC) {
-                indexTable = PhoenixRuntime.getTable(phoenixConnection,
-                        CDCUtil.getCDCIndexName(table.getName().getString()));
-            }
-            else if (indexTable.getViewIndexId() != null &&
+            if (indexTable.getViewIndexId() != null &&
                     indexTable.getName().getString().contains(
                             QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
                 int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
@@ -1302,7 +1291,7 @@ public class ScanUtil {
     public static void setScanAttributesForClient(Scan scan, PTable table,
                                                   StatementContext context) throws SQLException {
         PhoenixConnection phoenixConnection = context.getConnection();
-        setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
+        setScanAttributesForIndexReadRepair(scan, table, phoenixConnection, context);
         setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
         byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
         byte[] emptyCQ = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME);
@@ -1320,112 +1309,10 @@ public class ScanUtil {
 
         setScanAttributeForPaging(scan, phoenixConnection);
 
-        if (table.getType() == PTableType.CDC) {
-            PTable dataTable = PhoenixRuntime.getTable(phoenixConnection,
-                    SchemaUtil.getTableName(table.getSchemaName().getString(),
-                            table.getParentTableName().getString()));
-            scan.setAttribute(CDC_DATA_TABLE_NAME,
-                    table.getParentName().getBytes());
-
-            PColumn cdcJsonCol = table.getColumnForColumnName(CDC_JSON_COL_NAME);
-            scan.setAttribute(CDC_JSON_COL_QUALIFIER, cdcJsonCol.getColumnQualifierBytes());
-            scan.setAttribute(CDC_INCLUDE_SCOPES,
-                    context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8));
+        if (context.getCDCTableRef() != null) {
+            scan.setAttribute(CDC_DATA_TABLE_DEF, CDCTableInfo.toProto(context).toByteArray());
             CDCUtil.initForRawScan(scan);
-            List<PColumn> columns = dataTable.getColumns();
-            Map<byte[], String> dataColQualNameMap = new HashMap<>(columns.size());
-            Map<byte[], PDataType> dataColTypeMap = new HashMap<>();
-            for (PColumn col: columns) {
-                if (col.getColumnQualifierBytes() != null) {
-                    dataColQualNameMap.put(col.getColumnQualifierBytes(), col.getName().getString());
-                    dataColTypeMap.put(col.getColumnQualifierBytes(), col.getDataType());
-                }
-            }
-            scan.setAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP,
-                    serializeColumnQualifierToNameMap(dataColQualNameMap));
-            scan.setAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP,
-                    serializeColumnQualifierToTypeMap(dataColTypeMap));
-        }
-    }
-
-    public static byte[] serializeColumnQualifierToNameMap(Map<byte[], String> colQualNameMap) {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        DataOutputStream output = new DataOutputStream(stream);
-        try {
-            output.writeInt(colQualNameMap.size());
-            for (Map.Entry<byte[], String> entry: colQualNameMap.entrySet()) {
-                output.writeInt(entry.getKey().length);
-                output.write(entry.getKey());
-                WritableUtils.writeString(output, entry.getValue());
-            }
-            return stream.toByteArray();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static Map<ImmutableBytesPtr, String> deserializeColumnQualifierToNameMap(
-            byte[] mapBytes) {
-        ByteArrayInputStream stream = new ByteArrayInputStream(mapBytes);
-        DataInputStream input = new DataInputStream(stream);
-        try {
-            Map<ImmutableBytesPtr, String> colQualNameMap = new HashMap<>();
-            int size = input.readInt();
-            for (int i = 0; i < size; ++i) {
-                int qualLength = input.readInt();
-                byte[] qualBytes = new byte[qualLength];
-                int bytesRead = input.read(qualBytes);
-                if (bytesRead != qualLength) {
-                    throw new IOException("Expected number of bytes: " + qualLength + " but got " +
-                            "only: " + bytesRead);
-                }
-                String colName = WritableUtils.readString(input);
-                colQualNameMap.put(new ImmutableBytesPtr(qualBytes), colName);
-            }
-            return colQualNameMap;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static byte[] serializeColumnQualifierToTypeMap(
-            Map<byte[], PDataType> pkColNamesAndTypes) {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        DataOutputStream output = new DataOutputStream(stream);
-        try {
-            output.writeInt(pkColNamesAndTypes.size());
-            for (Map.Entry<byte[], PDataType> entry: pkColNamesAndTypes.entrySet()) {
-                output.writeInt(entry.getKey().length);
-                output.write(entry.getKey());
-                WritableUtils.writeString(output, entry.getValue().getSqlTypeName());
-            }
-            return stream.toByteArray();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static Map<ImmutableBytesPtr, PDataType> deserializeColumnQualifierToTypeMap(
-            byte[] pkColInfoBytes) {
-        ByteArrayInputStream stream = new ByteArrayInputStream(pkColInfoBytes);
-        DataInputStream input = new DataInputStream(stream);
-        try {
-            Map<ImmutableBytesPtr, PDataType> colQualTypeMap = new HashMap<>();
-            int colCnt = input.readInt();
-            for (int i = 0; i < colCnt; ++i) {
-                int qualLength = input.readInt();
-                byte[] qualBytes = new byte[qualLength];
-                int bytesRead = input.read(qualBytes);
-                if (bytesRead != qualLength) {
-                    throw new IOException("Expected number of bytes: " + qualLength + " but got " +
-                            "only: " + bytesRead);
-                }
-                colQualTypeMap.put(new ImmutableBytesPtr(qualBytes),
-                        PDataType.fromSqlTypeName(WritableUtils.readString(input)));
-            }
-            return colQualTypeMap;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+            GlobalIndexRegionScanner.adjustScanFilter(scan);
         }
     }
 
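On the client, setScanAttributesForClient() now publishes a single protobuf-encoded CDC_DATA_TABLE_DEF attribute in place of the removed qualifier-to-name and qualifier-to-type maps. A hedged sketch of the corresponding server-side decode, using only the generated API from CDCInfo.proto below; error handling is elided:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;

    public class CDCScanDecodeSketch {
        static CDCInfoProtos.CDCTableDef decode(Scan scan) throws Exception {
            byte[] defBytes = scan.getAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_DEF);
            if (defBytes == null) {
                return null; // not a CDC scan
            }
            CDCInfoProtos.CDCTableDef def = CDCInfoProtos.CDCTableDef.parseFrom(defBytes);
            for (CDCInfoProtos.CDCColumnDef col : def.getColumnsList()) {
                // columnName, familyNameBytes, columnQualifierBytes and dataType
                // replace what the old qualifier maps carried per column.
            }
            return def;
        }
    }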
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java b/phoenix-core/src/main/protobuf/CDCInfo.proto
similarity index 54%
copy from phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
copy to phoenix-core/src/main/protobuf/CDCInfo.proto
index 6c2679b18b..92f2c29333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
+++ b/phoenix-core/src/main/protobuf/CDCInfo.proto
@@ -15,21 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.parse;
 
-import java.util.Collections;
-import java.util.List;
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "CDCInfoProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
 
-/**
- * 
- * Abstract node for expressions that have no children
- *
- * 
- * @since 0.1
- */
-public abstract class TerminalParseNode extends ParseNode {
-    @Override
-    public final List<ParseNode> getChildren() {
-        return Collections.emptyList();
-    }
+message CDCColumnDef {
+  required string columnName = 1;
+  required bytes familyNameBytes = 2;
+  required bytes columnQualifierBytes = 3;
+  required string dataType = 4;
+}
+
+message CDCTableDef {
+  optional bytes defaultFamilyName = 1;
+  optional string cdcIncludeScopes = 2;
+  required bytes qualifierEncodingScheme = 3;
+  repeated CDCColumnDef columns = 4;
+  required bytes cdcJsonColQualBytes = 5;
+  optional bytes dataTableProjectorBytes = 6;
 }
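For reference, a hedged example of populating the generated message by hand, assuming the same protobuf-java runtime as the other generated coprocessor protos; CDCTableInfo.toProto() is the real producer, and all literal values here are arbitrary:

    import com.google.protobuf.ByteString;
    import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;

    public class CDCTableDefBuildSketch {
        public static void main(String[] args) {
            CDCInfoProtos.CDCTableDef def = CDCInfoProtos.CDCTableDef.newBuilder()
                    .setCdcIncludeScopes("PRE,POST")                   // optional scope list
                    .setQualifierEncodingScheme(ByteString.copyFrom(new byte[] { 2 }))
                    .setCdcJsonColQualBytes(ByteString.copyFromUtf8("0"))
                    .addColumns(CDCInfoProtos.CDCColumnDef.newBuilder()
                            .setColumnName("V1")
                            .setFamilyNameBytes(ByteString.copyFromUtf8("0"))
                            .setColumnQualifierBytes(ByteString.copyFromUtf8("11"))
                            .setDataType("VARCHAR"))
                    .build();
            byte[] wireBytes = def.toByteArray();  // the CDC_DATA_TABLE_DEF payload
        }
    }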