Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/12/23 16:29:24 UTC

(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
     new 7420443d84 PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)
7420443d84 is described below

commit 7420443d84ce6786e2594fd18f5f02c7b742197f
Author: Hari Krishna Dara <ha...@gmail.com>
AuthorDate: Sat Dec 23 21:59:19 2023 +0530

    PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)
---
 phoenix-core/pom.xml                               |   5 +
 .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 148 +++++++++++++-
 .../index/UncoveredGlobalIndexRegionScannerIT.java |   8 +-
 .../apache/phoenix/compile/ProjectionCompiler.java |   2 +-
 .../org/apache/phoenix/compile/QueryCompiler.java  |  12 ++
 .../apache/phoenix/compile/StatementContext.java   |  10 +-
 .../phoenix/compile/TupleProjectionCompiler.java   |  18 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   7 +-
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 226 +++++++++++++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   5 +-
 .../coprocessor/UncoveredIndexRegionScanner.java   |  29 ++-
 .../org/apache/phoenix/execute/BaseQueryPlan.java  |  30 ++-
 .../phoenix/iterate/RegionScannerFactory.java      |  50 +++--
 .../phoenix/iterate/TableResultIterator.java       |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    |  18 +-
 .../java/org/apache/phoenix/parse/HintNode.java    | 177 ++++++++--------
 .../java/org/apache/phoenix/schema/ColumnRef.java  |   3 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  11 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |   4 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  30 +++
 .../apache/phoenix/util/EncodedColumnsUtil.java    |   8 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     | 137 ++++++++++++-
 phoenix-pherf/pom.xml                              |   2 -
 pom.xml                                            |   6 +
 24 files changed, 783 insertions(+), 165 deletions(-)

diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 7827a065f0..b1534225ca 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -249,6 +249,11 @@
   </build>
 
   <dependencies>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
     <!-- shaded thirdparty dependencies -->
     <dependency>
       <groupId>org.apache.phoenix.thirdparty</groupId>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index 7e081419e2..d1f04c02cc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -17,7 +17,10 @@
  */
 package org.apache.phoenix.end2end;
 
+import com.google.gson.Gson;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableProperty;
@@ -27,13 +30,15 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -60,7 +66,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         }
     }
 
-    private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes)
+    private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
+                              String dataTableName)
             throws SQLException {
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -68,6 +75,9 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
         assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
         assertNull(table.getIndexState()); // Index state should be null for CDC.
+        assertNull(table.getIndexType()); // This is not an index.
+        assertEquals(dataTableName, table.getParentName().getString());
+        assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
     }
 
     private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
@@ -117,7 +127,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         try {
             conn.createStatement().execute("CREATE CDC " + cdcName
                     + " ON " + tableName +"(ROUND(v1))");
-            fail("Expected to fail due to non-timestamp expression in the index PK");
+            fail("Expected to fail due to non-date expression in the index PK");
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
                     e.getErrorCode());
@@ -126,7 +136,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         try {
             conn.createStatement().execute("CREATE CDC " + cdcName
                     + " ON " + tableName +"(v1)");
-            fail("Expected to fail due to non-timestamp column in the index PK");
+            fail("Expected to fail due to non-date column in the index PK");
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
                     e.getErrorCode());
@@ -152,13 +162,13 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
                 "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
         assertCDCState(conn, cdcName, "PRE,POST", 3);
         assertPTable(cdcName, new HashSet<>(
-                Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)));
+                Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
 
         cdcName = generateUniqueName();
         conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
                 "(v2) INDEX_TYPE=l");
         assertCDCState(conn, cdcName, null, 2);
-        assertPTable(cdcName, null);
+        assertPTable(cdcName, null, tableName);
 
         String viewName = generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
@@ -209,7 +219,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         assertEquals("K", cdcPkColumns.get(2).getName().getString());
     }
 
-    @Test
     public void testDropCDC () throws SQLException {
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -219,11 +228,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
                         + " v2 DATE)");
         String cdcName = generateUniqueName();
 
-        String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
-        conn.createStatement().execute(cdc_sql);
-        assertCDCState(conn, cdcName, null, 3);
-
         String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
         conn.createStatement().execute(drop_cdc_sql);
 
@@ -269,4 +273,126 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         }
     }
 
+    private void assertResultSet(ResultSet rs) throws Exception {
+        Gson gson = new Gson();
+        assertEquals(true, rs.next());
+        assertEquals(1, rs.getInt(2));
+        assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3),
+                HashMap.class));
+        assertEquals(true, rs.next());
+        assertEquals(2, rs.getInt(2));
+        assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3),
+                HashMap.class));
+        assertEquals(true, rs.next());
+        assertEquals(1, rs.getInt(2));
+        assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3),
+                HashMap.class));
+        assertEquals(false, rs.next());
+        rs.close();
+    }
+
+    @Test
+    public void testSelectCDC() throws Exception {
+        Properties props = new Properties();
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        props.put("hbase.client.scanner.timeout.period", "6000000");
+        props.put("phoenix.query.timeoutMs", "6000000");
+        props.put("zookeeper.session.timeout", "6000000");
+        props.put("hbase.rpc.timeout", "6000000");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+        conn.commit();
+        Thread.sleep(1000);
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+        conn.commit();
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName
+                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        conn.createStatement().execute(cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+        // NOTE: To debug query execution, add the condition below wherever you need a breakpoint.
+        //      if (<table>.getTableName().getString().equals("N000002") ||
+        //                 <table>.getTableName().getString().equals("__CDC__N000002")) {
+        //          "".isEmpty();
+        //      }
+        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
+        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
+                " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
+        assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
+                "FROM " + cdcName));
+        assertResultSet(conn.createStatement().executeQuery("SELECT " +
+                "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
+
+        HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
+            put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
+            put("SELECT * FROM " + cdcName +
+                    " ORDER BY k ASC", new int [] {1, 1, 2});
+            put("SELECT * FROM " + cdcName +
+                    " ORDER BY k DESC", new int [] {2, 1, 1});
+            put("SELECT * FROM " + cdcName +
+                    " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
+        }};
+        for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
+            try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
+                for (int k:  testQuery.getValue()) {
+                    assertEquals(true, rs.next());
+                    assertEquals(k, rs.getInt(2));
+                }
+                assertEquals(false, rs.next());
+            }
+        }
+
+        try (ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
+            assertEquals(false, rs.next());
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
+            assertEquals(true, rs.next());
+            assertEquals("abc", rs.getString(1));
+            assertEquals(true, rs.next());
+            assertEquals("abc", rs.getString(1));
+            assertEquals(false, rs.next());
+        }
+    }
+
+    // Temporary test case used as a reference for debugging and comparing against the CDC query.
+    @Test
+    public void testSelectUncoveredIndex() throws Exception {
+        Properties props = new Properties();
+        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        props.put("hbase.client.scanner.timeout.period", "6000000");
+        props.put("phoenix.query.timeoutMs", "6000000");
+        props.put("zookeeper.session.timeout", "6000000");
+        props.put("hbase.rpc.timeout", "6000000");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
+                " (1, 100)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
+                " (2, 200)");
+        conn.commit();
+        String indexName = generateUniqueName();
+        String index_sql = "CREATE UNCOVERED INDEX " + indexName
+                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        conn.createStatement().execute(index_sql);
+        //ResultSet rs =
+        //        conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
+        //                " " + indexName + ") */ * FROM " + tableName);
+        ResultSet rs =
+                conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
+                        " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName);
+        assertEquals(true, rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(100, rs.getInt(2));
+        assertEquals(true, rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals(200, rs.getInt(2));
+        assertEquals(false, rs.next());
+    }
 }
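
The query surface these tests exercise reduces to a small client-side pattern:
create a CDC object keyed on PHOENIX_ROW_TIMESTAMP(), then select from it and
decode the "CDC JSON" column with Gson, as the assertions above do. A minimal
sketch (the JDBC URL, table and CDC names are illustrative):

    import com.google.gson.Gson;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Map;

    public class CdcQuerySketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // A CDC object is keyed on the change time of the data table.
                conn.createStatement().execute(
                        "CREATE CDC MY_CDC ON MY_TABLE(PHOENIX_ROW_TIMESTAMP())");
                // Each change row carries the change time, the data table PK
                // column(s), and a JSON document describing the change.
                try (ResultSet rs = conn.createStatement().executeQuery(
                        "SELECT /*+ INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K,"
                                + " \"CDC JSON\" FROM MY_CDC")) {
                    Gson gson = new Gson();
                    while (rs.next()) {
                        Map<?, ?> change = gson.fromJson(rs.getString(3), Map.class);
                        System.out.println(rs.getTimestamp(1) + " k=" + rs.getInt(2)
                                + " change=" + change);
                    }
                }
            }
        }
    }
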
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
index e29284a951..aae8e48108 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
@@ -175,7 +175,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
             String timeZoneID = Calendar.getInstance().getTimeZone().getID();
             // Write a query to get the val2 = 'bc' with a time range query
             String query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
-                    + "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+                    + "val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName
                     + " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
                     + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
                     + timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
@@ -186,8 +186,10 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals("bc", rs.getString(1));
             assertEquals("bcd", rs.getString(2));
             assertTrue(rs.getTimestamp(3).after(before));
             assertTrue(rs.getTimestamp(3).before(after));
+            assertEquals("bcde", rs.getString(4));
             assertFalse(rs.next());
             // Count the number of index rows
             rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
@@ -206,10 +208,11 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
             assertEquals("bcd", rs.getString(2));
             assertTrue(rs.getTimestamp(3).after(before));
             assertTrue(rs.getTimestamp(3).before(after));
+            assertEquals("bcde", rs.getString(4));
             assertFalse(rs.next());
             // Write a time range query to get the last row with val2 ='bc'
             query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
-                    +"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+                    +"val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName +
                     " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
                     + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
             // Verify that we will read from the index table
@@ -219,6 +222,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
             assertEquals("bc", rs.getString(1));
             assertEquals("ccc", rs.getString(2));
             assertTrue(rs.getTimestamp(3).after(after));
+            assertEquals("cccc", rs.getString(4));
             assertFalse(rs.next());
             // Verify that we can execute the same query without using the index
             String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 9e6b90cded..8dc6678f24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -237,7 +237,7 @@ public class ProjectionCompiler {
             ColumnRef ref = null;
             try {
                 indexColumn = index.getColumnForColumnName(indexColName);
-                //TODO could should we do this more efficiently than catching the expcetion ?
+                // TODO: Should we do this more efficiently than catching the exception?
             } catch (ColumnNotFoundException e) {
                 if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) {
                     //Projected columns have the same name as in the data table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 41459dc133..9981e31219 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -28,6 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
@@ -82,6 +85,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ParseNodeUtil;
 import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -698,6 +702,14 @@ public class QueryCompiler {
             if (projectedTable != null) {
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
             }
+
+            if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
+                // Look up the CDC JSON column so that it is added to the context's
+                // data columns and gets serialized with the projection.
+                context.getDataColumnPosition(
+                        context.getCurrentTable().getTable().getColumnForColumnName(
+                                QueryConstants.CDC_JSON_COL_NAME));
+            }
         }
         
         ColumnResolver resolver = context.getResolver();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index f795254a36..c12cd62d56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -46,6 +45,7 @@ import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PTime;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -84,6 +84,7 @@ public class StatementContext {
     private QueryLogger queryLogger;
     private boolean isClientSideUpsertSelect;
     private boolean isUncoveredIndex;
+    private String cdcIncludeScopes;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -378,4 +379,11 @@ public class StatementContext {
             return retrying;
         }
     }
+    public String getEncodedCdcIncludeScopes() {
+        return cdcIncludeScopes;
+    }
+
+    public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
+        this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
+    }
 }
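
CDCUtil.makeChangeScopeStringFromEnums() is referenced here but defined outside
the hunks shown in this mail. Judging from the "PRE,POST" value asserted by
assertCDCState() in CDCMiscIT above, a comma-joined enum-name encoding along
these lines is a plausible sketch (an assumption, not the actual CDCUtil body):

    import java.util.Set;
    import java.util.stream.Collectors;

    final class ChangeScopeCodecSketch {
        // Stand-in for PTable.CDCChangeScope; PRE and POST appear in the tests.
        enum CDCChangeScope { PRE, POST }

        // Hypothetical encoder: null stays null so that a missing INCLUDE
        // clause round-trips (see assertCDCState(conn, cdcName, null, ...)).
        static String encode(Set<CDCChangeScope> scopes) {
            return scopes == null ? null
                    : scopes.stream().sorted().map(Enum::name)
                            .collect(Collectors.joining(","));
        }
    }
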
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 99392f6d29..faa940a5e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -16,12 +16,15 @@
  * limitations under the License.
  */
 package org.apache.phoenix.compile;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -61,18 +64,21 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 
 public class TupleProjectionCompiler {
     public static final PName PROJECTED_TABLE_SCHEMA = PNameFactory.newName(".");
+    public static final EnumSet<PTableType> PROJECTED_TABLE_TYPES = EnumSet.of(PTableType.TABLE,
+            PTableType.INDEX, PTableType.VIEW, PTableType.CDC);
     private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
     
     public static PTable createProjectedTable(SelectStatement select, StatementContext context) throws SQLException {
         Preconditions.checkArgument(!select.isJoin());
         // Non-group-by or group-by aggregations will create its own projected result.
-        if (select.getInnerSelectStatement() != null 
+        if (select.getInnerSelectStatement() != null
                 || select.getFrom() == null
                 || select.isAggregate() 
                 || select.isDistinct()
-                || (context.getResolver().getTables().get(0).getTable().getType() != PTableType.TABLE
-                && context.getResolver().getTables().get(0).getTable().getType() != PTableType.INDEX && context.getResolver().getTables().get(0).getTable().getType() != PTableType.VIEW))
+                || ! PROJECTED_TABLE_TYPES.contains(
+                        context.getResolver().getTables().get(0).getTable().getType())) {
             return null;
+        }
         
         List<PColumn> projectedColumns = new ArrayList<PColumn>();
         boolean isWildcard = false;
@@ -86,7 +92,7 @@ public class TupleProjectionCompiler {
             if (node instanceof WildcardParseNode) {
                 if (((WildcardParseNode) node).isRewrite()) {
                     TableRef parentTableRef = FromCompiler.getResolver(
-                            NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(), 
+                            NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
                                     table.getParentTableName().getString())), context.getConnection()).resolveTable(
                             table.getSchemaName().getString(),
                             table.getParentTableName().getString());
@@ -162,8 +168,8 @@ public class TupleProjectionCompiler {
         // add IndexUncoveredDataColumnRef
         position = projectedColumns.size() + (hasSaltingColumn ? 1 : 0);
         for (IndexUncoveredDataColumnRef sourceColumnRef : visitor.indexColumnRefSet) {
-            PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), 
-                    sourceColumnRef.getColumn().getFamilyName(), position++, 
+            PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(),
+                    sourceColumnRef.getColumn().getFamilyName(), position++,
                     sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes());
             projectedColumns.add(column);
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 7493acceac..d76046d3b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -150,7 +150,12 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
     public static final String INDEX_ROW_KEY = "_IndexRowKey";
     public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
-    
+    public static final String CDC_DATA_TABLE_NAME = "_CdcDataTableName";
+    public static final String CDC_JSON_COL_QUALIFIER = "_CdcJsonColumn_Qualifier";
+    public static final String CDC_INCLUDE_SCOPES = "_CdcIncludeScopes";
+    public static final String DATA_COL_QUALIFIER_TO_NAME_MAP = "_DataColQualToNameMap";
+    public static final String DATA_COL_QUALIFIER_TO_TYPE_MAP = "_DataColQualToTypeMap";
+
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
     // In case of Index Write failure, we need to determine that Index mutation
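
These constants are the contract between the client-side planner and the
server-side scanners: the planner stamps CDC metadata onto the Scan, and
RegionScannerFactory (later in this commit) keys off CDC_DATA_TABLE_NAME to
pick the CDC scanner. A minimal sketch of that handshake (values illustrative):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    final class CdcScanAttributeSketch {
        // Client side: stamp CDC metadata onto the scan before it is shipped.
        static void stamp(Scan scan, String dataTableName, String includeScopes) {
            scan.setAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_NAME,
                    Bytes.toBytes(dataTableName));
            if (includeScopes != null) {
                scan.setAttribute(BaseScannerRegionObserver.CDC_INCLUDE_SCOPES,
                        Bytes.toBytes(includeScopes));
            }
        }

        // Server side: the presence of the attribute selects the CDC code path.
        static boolean isCdcScan(Scan scan) {
            return scan.getAttribute(
                    BaseScannerRegionObserver.CDC_DATA_TABLE_NAME) != null;
        }
    }
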
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
new file mode 100644
index 0000000000..178bb1d705
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
+
+public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
+
+    private Map<ImmutableBytesPtr, String> dataColQualNameMap;
+    private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap;
+    // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>>
+    private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> dataRowChanges =
+            new HashMap<>();
+
+    public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
+                                       final Region region,
+                                       final Scan scan,
+                                       final RegionCoprocessorEnvironment env,
+                                       final Scan dataTableScan,
+                                       final TupleProjector tupleProjector,
+                                       final IndexMaintainer indexMaintainer,
+                                       final byte[][] viewConstants,
+                                       final ImmutableBytesWritable ptr,
+                                       final long pageSizeMs,
+                                       final long queryLimit) throws IOException {
+        super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
+                viewConstants, ptr, pageSizeMs, queryLimit);
+        CDCUtil.initForRawScan(dataTableScan);
+        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
+                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
+        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
+                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+    }
+
+    @Override
+    protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
+    }
+
+    protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
+        if (indexRowIterator.hasNext()) {
+            List<Cell> indexRow = indexRowIterator.next();
+            for (Cell c: indexRow) {
+                if (c.getType() == Cell.Type.Put) {
+                    result.add(c);
+                }
+            }
+            try {
+                Result dataRow = null;
+                if (! result.isEmpty()) {
+                    Cell firstCell = result.get(0);
+                    byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                            firstCell.getRowOffset(), firstCell.getRowLength())
+                            .copyBytesIfNecessary();
+                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                            indexToDataRowKeyMap.get(indexRowKey));
+                    dataRow = dataRows.get(dataRowKey);
+                    Long indexRowTs = result.get(0).getTimestamp();
+                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get(
+                            dataRowKey);
+                    if (changeTimeline == null) {
+                        List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+                        Collections.sort(resultCells, CellComparator.getInstance().reversed());
+                        List<Cell> deleteMarkers = new ArrayList<>();
+                        List<List<Cell>> columns = new LinkedList<>();
+                        Cell currentColumnCell = null;
+                        Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo(
+                                EncodedColumnsUtil.getQualifierEncodingScheme(scan));
+                        List<Cell> currentColumn = null;
+                        Set<Long> uniqueTimeStamps = new HashSet<>();
+                        // TODO: From CompactionScanner.formColumns(), see if this can be refactored.
+                        for (Cell cell : resultCells) {
+                            uniqueTimeStamps.add(cell.getTimestamp());
+                            if (cell.getType() != Cell.Type.Put) {
+                                deleteMarkers.add(cell);
+                            }
+                            if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                    emptyKV.getFirst())) {
+                                continue;
+                            }
+                            if (currentColumnCell == null) {
+                                currentColumn = new LinkedList<>();
+                                currentColumnCell = cell;
+                                currentColumn.add(cell);
+                            } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
+                                columns.add(currentColumn);
+                                currentColumn = new LinkedList<>();
+                                currentColumnCell = cell;
+                                currentColumn.add(cell);
+                            } else {
+                                currentColumn.add(cell);
+                            }
+                        }
+                        if (currentColumn != null) {
+                            columns.add(currentColumn);
+                        }
+                        List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect(
+                                Collectors.toList());
+                        // FIXME: Does this need to be Concurrent?
+                        Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>();
+                        int[] columnPointers = new int[columns.size()];
+                        changeTimeline = new TreeMap<>();
+                        dataRowChanges.put(dataRowKey, changeTimeline);
+                        for (Long ts : sortedTimestamps) {
+                            for (int i = 0; i < columns.size(); ++i) {
+                                Cell cell = columns.get(i).get(columnPointers[i]);
+                                if (cell.getTimestamp() == ts) {
+                                    rollingRow.put(new ImmutableBytesPtr(
+                                                    cell.getQualifierArray(),
+                                                    cell.getQualifierOffset(),
+                                                    cell.getQualifierLength()),
+                                            cell);
+                                    ++columnPointers[i];
+                                }
+                            }
+                            Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap<>();
+                            rowOfCells.putAll(rollingRow);
+                            changeTimeline.put(ts, rowOfCells);
+                        }
+                    }
+
+                    Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs);
+                    if (mapOfCells != null) {
+                        Map<String, Object> rowValueMap = new HashMap<>(mapOfCells.size());
+                        for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) {
+                            String colName = dataColQualNameMap.get(entry.getKey());
+                            Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject(
+                                    CellUtil.cloneValue(entry.getValue()));
+                            rowValueMap.put(colName, colVal);
+                        }
+                        byte[] value =
+                                new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+                        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+                        ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(),
+                                firstCell.getFamilyOffset(), firstCell.getFamilyLength());
+                        dataRow = Result.create(Arrays.asList(builder.
+                                setRow(dataRowKey.copyBytesIfNecessary()).
+                                setFamily(family.copyBytesIfNecessary()).
+                                setQualifier(scan.getAttribute(CDC_JSON_COL_QUALIFIER)).
+                                setTimestamp(firstCell.getTimestamp()).
+                                setValue(value).
+                                setType(Cell.Type.Put).
+                                build()));
+                    }
+                }
+                if (dataRow != null && tupleProjector != null) {
+                    IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
+                            tupleProjector, ptr);
+                }
+                else {
+                    result.clear();
+                }
+                return true;
+            } catch (Throwable e) {
+                LOGGER.error("Exception in CDCGlobalIndexRegionScanner for region "
+                        + region.getRegionInfo().getRegionNameAsString(), e);
+                throw e;
+            }
+        }
+        return false;
+    }
+}
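
The heart of this scanner is the change-timeline reconstruction above: for each
data row it reads every cell version, collects the distinct timestamps, and
carries the newest cell per column forward so that each timestamp maps to a
complete row image. The same algorithm stripped of HBase types (String
qualifiers and values instead of ImmutableBytesPtr/Cell, and no delete-marker
or empty-column handling):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.TreeSet;

    final class ChangeTimelineSketch {
        static final class VersionedCell {
            final String qualifier; final long ts; final String value;
            VersionedCell(String qualifier, long ts, String value) {
                this.qualifier = qualifier; this.ts = ts; this.value = value;
            }
        }

        // Map<timestamp, row image as of that timestamp>, oldest first.
        static NavigableMap<Long, Map<String, String>> buildTimeline(
                List<VersionedCell> rawCells) {
            TreeSet<Long> timestamps = new TreeSet<>();
            for (VersionedCell c : rawCells) {
                timestamps.add(c.ts);
            }
            NavigableMap<Long, Map<String, String>> timeline = new TreeMap<>();
            Map<String, String> rollingRow = new HashMap<>();
            for (long ts : timestamps) {
                for (VersionedCell c : rawCells) {
                    if (c.ts == ts) {
                        rollingRow.put(c.qualifier, c.value); // newest per column so far
                    }
                }
                timeline.put(ts, new HashMap<>(rollingRow)); // snapshot at ts
            }
            return timeline;
        }
    }

The scanner keeps per-column pointers instead of rescanning the cell list, but
the resulting timeline is the same; each index row timestamp is then looked up
in it and the matching row image is serialized to JSON.
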
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index f4a0dcb7d0..efdf19a364 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -86,6 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+import static org.apache.phoenix.schema.PTableType.CDC;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
@@ -1465,8 +1466,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = Lists.newArrayList();
         List<PName> physicalTables = Lists.newArrayList();
-        PName parentTableName = tableType == INDEX ? dataTableName : null;
-        PName parentSchemaName = tableType == INDEX ? schemaName : null;
+        PName parentTableName = tableType == INDEX || tableType == CDC ? dataTableName : null;
+        PName parentSchemaName = tableType == INDEX || tableType == CDC ? schemaName : null;
         PName parentLogicalName = null;
         EncodedCQCounter cqCounter = null;
         if (oldTable != null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 438c014ca3..d010c33dff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -51,14 +51,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
 import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
 import static org.apache.phoenix.util.ScanUtil.getDummyResult;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
@@ -66,6 +72,7 @@ import static org.apache.phoenix.util.ScanUtil.isDummy;
 public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     private static final Logger LOGGER =
             LoggerFactory.getLogger(UncoveredIndexRegionScanner.class);
+
     /**
      * The states of the processing a page of index rows
      */
@@ -170,6 +177,11 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     protected abstract void scanDataTableRows(long startTime) throws IOException;
 
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+        return prepareDataTableScan(dataRowKeys, false);
+    }
+
+    protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys,
+                                            boolean includeMultipleVersions) throws IOException {
         List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
         for (byte[] dataRowKey : dataRowKeys) {
             // If the data table scan was interrupted because of paging we retry the scan
@@ -185,7 +197,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
             dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
             scanRanges.initializeScan(dataScan);
             SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-            dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+            dataScan.setFilter(new SkipScanFilter(skipScanFilter, includeMultipleVersions));
             dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
                     Bytes.toBytes(Long.valueOf(pageSizeMs)));
             return dataScan;
@@ -244,9 +256,15 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
                 }
                 Cell firstCell = row.get(0);
                 byte[] indexRowKey = firstCell.getRowArray();
-                ptr.set(indexRowKey, firstCell.getRowOffset() + offset,
-                        firstCell.getRowLength() - offset);
-                lastIndexRowKey = ptr.copyBytes();
+                // Avoid unnecessary byte copy and garbage when the row key is what we need.
+                if (firstCell.getRowOffset() + offset == 0 && firstCell.getRowLength() - offset == indexRowKey.length) {
+                    lastIndexRowKey = indexRowKey;
+                }
+                else {
+                    ptr.set(indexRowKey, firstCell.getRowOffset() + offset,
+                            firstCell.getRowLength() - offset);
+                    lastIndexRowKey = ptr.copyBytes();
+                }
                 indexToDataRowKeyMap.put(offset == 0 ? lastIndexRowKey :
                                 CellUtil.cloneRow(firstCell), indexMaintainer.buildDataRowKey(
                                         new ImmutableBytesWritable(lastIndexRowKey),
@@ -300,7 +318,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
         return false;
     }
 
-    private boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
+    protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
             result.addAll(indexRow);
@@ -308,6 +326,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
                 byte[] indexRowKey = CellUtil.cloneRow(indexRow.get(0));
                 Result dataRow = dataRows.get(new ImmutableBytesPtr(
                         indexToDataRowKeyMap.get(indexRowKey)));
+
                 if (dataRow != null) {
                     long ts = indexRow.get(0).getTimestamp();
                     if (!indexMaintainer.isUncovered()
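
One of the hunks above replaces an unconditional ptr.copyBytes() with a
zero-copy fast path: when the offset-adjusted slice spans the cell's whole
backing array, that array already is the wanted key and is reused instead of
copied. Condensed into a helper (a sketch; the real code inlines this):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    final class RowKeySliceSketch {
        // Returns the index row key minus the leading `offset` bytes, copying
        // only when the backing array is not already exactly that slice.
        static byte[] indexRowKey(Cell firstCell, int offset, ImmutableBytesWritable ptr) {
            byte[] backing = firstCell.getRowArray();
            int start = firstCell.getRowOffset() + offset;
            int length = firstCell.getRowLength() - offset;
            if (start == 0 && length == backing.length) {
                return backing; // zero-copy: the array is the key
            }
            ptr.set(backing, start, length);
            return ptr.copyBytes();
        }
    }
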
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ded8f17c67..ca0259203a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +80,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.trace.TracingIterator;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -314,30 +316,38 @@ public abstract class BaseQueryPlan implements QueryPlan {
         ScanUtil.setCustomAnnotations(scan,
                 customAnnotations == null ? null : customAnnotations.getBytes());
         // Set index related scan attributes.
-        if (table.getType() == PTableType.INDEX) {
+        if (table.getType() == PTableType.INDEX || table.getType() == PTableType.CDC) {
             if (table.getIndexType() == IndexType.LOCAL) {
                 ScanUtil.setLocalIndex(scan);
             } else if (context.isUncoveredIndex()) {
                 ScanUtil.setUncoveredGlobalIndex(scan);
             }
 
+            PTable dataTable = null;
             Set<PColumn> dataColumns = context.getDataColumns();
             // If any data columns to join back from data table are present then we set following attributes
             // 1. data columns to be projected and their key value schema.
-            // 2. index maintainer and view constants if exists to build data row key from index row key. 
+            // 2. index maintainer and view constants if exists to build data row key from index row key.
             // TODO: can have an hint to skip joining back to data table, in that case if any column to
             // project is not present in the index then we need to skip this plan.
             if (!dataColumns.isEmpty()) {
                 // Set data columns to be join back from data table.
                 PTable parentTable = context.getCurrentTable().getTable();
                 String parentSchemaName = parentTable.getParentSchemaName().getString();
-                String parentTableName = parentTable.getParentTableName().getString();
-                final ParseNodeFactory FACTORY = new ParseNodeFactory();
-                TableRef dataTableRef =
-                        FromCompiler.getResolver(
-                            FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
-                            context.getConnection()).resolveTable(parentSchemaName, parentTableName);
-                PTable dataTable = dataTableRef.getTable();
+                if (parentTable.getType() == PTableType.CDC) {
+                    dataTable = parentTable;
+                }
+                else {
+                    String parentTableName = parentTable.getParentTableName().getString();
+                    final ParseNodeFactory FACTORY = new ParseNodeFactory();
+                    TableRef dataTableRef =
+                            FromCompiler.getResolver(
+                                    FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
+                                    context.getConnection()).resolveTable(parentSchemaName, parentTableName);
+                    dataTable = dataTableRef.getTable();
+                }
+            }
+            if (! dataColumns.isEmpty()) {
                 // Set data columns to be join back from data table.
                 serializeDataTableColumnsToJoin(scan, dataColumns, dataTable);
                 KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
@@ -579,4 +589,4 @@ public abstract class BaseQueryPlan implements QueryPlan {
         ResultIterator iterator = iterator();
         iterator.close();
     }
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 7877e71ca3..171108f87f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -18,10 +18,12 @@
 
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.phoenix.coprocessor.CDCGlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.UncoveredLocalIndexRegionScanner;
 import org.apache.phoenix.schema.KeyValueSchema;
@@ -133,8 +135,7 @@ public abstract class RegionScannerFactory {
       long extraLimit = -1;
 
       {
-          // for indexes construct the row filter for uncovered columns if it exists
-          if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
+        if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
               byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER);
               if (expBytes == null) {
                   // For older clients
@@ -166,22 +167,24 @@ public abstract class RegionScannerFactory {
               PTable.ImmutableStorageScheme storageScheme =
                           indexMaintainer.getIndexStorageScheme();
                   Scan dataTableScan = new Scan();
-                  if (dataColumns != null) {
-                    for (int i = 0; i < dataColumns.length; i++) {
-                      if (storageScheme ==
-                              PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                        dataTableScan.addFamily(dataColumns[i].getFamily());
-                      } else {
-                        dataTableScan.addColumn(dataColumns[i].getFamily(),
-                                dataColumns[i].getQualifier());
+                  if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+                    if (dataColumns != null) {
+                      for (int i = 0; i < dataColumns.length; i++) {
+                        if (storageScheme ==
+                                PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                          dataTableScan.addFamily(dataColumns[i].getFamily());
+                        } else {
+                          dataTableScan.addColumn(dataColumns[i].getFamily(),
+                                  dataColumns[i].getQualifier());
+                        }
+                      }
+                    } else if (indexMaintainer.isUncovered()) {
+                      // Indexed columns and the columns in index where clause should also be added
+                      // to the data columns to scan for uncovered global indexes. This is required
+                      // to verify the index row against the data table row.
+                      for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
+                        dataTableScan.addColumn(column.getFamily(), column.getQualifier());
                       }
-                    }
-                  } else if (indexMaintainer.isUncovered()) {
-                    // Indexed columns and the columns in index where clause should also be added
-                    // to the data columns to scan for uncovered global indexes. This is required
-                    // to verify the index row against the data table row.
-                    for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
-                      dataTableScan.addColumn(column.getFamily(), column.getQualifier());
                     }
                   }
                   if (ScanUtil.isLocalIndex(scan)) {
@@ -189,9 +192,16 @@ public abstract class RegionScannerFactory {
                             dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
                             pageSizeMs, offset, actualStartKey, extraLimit);
                   } else {
-                    s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
-                            dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
-                            pageSizeMs, extraLimit);
+                    if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+                      s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                              dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+                              pageSizeMs, extraLimit);
+                    }
+                    else {
+                      s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                              dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+                              pageSizeMs, extraLimit);
+                    }
                   }
               }
           }
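
The branch above routes a scan to the new CDCGlobalIndexRegionScanner only when the
CDC_DATA_TABLE_NAME attribute is present; all other uncovered global index scans keep
the existing UncoveredGlobalIndexRegionScanner path. A minimal sketch of that dispatch
check (CdcScanDispatch is a hypothetical helper name used here for illustration):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    final class CdcScanDispatch {
        // CDC_DATA_TABLE_NAME is set client-side in
        // ScanUtil.setScanAttributesForClient() only for CDC queries, so its
        // presence is what selects the CDC scanner on the server.
        static boolean isCdcIndexScan(Scan scan) {
            return scan.getAttribute(BaseScannerRegionObserver.CDC_DATA_TABLE_NAME) != null;
        }

        private CdcScanDispatch() {
        }
    }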
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9087411c4f..a38273d97e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -152,7 +152,7 @@ public class TableResultIterator implements ResultIterator {
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         this.isMapReduceContext = isMapReduceContext;
         this.maxQueryEndTime = maxQueryEndTime;
-        ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection());
+        ScanUtil.setScanAttributesForClient(scan, table, plan.getContext());
     }
 
     // Constructors without maxQueryEndTime to maintain API compatibility for phoenix-connectors
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 504ae47e91..5d81f0b585 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.optimize;
 
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -72,6 +73,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ParseNodeUtil;
 import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -216,6 +218,20 @@ public class QueryOptimizer {
                 && stopAtBestPlan && dataPlan.isApplicable()) {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
+
+        PTable table = dataPlan.getTableRef().getTable();
+        // TODO: Need to handle CDC hints.
+        if (table.getType() == PTableType.CDC) {
+            Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
+            String cdcHint = select.getHint().getHint(Hint.INCLUDE);
+            if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
+                cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
+                        cdcHint.length() - 1));
+            }
+            dataPlan.getContext().setCDCIncludeScopes(cdcIncludeScopes);
+            return Arrays.asList(dataPlan);
+        }
+
         List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
         if (dataPlan.isApplicable() && (indexes.isEmpty()
                 || dataPlan.isDegenerate()
@@ -236,8 +252,8 @@ public class QueryOptimizer {
             targetColumns = targetDatums;
         }
         
-        SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
         List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
+        SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
         plans.add(dataPlan);
         QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
         if (hintedPlan != null) {
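
For CDC tables the optimizer now stops at the data plan and only resolves the include
scopes, either from the table's declared scopes or from an INCLUDE hint. A hedged sketch
of the hint unwrapping (PRE and POST are example scope names; HintNode stores the hint
value with its parentheses, e.g. INCLUDE(PRE POST) comes back as "(PRE POST)"):

    import java.util.Set;
    import org.apache.phoenix.parse.HintNode;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.CDCUtil;

    class CdcHintSketch {
        static Set<PTable.CDCChangeScope> includeScopes(String cdcHint) {
            if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
                // Strip the surrounding parentheses and parse the scope names.
                return CDCUtil.makeChangeScopeEnumsFromString(
                        cdcHint.substring(1, cdcHint.length() - 1));
            }
            return null; // caller falls back to table.getCDCIncludeScopes()
        }
    }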
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index baba60bc91..a80b092956 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -17,9 +17,14 @@
  */
 package org.apache.phoenix.parse;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -36,9 +41,11 @@ public class HintNode {
     public static final char SEPARATOR = ' ';
     public static final String PREFIX = "(";
     public static final String SUFFIX = ")";
-    // Split on whitespace and parenthesis, keeping the parenthesis in the token array
-    private static final String SPLIT_REGEXP = "\\s+|((?<=\\" + PREFIX + ")|(?=\\" + PREFIX + "))|((?<=\\" + SUFFIX + ")|(?=\\" + SUFFIX + "))";
-    
+    // Each hint has the generic syntax hintWord(hintArgs), where the parenthesized hintArgs are optional.
+    private static final Pattern HINT_PATTERN = Pattern.compile(
+            "(?<hintWord>\\w+)\\s*(?:\\s*\\(\\s*(?<hintArgs>[^)]+)\\s*\\))?");
+    private static final Pattern HINT_ARG_PATTERN = Pattern.compile("(?<hintArg>\"[^\"]+\"|\\S+)");
+
     public enum Hint {
         /**
          * Forces a range scan to be used to process the query.
@@ -53,62 +60,62 @@ public class HintNode {
          */
         NO_CHILD_PARENT_JOIN_OPTIMIZATION,
         /**
-        * Prevents the usage of indexes, forcing usage
-        * of the data table for a query.
-        */
-       NO_INDEX,
-       /**
-       * Hint of the form {@code INDEX(<table_name> <index_name>...) }
-       * to suggest usage of the index if possible. The first
-       * usable index in the list of indexes will be choosen.
-       * Table and index names may be surrounded by double quotes
-       * if they are case sensitive.
-       */
-       INDEX,
-       /**
-        * All things being equal, use the data table instead of
-        * the index table when optimizing.
-        */
-       USE_DATA_OVER_INDEX_TABLE,
-       /**
-        * All things being equal, use the index table instead of
-        * the data table when optimizing.
-        */
-       USE_INDEX_OVER_DATA_TABLE,
-       /**
-        * Avoid caching any HBase blocks loaded by this query.
-        */
-       NO_CACHE,
-       /**
-        * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
-        */
-       USE_SORT_MERGE_JOIN,
-       /**
-        * Persist the RHS results of a hash join.
-        */
-       USE_PERSISTENT_CACHE,
-       /**
-        * Avoid using star-join optimization. Used for broadcast join (hash join) only.
-        */
-       NO_STAR_JOIN,
-       /**
-        * Avoid using the no seek optimization. When there are many columns which are not selected coming in between 2
-        * selected columns and/or versions of columns, this should be used.
-        */
-      SEEK_TO_COLUMN,
-       /**
-        * Avoid seeks to select specified columns. When there are very less number of columns which are not selected in
-        * between 2 selected columns this will be give better performance.
-        */
-      NO_SEEK_TO_COLUMN,
-      /**
-       * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
-       */
-     SMALL,
-     /**
-      * Enforces a serial scan.
-      */
-     SERIAL,
+         * Prevents the usage of indexes, forcing usage
+         * of the data table for a query.
+         */
+        NO_INDEX,
+        /**
+         * Hint of the form {@code INDEX(<table_name> <index_name>...) }
+         * to suggest usage of the index if possible. The first
+         * usable index in the list of indexes will be chosen.
+         * Table and index names may be surrounded by double quotes
+         * if they are case sensitive.
+         */
+        INDEX,
+        /**
+         * All things being equal, use the data table instead of
+         * the index table when optimizing.
+         */
+        USE_DATA_OVER_INDEX_TABLE,
+        /**
+         * All things being equal, use the index table instead of
+         * the data table when optimizing.
+         */
+        USE_INDEX_OVER_DATA_TABLE,
+        /**
+         * Avoid caching any HBase blocks loaded by this query.
+         */
+        NO_CACHE,
+        /**
+         * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
+         */
+        USE_SORT_MERGE_JOIN,
+        /**
+         * Persist the RHS results of a hash join.
+         */
+        USE_PERSISTENT_CACHE,
+        /**
+         * Avoid using star-join optimization. Used for broadcast join (hash join) only.
+         */
+        NO_STAR_JOIN,
+        /**
+         * Avoid using the no-seek optimization. Use this when many unselected columns
+         * and/or column versions lie between two selected columns.
+         */
+        SEEK_TO_COLUMN,
+        /**
+         * Avoid seeks to select specified columns. This gives better performance when
+         * only a few unselected columns lie between two selected columns.
+         */
+        NO_SEEK_TO_COLUMN,
+        /**
+         * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
+         */
+        SMALL,
+        /**
+         * Enforces a serial scan.
+         */
+        SERIAL,
         /**
          * Enforces a forward scan.
          */
@@ -121,7 +128,13 @@ public class HintNode {
         /**
          * Do not use server merge for hinted uncovered indexes
          */
-        NO_INDEX_SERVER_MERGE
+        NO_INDEX_SERVER_MERGE,
+
+        /**
+         * Override the default CDC include scopes.
+         */
+        INCLUDE,
+        ;
     };
 
     private final Map<Hint,String> hints;
@@ -160,41 +173,33 @@ public class HintNode {
 
     public HintNode(String hint) {
         Map<Hint,String> hints = new HashMap<Hint,String>();
-        // Split on whitespace or parenthesis. We do not need to handle escaped or
-        // embedded whitespace/parenthesis, since we are parsing what will be HBase
-        // table names which are not allowed to contain whitespace or parenthesis.
-        String[] hintWords = hint.split(SPLIT_REGEXP);
-        for (int i = 0; i < hintWords.length; i++) {
-            String hintWord = hintWords[i];
-            if (hintWord.isEmpty()) {
-                continue;
-            }
+        Matcher hintMatcher = HINT_PATTERN.matcher(hint);
+        while (hintMatcher.find()) {
             try {
-                Hint key = Hint.valueOf(hintWord.toUpperCase());
-                String hintValue = "";
-                if (i+1 < hintWords.length && PREFIX.equals(hintWords[i+1])) {
-                    StringBuffer hintValueBuf = new StringBuffer(hint.length());
-                    hintValueBuf.append(PREFIX);
-                    i+=2;
-                    while (i < hintWords.length && !SUFFIX.equals(hintWords[i])) {
-                        hintValueBuf.append(SchemaUtil.normalizeIdentifier(hintWords[i++]));
-                        hintValueBuf.append(SEPARATOR);
+                Hint hintWord = Hint.valueOf(hintMatcher.group("hintWord").toUpperCase());
+                String hintArgsStr = hintMatcher.group("hintArgs");
+                List<String> hintArgs = new ArrayList<>();
+                if (hintArgsStr != null) {
+                    Matcher hintArgMatcher = HINT_ARG_PATTERN.matcher(hintArgsStr);
+                    while (hintArgMatcher.find()) {
+                        hintArgs.add(SchemaUtil.normalizeIdentifier(hintArgMatcher.group()));
                     }
-                    // Replace trailing separator with suffix
-                    hintValueBuf.replace(hintValueBuf.length()-1, hintValueBuf.length(), SUFFIX);
-                    hintValue = hintValueBuf.toString();
                 }
-                String oldValue = hints.put(key, hintValue);
-                // Concatenate together any old value with the new value
-                if (oldValue != null) {
-                    hints.put(key, oldValue + hintValue);
+                hintArgsStr = String.join(" ", hintArgs);
+                hintArgsStr = hintArgsStr.equals("") ? "" : "(" + hintArgsStr + ")";
+                if (hints.containsKey(hintWord)) {
+                    // Concatenate together any old value with the new value
+                    hints.put(hintWord, hints.get(hintWord) + hintArgsStr);
+                }
+                else {
+                    hints.put(hintWord, hintArgsStr);
                 }
             } catch (IllegalArgumentException e) { // Ignore unknown/invalid hints
             }
         }
         this.hints = ImmutableMap.copyOf(hints);
     }
-    
+
     public boolean isEmpty() {
         return hints.isEmpty();
     }
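
The two patterns replace the old split-and-scan tokenizer. HINT_PATTERN captures one
hint word plus an optional parenthesized argument list, and HINT_ARG_PATTERN then splits
the arguments, keeping double-quoted (case-sensitive) identifiers intact. A standalone
demo with the patterns copied verbatim (HintPatternDemo is an illustrative name):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HintPatternDemo {
        private static final Pattern HINT_PATTERN = Pattern.compile(
                "(?<hintWord>\\w+)\\s*(?:\\s*\\(\\s*(?<hintArgs>[^)]+)\\s*\\))?");
        private static final Pattern HINT_ARG_PATTERN =
                Pattern.compile("(?<hintArg>\"[^\"]+\"|\\S+)");

        public static void main(String[] args) {
            Matcher hintMatcher = HINT_PATTERN.matcher("INDEX(t1 \"Idx1\") NO_CACHE");
            while (hintMatcher.find()) {
                System.out.println("hint: " + hintMatcher.group("hintWord"));
                String hintArgs = hintMatcher.group("hintArgs");
                if (hintArgs != null) {
                    Matcher argMatcher = HINT_ARG_PATTERN.matcher(hintArgs);
                    while (argMatcher.find()) {
                        // Quoted args keep their quotes here; HintNode then runs
                        // each one through SchemaUtil.normalizeIdentifier().
                        System.out.println("  arg: " + argMatcher.group());
                    }
                }
            }
            // Output: hint: INDEX / arg: t1 / arg: "Idx1" / hint: NO_CACHE
        }
    }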
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index c7fdbf6a20..2618e0fa6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -122,7 +122,8 @@ public class ColumnRef {
                     displayName);
         }
         
-        if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY) {
+        if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY ||
+                table.getType() == PTableType.CDC) {
         	return new ProjectedColumnExpression(column, table, displayName);
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8755b0ed96..76108a6220 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1770,14 +1770,17 @@ public class MetaDataClient {
                         new HashMap<>(), null);
         // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to
         //  protect based on CDCUtil.isACDCIndex().
-        // TODO: Should we also allow PTimestamp here?
         MutationState indexMutationState;
         try {
+            // TODO: Should we also allow PTimestamp here? PTimestamp is actually the right
+            // type, but we are forced to use PDate because of the incorrect type of
+            // PHOENIX_ROW_TIMESTAMP (see PHOENIX-6807).
             indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE);
         } catch (SQLException e) {
             if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
                 throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
-                        statement.getCdcObjName().getName()).build().buildException();
+                        statement.getCdcObjName().getName()).setRootCause(
+                                e).build().buildException();
             }
             // TODO: What about translating other index creation failures? E.g., bad TS column.
             throw e;
@@ -1791,8 +1794,8 @@ public class MetaDataClient {
         ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ?
                 statement.getTimeIdxColumn() :
                 FACTORY.columnName(statement.getTimeIdxFunc().toString());
-        columnDefs.add(FACTORY.columnDef(timeIdxCol, PTimestamp.INSTANCE.getSqlTypeName(), false, null, false,
-                PTimestamp.INSTANCE.getMaxLength(null), PTimestamp.INSTANCE.getScale(null), false,
+        columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
+                PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
                 SortOrder.getDefault(), "", null, false));
         pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
         for (PColumn pcol : pkColumns) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 533f6dc2d9..0b3beb885b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1795,7 +1795,9 @@ public class PTableImpl implements PTable {
                 return SchemaUtil.getPhysicalHBaseTableName(schemaName,
                         physicalTableNameColumnInSyscat, isNamespaceMapped);
             }
-            return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped);
+            return SchemaUtil.getPhysicalHBaseTableName(schemaName, getType() == PTableType.CDC ?
+                    PNameFactory.newName(CDCUtil.getCDCIndexName(tableName.getString())) :
+                    tableName, isNamespaceMapped);
         } else {
             return PNameFactory.newName(physicalNames.get(0).getBytes());
         }
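
With this change a CDC PTable reports the physical name of its hidden index table rather
than its own logical name. A small sketch of the mapping (the exact prefix is an
assumption based on CDC_INDEX_PREFIX = "__CDC__" in the CDCUtil diff below):

    import org.apache.phoenix.util.CDCUtil;

    public class CdcIndexNameSketch {
        public static void main(String[] args) {
            // A CDC object named MY_CDC is expected to resolve to the hidden
            // index __CDC__MY_CDC, which isACDCIndex() recognizes by prefix.
            String physical = CDCUtil.getCDCIndexName("MY_CDC");
            System.out.println(physical);
            System.out.println(CDCUtil.isACDCIndex(physical)); // true
        }
    }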
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index d015aaf422..e80fd4ee09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -18,14 +18,25 @@
 
 package org.apache.phoenix.util;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.phoenix.schema.PTable;
 
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
+
 public class CDCUtil {
     public static final String CDC_INDEX_PREFIX = "__CDC__";
     public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -82,4 +93,23 @@ public class CDCUtil {
     public static boolean isACDCIndex(String indexName) {
         return indexName.startsWith(CDC_INDEX_PREFIX);
     }
+
+    public static boolean isACDCIndex(PTable indexTable) {
+        return isACDCIndex(indexTable.getTableName().getString());
+    }
+
+    public static Scan initForRawScan(Scan scan) {
+        scan.setRaw(true);
+        scan.readAllVersions();
+        scan.setCacheBlocks(false);
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (! familyMap.isEmpty()) {
+            familyMap.keySet().stream().forEach(fQual -> {
+                if (familyMap.get(fQual) != null) {
+                    familyMap.get(fQual).clear();
+                }
+            });
+        }
+        return scan;
+    }
 }
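
initForRawScan() is what lets the CDC scanner see the full mutation history: a raw scan
returns delete markers and all cell versions, and clearing the per-family qualifier sets
widens the read to entire families. A minimal usage sketch (family "0" and qualifier
"NAME" are illustrative):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.CDCUtil;

    public class RawScanSketch {
        public static void main(String[] args) {
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("NAME"));
            CDCUtil.initForRawScan(scan);
            // Raw + all versions: delete markers and older cells come back.
            System.out.println(scan.isRaw());                                // true
            // The qualifier set was cleared, so the whole family is read.
            System.out.println(scan.getFamilyMap().get(Bytes.toBytes("0"))); // []
        }
    }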
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index f1c0b1d6e0..e02abf6720 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -84,9 +84,7 @@ public class EncodedColumnsUtil {
      *         part is the value to use for it.
      */
     public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) {
-        return usesEncodedColumnNames(table) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
-                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
-                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        return getEmptyKeyValueInfo(usesEncodedColumnNames(table));
     }
 
     /**
@@ -104,9 +102,7 @@ public class EncodedColumnsUtil {
      *         part is the value to use for it.
      */
     public static Pair<byte[], byte[]> getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) {
-        return usesEncodedColumnNames(encodingScheme) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
-                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
-                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        return getEmptyKeyValueInfo(usesEncodedColumnNames(encodingScheme));
     }
 
     public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan scan) {
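
Both overloads above now delegate to a boolean-keyed variant that is not shown in this
hunk. A hedged reconstruction from the removed inline logic, which pins down exactly
what it must return:

    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.query.QueryConstants;

    class EmptyKeyValueInfoSketch {
        // Presumed shape of EncodedColumnsUtil.getEmptyKeyValueInfo(boolean).
        static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
            return usesEncodedColumnNames
                    ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
                            QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES)
                    : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
        }
    }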
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fdafd1f33b..f2b6b911fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -19,20 +19,32 @@ package org.apache.phoenix.util;
 
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
 import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +68,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
@@ -788,7 +801,7 @@ public class ScanUtil {
      * that the slot at index 2 has a slot index of 2 but a row key index of 3.
      * To calculate the "adjusted position" index, we simply add up the number of extra slots spanned and offset
      * the slotPosition by that much.
-     * @param slotSpan  the extra span per skip scan slot. corresponds to {@link ScanRanges#slotSpan}
+     * @param slotSpan  the extra span per skip scan slot. corresponds to {@link ScanRanges#getSlotSpans()}
      * @param slotPosition  the index of a slot in the SkipScan slots list.
      * @return  the equivalent row key position in the RowKeySchema
      */
@@ -1167,7 +1180,8 @@ public class ScanUtil {
             scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
             scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
         } else {
-            if (table.getType() != PTableType.INDEX || !IndexUtil.isGlobalIndex(indexTable)) {
+            if (table.getType() != PTableType.CDC && (table.getType() != PTableType.INDEX ||
+                    !IndexUtil.isGlobalIndex(indexTable))) {
                 return;
             }
             if (table.isTransactional() && table.getIndexType() == IndexType.UNCOVERED_GLOBAL) {
@@ -1180,7 +1194,13 @@ public class ScanUtil {
             }
             // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
             // view. Thus, we need to recreate a PTable object with the correct table name for the rest of this code to work
-            if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+            if (table.getType() == PTableType.CDC) {
+                indexTable = PhoenixRuntime.getTable(phoenixConnection,
+                        CDCUtil.getCDCIndexName(table.getName().getString()));
+            }
+            else if (indexTable.getViewIndexId() != null &&
+                    indexTable.getName().getString().contains(
+                            QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
                 int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
                 String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
                 indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
@@ -1280,7 +1300,8 @@ public class ScanUtil {
     }
 
     public static void setScanAttributesForClient(Scan scan, PTable table,
-                                                  PhoenixConnection phoenixConnection) throws SQLException {
+                                                  StatementContext context) throws SQLException {
+        PhoenixConnection phoenixConnection = context.getConnection();
         setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
         setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
         byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
@@ -1298,6 +1319,114 @@ public class ScanUtil {
         }
 
         setScanAttributeForPaging(scan, phoenixConnection);
+
+        if (table.getType() == PTableType.CDC) {
+            PTable dataTable = PhoenixRuntime.getTable(phoenixConnection,
+                    SchemaUtil.getTableName(table.getSchemaName().getString(),
+                            table.getParentTableName().getString()));
+            scan.setAttribute(CDC_DATA_TABLE_NAME,
+                    table.getParentName().getBytes());
+
+            PColumn cdcJsonCol = table.getColumnForColumnName(CDC_JSON_COL_NAME);
+            scan.setAttribute(CDC_JSON_COL_QUALIFIER, cdcJsonCol.getColumnQualifierBytes());
+            scan.setAttribute(CDC_INCLUDE_SCOPES,
+                    context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8));
+            CDCUtil.initForRawScan(scan);
+            List<PColumn> columns = dataTable.getColumns();
+            Map<byte[], String> dataColQualNameMap = new HashMap<>(columns.size());
+            Map<byte[], PDataType> dataColTypeMap = new HashMap<>();
+            for (PColumn col: columns) {
+                if (col.getColumnQualifierBytes() != null) {
+                    dataColQualNameMap.put(col.getColumnQualifierBytes(), col.getName().getString());
+                    dataColTypeMap.put(col.getColumnQualifierBytes(), col.getDataType());
+                }
+            }
+            scan.setAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP,
+                    serializeColumnQualifierToNameMap(dataColQualNameMap));
+            scan.setAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP,
+                    serializeColumnQualifierToTypeMap(dataColTypeMap));
+        }
+    }
+
+    public static byte[] serializeColumnQualifierToNameMap(Map<byte[], String> colQualNameMap) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            output.writeInt(colQualNameMap.size());
+            for (Map.Entry<byte[], String> entry: colQualNameMap.entrySet()) {
+                output.writeInt(entry.getKey().length);
+                output.write(entry.getKey());
+                WritableUtils.writeString(output, entry.getValue());
+            }
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Map<ImmutableBytesPtr, String> deserializeColumnQualifierToNameMap(
+            byte[] mapBytes) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(mapBytes);
+        DataInputStream input = new DataInputStream(stream);
+        try {
+            Map<ImmutableBytesPtr, String> colQualNameMap = new HashMap<>();
+            int size = input.readInt();
+            for (int i = 0; i < size; ++i) {
+                int qualLength = input.readInt();
+                byte[] qualBytes = new byte[qualLength];
+                int bytesRead = input.read(qualBytes);
+                if (bytesRead != qualLength) {
+                    throw new IOException("Expected number of bytes: " + qualLength + " but got " +
+                            "only: " + bytesRead);
+                }
+                String colName = WritableUtils.readString(input);
+                colQualNameMap.put(new ImmutableBytesPtr(qualBytes), colName);
+            }
+            return colQualNameMap;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static byte[] serializeColumnQualifierToTypeMap(
+            Map<byte[], PDataType> pkColNamesAndTypes) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
+        try {
+            output.writeInt(pkColNamesAndTypes.size());
+            for (Map.Entry<byte[], PDataType> entry: pkColNamesAndTypes.entrySet()) {
+                output.writeInt(entry.getKey().length);
+                output.write(entry.getKey());
+                WritableUtils.writeString(output, entry.getValue().getSqlTypeName());
+            }
+            return stream.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Map<ImmutableBytesPtr, PDataType> deserializeColumnQualifierToTypeMap(
+            byte[] pkColInfoBytes) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(pkColInfoBytes);
+        DataInputStream input = new DataInputStream(stream);
+        try {
+            Map<ImmutableBytesPtr, PDataType> colQualTypeMap = new HashMap<>();
+            int colCnt = input.readInt();
+            for (int i = 0; i < colCnt; ++i) {
+                int qualLength = input.readInt();
+                byte[] qualBytes = new byte[qualLength];
+                int bytesRead = input.read(qualBytes);
+                if (bytesRead != qualLength) {
+                    throw new IOException("Expected number of bytes: " + qualLength + " but got " +
+                            "only: " + bytesRead);
+                }
+                colQualTypeMap.put(new ImmutableBytesPtr(qualBytes),
+                        PDataType.fromSqlTypeName(WritableUtils.readString(input)));
+            }
+            return colQualTypeMap;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public static void setScanAttributeForPaging(Scan scan, PhoenixConnection phoenixConnection) {
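
The qualifier-to-name and qualifier-to-type maps travel to the server as plain
length-prefixed byte streams (entry count, then per entry the qualifier length, the
qualifier bytes, and a WritableUtils string). A minimal round trip through the helpers
above (the qualifier bytes are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
    import org.apache.phoenix.util.ScanUtil;

    public class QualifierMapRoundTrip {
        public static void main(String[] args) {
            Map<byte[], String> qualToName = new HashMap<>();
            qualToName.put(Bytes.toBytes(11), "NAME");
            byte[] onTheWire = ScanUtil.serializeColumnQualifierToNameMap(qualToName);
            Map<ImmutableBytesPtr, String> decoded =
                    ScanUtil.deserializeColumnQualifierToNameMap(onTheWire);
            // Deserialization keys by ImmutableBytesPtr, so lookups by content work.
            System.out.println(decoded.get(new ImmutableBytesPtr(Bytes.toBytes(11)))); // NAME
        }
    }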
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 981f04add0..dc3c5aec41 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -34,7 +34,6 @@
 
     <!-- Versions for pherf-only dependencies -->
     <diffutils.version>1.2.1</diffutils.version>
-    <gson.version>2.9.1</gson.version>
     <commons-math3.version>3.3</commons-math3.version>
     <activation.version>1.1</activation.version>
     <jcabi-jdbc.version>0.15</jcabi-jdbc.version>
@@ -151,7 +150,6 @@
     <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
-      <version>${gson.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
diff --git a/pom.xml b/pom.xml
index 248273a4ba..0c629b1ac8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,7 @@
     <protoc.arch>${os.detected.classifier}</protoc.arch>
     <!-- Keeping the version in sync with hbase javax.el version -->
     <glassfish.el.version>3.0.1-b08</glassfish.el.version>
+    <gson.version>2.9.1</gson.version>
   </properties>
 
   <build>
@@ -1548,6 +1549,11 @@
         <artifactId>javax.el</artifactId>
         <version>${glassfish.el.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.google.code.gson</groupId>
+        <artifactId>gson</artifactId>
+        <version>${gson.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>