You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2024/01/24 16:29:08 UTC

(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
     new f07898f1fd PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)
f07898f1fd is described below

commit f07898f1fd91f5777329b65175fd6ba5b643bbce
Author: Hari Krishna Dara <ha...@gmail.com>
AuthorDate: Wed Jan 24 21:59:00 2024 +0530

    PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)
---
 .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 231 +++++++++++++--------
 phoenix-core/src/main/antlr3/PhoenixSQL.g          |  10 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +-
 .../apache/phoenix/exception/SQLExceptionInfo.java |  15 ++
 .../org/apache/phoenix/index/IndexMaintainer.java  |   2 +-
 .../phoenix/iterate/RegionScannerFactory.java      |  60 +++---
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  11 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    |   3 +-
 .../apache/phoenix/parse/CreateCDCStatement.java   |  15 +-
 .../java/org/apache/phoenix/parse/HintNode.java    |   2 +-
 .../org/apache/phoenix/parse/ParseNodeFactory.java |   3 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  26 +--
 .../java/org/apache/phoenix/schema/PTable.java     |   6 -
 .../org/apache/phoenix/schema/TableProperty.java   |   3 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  24 +--
 .../org/apache/phoenix/parse/QueryParserTest.java  |  43 ++--
 .../java/org/apache/phoenix/util/CDCUtilTest.java  |  21 +-
 17 files changed, 245 insertions(+), 234 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index d1f04c02cc..be35bcbf46 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -19,23 +19,27 @@ package org.apache.phoenix.end2end;
 
 import com.google.gson.Gson;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import javax.xml.transform.Result;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,8 +52,22 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(Parameterized.class)
 @Category(ParallelStatsDisabledTest.class)
 public class CDCMiscIT extends ParallelStatsDisabledIT {
+    private final boolean forView;
+
+    public CDCMiscIT(boolean forView) {
+        this.forView = forView;
+    }
+
+    @Parameterized.Parameters(name = "forVieiw={0}")
+    public static synchronized Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                { false}, { true }
+        });
+    }
+
     private void assertCDCState(Connection conn, String cdcName, String expInclude,
                                 int idxType) throws SQLException {
         try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
@@ -89,19 +107,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         assertEquals(nbuckets, indexTable.getBucketNum());
     }
 
+    private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql)
+            throws Exception {
+        conn.createStatement().execute(cdc_sql);
+        IndexToolIT.runIndexTool(false, null, tableName,
+                "\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
+        TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE);
+    }
+
     @Test
-    public void testCreate() throws SQLException {
+    public void testCreate() throws Exception {
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
         conn.createStatement().execute(
                 "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
                         + " v2 DATE)");
+        if (forView) {
+            String viewName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+            tableName = viewName;
+        }
         String cdcName = generateUniqueName();
 
         try {
             conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())");
+                    + " ON NON_EXISTENT_TABLE");
             fail("Expected to fail due to non-existent table");
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
@@ -109,42 +141,16 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
 
         try {
             conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON " + tableName +"(UNKNOWN_FUNCTION())");
-            fail("Expected to fail due to invalid function");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode());
-        }
-
-        try {
-            conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON " + tableName +"(NOW())");
-            fail("Expected to fail due to non-deterministic function");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX.
-                    getErrorCode(), e.getErrorCode());
-        }
-
-        try {
-            conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON " + tableName +"(ROUND(v1))");
-            fail("Expected to fail due to non-date expression in the index PK");
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
-                    e.getErrorCode());
-        }
-
-        try {
-            conn.createStatement().execute("CREATE CDC " + cdcName
-                    + " ON " + tableName +"(v1)");
-            fail("Expected to fail due to non-date column in the index PK");
+                    + " ON " + tableName + " INCLUDE (abc)");
+            fail("Expected to fail due to invalid INCLUDE");
         } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
+            assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
                     e.getErrorCode());
+            assertTrue(e.getMessage().endsWith("abc"));
         }
 
-        String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
-        conn.createStatement().execute(cdc_sql);
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        createAndWait(conn, tableName, cdcName, cdc_sql);
         assertCDCState(conn, cdcName, null, 3);
 
         try {
@@ -154,43 +160,32 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
             assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
             assertTrue(e.getMessage().endsWith(cdcName));
         }
+
         conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
-                "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+                " INCLUDE (pre, post) INDEX_TYPE=g");
 
         cdcName = generateUniqueName();
-        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
-                "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName +
+                " INCLUDE (pre, post) INDEX_TYPE=g";
+        createAndWait(conn, tableName, cdcName, cdc_sql);
         assertCDCState(conn, cdcName, "PRE,POST", 3);
         assertPTable(cdcName, new HashSet<>(
                 Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
 
         cdcName = generateUniqueName();
-        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
-                "(v2) INDEX_TYPE=l");
+        cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
+        createAndWait(conn, tableName, cdcName, cdc_sql);
         assertCDCState(conn, cdcName, null, 2);
         assertPTable(cdcName, null, tableName);
 
-        String viewName = generateUniqueName();
-        conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
-                tableName);
-        cdcName = generateUniqueName();
-        try {
-            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + viewName +
-                    "(PHOENIX_ROW_TIMESTAMP())");
-            fail("Expected to fail on VIEW");
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getErrorCode(),
-                    e.getErrorCode());
-            assertTrue(e.getMessage().endsWith(
-                    SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getMessage() + " tableType=VIEW"));
+        // Indexes on views don't support salt buckets; the setting is currently silently ignored.
+        if (! forView) {
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4";
+            createAndWait(conn, tableName, cdcName, cdc_sql);
+            assertSaltBuckets(cdcName, 4);
         }
 
-        cdcName = generateUniqueName();
-        conn.createStatement().execute("CREATE CDC " + cdcName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP()) SALT_BUCKETS = 4");
-        assertSaltBuckets(cdcName, 4);
-
         conn.close();
     }
 
@@ -203,8 +198,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
                 " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
                 "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
         String cdcName = generateUniqueName();
-        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
-                "(PHOENIX_ROW_TIMESTAMP())");
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
 
         PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
         List<PColumn> idxPkColumns = indexTable.getPKColumns();
@@ -214,7 +208,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
 
         PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
         List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
-        assertEquals(" PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
+        assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
         assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
         assertEquals("K", cdcPkColumns.get(2).getName().getString());
     }
@@ -260,8 +254,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
                 "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
                         + " v2 DATE)");
         String cdcName = generateUniqueName();
-        String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
         conn.createStatement().execute(cdc_sql);
         assertCDCState(conn, cdcName, null, 3);
         String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
@@ -291,28 +284,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         rs.close();
     }
 
+    private Connection newConnection() throws SQLException {
+        Properties props = new Properties();
+        // Use these only for debugging.
+        //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        //props.put("hbase.client.scanner.timeout.period", "6000000");
+        //props.put("phoenix.query.timeoutMs", "6000000");
+        //props.put("zookeeper.session.timeout", "6000000");
+        //props.put("hbase.rpc.timeout", "6000000");
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
     @Test
     public void testSelectCDC() throws Exception {
-        Properties props = new Properties();
-        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
-        props.put("hbase.client.scanner.timeout.period", "6000000");
-        props.put("phoenix.query.timeoutMs", "6000000");
-        props.put("zookeeper.session.timeout", "6000000");
-        props.put("hbase.rpc.timeout", "6000000");
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = newConnection();
         String tableName = generateUniqueName();
         conn.createStatement().execute(
                 "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
         conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
         conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
         conn.commit();
-        Thread.sleep(1000);
+        Thread.sleep(10);
         conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
         conn.commit();
         String cdcName = generateUniqueName();
         String cdc_sql = "CREATE CDC " + cdcName
-                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
-        conn.createStatement().execute(cdc_sql);
+                + " ON " + tableName;
+        createAndWait(conn, tableName, cdcName, cdc_sql);
         assertCDCState(conn, cdcName, null, 3);
         // NOTE: To debug the query execution, add the below condition where you need a breakpoint.
         //      if (<table>.getTableName().getString().equals("N000002") ||
@@ -322,8 +320,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
         assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
                 " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
-        assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
-                "FROM " + cdcName));
+        assertResultSet(conn.createStatement().executeQuery("SELECT " +
+                "/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName));
         assertResultSet(conn.createStatement().executeQuery("SELECT " +
                 "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
 
@@ -359,16 +357,79 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testSelectCDCBadIncludeSpec() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC  " + cdcName
+                + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+        try {
+            conn.createStatement().executeQuery("SELECT " +
+                    "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
+            fail("Expected to fail due to invalid CDC INCLUDE hint");
+        }
+        catch (SQLException e) {
+            assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+                    e.getErrorCode());
+            assertTrue(e.getMessage().endsWith("DUMMY"));
+        }
+    }
+
+    @Test
+    public void testSelectTimeRangeQueries() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName
+                + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+        Timestamp ts1 = new Timestamp(System.currentTimeMillis());
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+        conn.commit();
+        Thread.sleep(10);
+        Timestamp ts2 = new Timestamp(System.currentTimeMillis());
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
+        conn.commit();
+        Thread.sleep(10);
+        Timestamp ts3 = new Timestamp(System.currentTimeMillis());
+        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+        conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
+        Timestamp ts4 = new Timestamp(System.currentTimeMillis());
+
+        String sel_sql = "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND " +
+                "PHOENIX_ROW_TIMESTAMP() <= ?";
+        Object[] testDataSets = new Object[] {
+                new Object[] {ts1, ts2, new int[] {1, 2}}/*,
+                new Object[] {ts2, ts3, new int[] {1, 3}},
+                new Object[] {ts3, ts4, new int[] {1}}*/
+        };
+        PreparedStatement stmt = conn.prepareStatement(sel_sql);
+        for (int i = 0; i < testDataSets.length; ++i) {
+            Object[] testData = (Object[]) testDataSets[i];
+            stmt.setTimestamp(1, (Timestamp) testData[0]);
+            stmt.setTimestamp(2, (Timestamp) testData[1]);
+            try (ResultSet rs = stmt.executeQuery()) {
+                for (int k:  (int[]) testData[2]) {
+                    assertEquals(true, rs.next());
+                    assertEquals(k, rs.getInt(2));
+                }
+                assertEquals(false, rs.next());
+            }
+        }
+    }
+
     // Temporary test case used as a reference for debugging and comparing against the CDC query.
     @Test
     public void testSelectUncoveredIndex() throws Exception {
-        Properties props = new Properties();
-        props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
-        props.put("hbase.client.scanner.timeout.period", "6000000");
-        props.put("phoenix.query.timeoutMs", "6000000");
-        props.put("zookeeper.session.timeout", "6000000");
-        props.put("hbase.rpc.timeout", "6000000");
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Connection conn = newConnection();
         String tableName = generateUniqueName();
         conn.createStatement().execute(
                 "CREATE TABLE  " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index cc0a2f10c8..737ef4aabf 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -566,11 +566,10 @@ create_index_node returns [CreateIndexStatement ret]
 
 create_cdc_node returns [CreateCDCStatement ret]
     :   CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
-        LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN
         (INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
         (p=fam_properties)?
         {
-            ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount());
+            ret = factory.createCDC(o, t, v, p, ex != null, getBindCount());
         }
     ;
 
@@ -578,13 +577,6 @@ cdc_name returns [NamedNode ret]
     :   name=identifier {$ret = factory.cdcName(name); }
     ;
 
-cdc_time_func returns [FunctionParseNode ret]
-    :   field=identifier LPAREN l=zero_or_more_expressions RPAREN
-        {
-            ret = factory.function(field, l);
-        }
-    ;
-
 cdc_change_scopes returns [Set<CDCChangeScope> ret]
 @init { ret = new HashSet<>(); }
     :   v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 7842e8f4c0..6785a5da28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -360,7 +360,7 @@ public enum SQLExceptionCode {
     VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
             + " only if none of the parents have indexes in the parent hierarchy"),
     UNKNOWN_INDEX_TYPE(1098, "44A39", "Unknown INDEX type: "),
-    UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for INCLUDE: "),
+    UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for CDC INCLUDE"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
@@ -469,8 +469,6 @@ public enum SQLExceptionCode {
             "Missing ENCODED_QUALIFIER."),
     EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
             + "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
-    INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52",
-            "Invalid table type for creating CDC."),
 
 
     /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
index b7babd5dc6..3e27dc75c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
@@ -65,6 +65,7 @@ public class SQLExceptionInfo {
     private final int phoenixColumnSizeBytes;
     private final int maxPhoenixColumnSizeBytes;
     private final String haGroupInfo;
+    private final String cdcChangeScope;
 
     public static class Builder {
         private Throwable rootCause;
@@ -83,6 +84,7 @@ public class SQLExceptionInfo {
         private int maxPhoenixColumnSizeBytes;
         private String haGroupInfo;
         private PTableType tableType;
+        private String cdcChangeScope;
 
         public Builder(SQLExceptionCode code) {
             this.code = code;
@@ -163,6 +165,11 @@ public class SQLExceptionInfo {
             return this;
         }
 
+        public Builder setCdcChangeScope(String cdcChangeScope) {
+            this.cdcChangeScope = cdcChangeScope;
+            return this;
+        }
+
         public SQLExceptionInfo build() {
             return new SQLExceptionInfo(this);
         }
@@ -190,6 +197,7 @@ public class SQLExceptionInfo {
         maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes;
         phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes;
         haGroupInfo = builder.haGroupInfo;
+        cdcChangeScope = builder.cdcChangeScope;
     }
 
     @Override
@@ -235,6 +243,9 @@ public class SQLExceptionInfo {
         if (haGroupInfo != null) {
             builder.append(" ").append(HA_GROUP_INFO).append("=").append(haGroupInfo);
         }
+        if (cdcChangeScope != null) {
+            builder.append(": ").append(cdcChangeScope);
+        }
 
         return builder.toString();
     }
@@ -306,4 +317,8 @@ public class SQLExceptionInfo {
     public String getHaGroupInfo() {
         return haGroupInfo;
     }
+
+    public String getCdcChangeScope() {
+        return cdcChangeScope;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 002857b213..86f49208f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -676,7 +676,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             this.indexWhere = index.getIndexWhereExpression(connection);
             this.indexWhereColumns = index.getIndexWhereColumns(connection);
         }
-        this.isCDCIndex = CDCUtil.isACDCIndex(index);
+        this.isCDCIndex = CDCUtil.isCDCIndex(index);
 
         initCachedState();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 171108f87f..a536c6d016 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -135,7 +135,8 @@ public abstract class RegionScannerFactory {
       long extraLimit = -1;
 
       {
-        if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
+          // For indexes, construct the row filter for uncovered columns if one exists.
+          if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
               byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER);
               if (expBytes == null) {
                   // For older clients
@@ -160,48 +161,45 @@ public abstract class RegionScannerFactory {
               if (limitBytes != null) {
                   extraLimit = Bytes.toLong(limitBytes);
               }
-            if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
-                    && (tupleProjector != null
-                    || (indexMaintainer != null && indexMaintainer.isUncovered()))) {
+              if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
+                      && (tupleProjector != null
+                          || (indexMaintainer != null && indexMaintainer.isUncovered()))) {
 
-              PTable.ImmutableStorageScheme storageScheme =
-                          indexMaintainer.getIndexStorageScheme();
+                  PTable.ImmutableStorageScheme storageScheme =
+                      indexMaintainer.getIndexStorageScheme();
                   Scan dataTableScan = new Scan();
-                  if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
-                    if (dataColumns != null) {
+                  if (dataColumns != null) {
                       for (int i = 0; i < dataColumns.length; i++) {
-                        if (storageScheme ==
-                                PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                          dataTableScan.addFamily(dataColumns[i].getFamily());
-                        } else {
-                          dataTableScan.addColumn(dataColumns[i].getFamily(),
-                                  dataColumns[i].getQualifier());
-                        }
+                          if (storageScheme ==
+                                  PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                              dataTableScan.addFamily(dataColumns[i].getFamily());
+                          } else {
+                              dataTableScan.addColumn(dataColumns[i].getFamily(),
+                                      dataColumns[i].getQualifier());
+                          }
                       }
-                    } else if (indexMaintainer.isUncovered()) {
+                  } else if (indexMaintainer.isUncovered()) {
                       // Indexed columns and the columns in index where clause should also be added
                       // to the data columns to scan for uncovered global indexes. This is required
                       // to verify the index row against the data table row.
                       for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
-                        dataTableScan.addColumn(column.getFamily(), column.getQualifier());
+                          dataTableScan.addColumn(column.getFamily(), column.getQualifier());
                       }
-                    }
                   }
                   if (ScanUtil.isLocalIndex(scan)) {
-                    s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
-                            dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
-                            pageSizeMs, offset, actualStartKey, extraLimit);
-                  } else {
-                    if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
-                      s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
-                              dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
-                              pageSizeMs, extraLimit);
-                    }
-                    else {
-                      s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                      s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
                               dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
-                              pageSizeMs, extraLimit);
-                    }
+                              pageSizeMs, offset, actualStartKey, extraLimit);
+                  } else {
+                      if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+                          s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                                  dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+                                  pageSizeMs, extraLimit);
+                      } else {
+                          s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                                  dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+                                  pageSizeMs, extraLimit);
+                      }
                   }
               }
           }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 026095a4b7..244e4d14bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -123,7 +123,6 @@ import org.apache.phoenix.log.QueryLoggerUtil;
 import org.apache.phoenix.log.QueryStatus;
 import org.apache.phoenix.monitoring.TableMetricsManager;
 import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -146,7 +145,6 @@ import org.apache.phoenix.parse.DeclareCursorStatement;
 import org.apache.phoenix.parse.DeleteJarStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.ExplainType;
-import org.apache.phoenix.parse.FunctionParseNode;
 import org.apache.phoenix.parse.ShowCreateTableStatement;
 import org.apache.phoenix.parse.ShowCreateTable;
 import org.apache.phoenix.parse.DropColumnStatement;
@@ -1082,12 +1080,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
     private static class ExecutableCreateCDCStatement extends CreateCDCStatement
             implements CompilableStatement {
         public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
-                                            ColumnName timeIdxColumn, FunctionParseNode tfunc,
                                             Set<PTable.CDCChangeScope> includeScopes,
                                             ListMultimap<String, Pair<String, Object>> props,
                                             boolean ifNotExists, int bindCount) {
-            super(cdcObjName, dataTable, timeIdxColumn, tfunc, includeScopes, props, ifNotExists,
-                    bindCount);
+            super(cdcObjName, dataTable, includeScopes, props, ifNotExists, bindCount);
         }
 
         @Override
@@ -1594,7 +1590,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                 @Override
                 public MutationState execute() throws SQLException {
                     String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
-                    if (CDCUtil.isACDCIndex(indexName)) {
+                    if (CDCUtil.isCDCIndex(indexName)) {
                         throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
                                 .setTableName(indexName)
                                 .build().buildException();
@@ -1940,11 +1936,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
 
         @Override
         public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
-                                            ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
                                             Set<PTable.CDCChangeScope> includeScopes,
                                             ListMultimap<String, Pair<String, Object>> props,
                                             boolean ifNotExists, int bindCount) {
-            return new ExecutableCreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc,
+            return new ExecutableCreateCDCStatement(cdcObj, dataTable,
                     includeScopes, props, ifNotExists, bindCount);
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 5d81f0b585..48a5734149 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -220,10 +220,9 @@ public class QueryOptimizer {
         }
 
         PTable table = dataPlan.getTableRef().getTable();
-        // TODO: Need to handle CDC hints.
         if (table.getType() == PTableType.CDC) {
             Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
-            String cdcHint = select.getHint().getHint(Hint.INCLUDE);
+            String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
             if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
                 cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
                         cdcHint.length() - 1));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
index 9b1468a3ef..5722ab2a20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
@@ -28,21 +28,16 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 public class CreateCDCStatement extends MutableStatement {
     private final NamedNode cdcObjName;
     private final TableName dataTable;
-    private final ColumnName timeIdxColumn;
-    private final FunctionParseNode timeIdxFunc;
     private final Set<PTable.CDCChangeScope> includeScopes;
     private final ListMultimap<String,Pair<String,Object>> props;
     private final boolean ifNotExists;
     private final int bindCount;
 
-    public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, ColumnName timeIdxColumn,
-                              FunctionParseNode timeIdxFunc,
+    public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
                               Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String,
                               Pair<String, Object>> props, boolean ifNotExists, int bindCount) {
         this.cdcObjName = cdcObjName;
         this.dataTable = dataTable;
-        this.timeIdxColumn = timeIdxColumn;
-        this.timeIdxFunc = timeIdxFunc;
         this.includeScopes = includeScopes;
         this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
         this.ifNotExists = ifNotExists;
@@ -57,14 +52,6 @@ public class CreateCDCStatement extends MutableStatement {
         return dataTable;
     }
 
-    public ColumnName getTimeIdxColumn() {
-        return timeIdxColumn;
-    }
-
-    public FunctionParseNode getTimeIdxFunc() {
-        return timeIdxFunc;
-    }
-
     public Set<PTable.CDCChangeScope> getIncludeScopes() {
         return includeScopes;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index a80b092956..6c1f97b7fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -133,7 +133,7 @@ public class HintNode {
         /**
          * Override the default CDC include scopes.
          */
-        INCLUDE,
+        CDC_INCLUDE,
         ;
     };
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 390ae10ade..17ffa6a3f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -377,11 +377,10 @@ public class ParseNodeFactory {
     }
 
     public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
-                                        ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
                                         Set<PTable.CDCChangeScope> includeScopes,
                                         ListMultimap<String, Pair<String, Object>> props,
                                         boolean ifNotExists, int bindCount) {
-        return new CreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, includeScopes,
+        return new CreateCDCStatement(cdcObj, dataTable, includeScopes,
                 props, ifNotExists, bindCount);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 76108a6220..9717430a11 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.schema;
 
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
-import static org.apache.phoenix.exception.SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC;
 import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
@@ -161,6 +160,7 @@ import java.util.Set;
 import java.util.HashSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
 import org.apache.phoenix.parse.CreateCDCStatement;
 import org.apache.phoenix.parse.DropCDCStatement;
 import org.apache.hadoop.hbase.client.Table;
@@ -1731,15 +1731,9 @@ public class MetaDataClient {
     }
 
     public MutationState createCDC(CreateCDCStatement statement) throws SQLException {
-        // TODO: Do we need to borrow the schema name of the table?
         ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
         TableRef tableRef = resolver.getTables().get(0);
         PTable dataTable = tableRef.getTable();
-        // Check if data table is a view and give a not supported error.
-        if (dataTable.getType() != TABLE) {
-            throw new SQLExceptionInfo.Builder(INVALID_TABLE_TYPE_FOR_CDC).setTableType(
-                    dataTable.getType()).build().buildException();
-        }
 
         Map<String, Object> tableProps = Maps.newHashMapWithExpectedSize(
                 statement.getProps().size());
@@ -1749,12 +1743,9 @@ public class MetaDataClient {
 
         NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName(
                 statement.getCdcObjName().getName()));
-        String timeIdxColName = statement.getTimeIdxColumn() != null ?
-                statement.getTimeIdxColumn().getColumnName() : null;
         IndexKeyConstraint indexKeyConstraint =
                 FACTORY.indexKey(Arrays.asList(new Pair[]{Pair.newPair(
-                        timeIdxColName != null ? FACTORY.column(statement.getDataTable(),
-                                timeIdxColName, timeIdxColName) : statement.getTimeIdxFunc(),
+                        FACTORY.function(PhoenixRowTimestampFunction.NAME, Collections.emptyList()),
                         SortOrder.getDefault())}));
         IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps);
         ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create();
@@ -1763,13 +1754,10 @@ public class MetaDataClient {
                     TableProperty.SALT_BUCKETS.getPropertyName(),
                     TableProperty.SALT_BUCKETS.getValue(tableProps)));
         }
-        // TODO: Transfer TTL and MaxLookback from statement.getProps() to indexProps.
         CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null,
                         statement.getDataTable(), (Double) null), indexKeyConstraint, null, null,
-                        indexProps, statement.isIfNotExists(), indexType, false, 0,
+                        indexProps, statement.isIfNotExists(), indexType, true, 0,
                         new HashMap<>(), null);
-        // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to
-        //  protect based on CDCUtil.isACDCIndex().
         MutationState indexMutationState;
         try {
             // TODO: Should we also allow PTimestamp here, in fact PTimestamp is the right type,
@@ -1782,24 +1770,18 @@ public class MetaDataClient {
                         statement.getCdcObjName().getName()).setRootCause(
                                 e).build().buildException();
             }
-            // TODO: What about translating other index creation failures? E.g., bad TS column.
             throw e;
         }
 
         List<PColumn> pkColumns = dataTable.getPKColumns();
         List<ColumnDef> columnDefs = new ArrayList<>();
         List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
-        // TODO: toString() on function will have an extra space at the beginning, but this may
-        //  be OK as I see exactly the same with an index.
-        ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ?
-                statement.getTimeIdxColumn() :
-                FACTORY.columnName(statement.getTimeIdxFunc().toString());
+        ColumnName timeIdxCol = FACTORY.columnName(PhoenixRowTimestampFunction.NAME + "()");
         columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
                 PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
                 SortOrder.getDefault(), "", null, false));
         pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
         for (PColumn pcol : pkColumns) {
-            // TODO: Cross check with the ColumnName creation logic in createIndex (line ~1578).
             columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
                     pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
                     pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 64ab453492..c64de2ef16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -1104,11 +1104,5 @@ public interface PTable extends PMetaDataEntity {
          * Include only the post image (state past the change) of the row.
          */
         POST,
-
-        /**
-         * Include only the latest image of the row.
-         */
-        LATEST,
-        ;
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 369d349395..681b181283 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -353,7 +353,8 @@ public enum TableProperty {
         }
     },
 
-    INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
+    INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY,
+            true, false, false) {
         @Override
         public Object getPTableValue(PTable table) {
             return table.getCDCIncludeScopes();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index e80fd4ee09..002da0a9c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -18,9 +18,7 @@
 
 package org.apache.phoenix.util;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -28,15 +26,12 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.PTable;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
-
 public class CDCUtil {
     public static final String CDC_INDEX_PREFIX = "__CDC__";
     public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -48,7 +43,8 @@ public class CDCUtil {
      * @param includeScopes Comma-separated scope names.
      * @return the set of enums, which can be empty if the string is empty or has no valid names.
      */
-    public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes) {
+    public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes)
+            throws SQLException {
         Set<PTable.CDCChangeScope> cdcChangeScopes = new HashSet<>();
         if (includeScopes != null) {
             StringTokenizer st  = new StringTokenizer(includeScopes, ",");
@@ -58,7 +54,9 @@ public class CDCUtil {
                     cdcChangeScopes.add(PTable.CDCChangeScope.valueOf(tok.trim().toUpperCase()));
                 }
                 catch (IllegalArgumentException e) {
-                    // Just ignore unrecognized scopes.
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE).setCdcChangeScope(
+                                    tok).build().buildException();
                 }
             }
         }
@@ -90,12 +88,12 @@ public class CDCUtil {
         return indexName.substring(CDC_INDEX_PREFIX.length());
     }
 
-    public static boolean isACDCIndex(String indexName) {
+    public static boolean isCDCIndex(String indexName) {
         return indexName.startsWith(CDC_INDEX_PREFIX);
     }
 
-    public static boolean isACDCIndex(PTable indexTable) {
-        return isACDCIndex(indexTable.getTableName().getString());
+    public static boolean isCDCIndex(PTable indexTable) {
+        return isCDCIndex(indexTable.getTableName().getString());
     }
 
     public static Scan initForRawScan(Scan scan) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 829cb73ed0..71020bb659 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -497,47 +497,38 @@ public class QueryParserTest {
         }
     }
 
-    private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists, String tsCol)
+    private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists)
             throws Exception {
         CreateCDCStatement stmt = parseQuery(sql, CreateCDCStatement.class);
         assertEquals("FOO", stmt.getCdcObjName().getName());
         assertEquals("BAR", stmt.getDataTable().getTableName());
-        if (tsCol != null) {
-            assertEquals(tsCol, stmt.getTimeIdxColumn().getColumnName());
-        }
-        else {
-            assertNull(stmt.getTimeIdxColumn());
-        }
         assertEquals(ifNotExists, stmt.isIfNotExists());
         return stmt;
     }
 
     @Test
     public void testCreateCDCSimple() throws Exception {
+        parseCreateCDCSimple("create cdc foo on bar", false);
+        parseCreateCDCSimple("create cdc foo on s.bar", false);
+        parseCreateCDCSimple("create cdc if not exists foo on bar", true);
+        parseCreateCDCSimple("create cdc foo on bar index_type=g", false);
+        parseCreateCDCSimple("create cdc foo on bar index_type=l", false);
         CreateCDCStatement stmt = null;
-        parseCreateCDCSimple("create cdc foo on bar(ts)", false, "TS");
-        parseCreateCDCSimple("create cdc foo on s.bar(ts)", false, "TS");
-        parseCreateCDCSimple("create cdc if not exists foo on bar(ts)", true, "TS");
-        parseCreateCDCSimple("create cdc foo on bar(t) index_type=g", false, "T");
-        parseCreateCDCSimple("create cdc foo on bar(t) index_type=l", false, "T");
-        stmt = parseCreateCDCSimple("create cdc foo on bar(TS_FUNC()) TTL=100, INDEX_TYPE=g",
-                false, null);
-        assertEquals("TS_FUNC", stmt.getTimeIdxFunc().getName());
-        assertEquals(" TS_FUNC()", stmt.getTimeIdxFunc().toString());
+        stmt = parseCreateCDCSimple("create cdc foo on bar TTL=100, INDEX_TYPE=g", false);
         assertEquals(Arrays.asList(new Pair("TTL", 100), new Pair("INDEX_TYPE", "g")),
                 stmt.getProps().get(""));
-        stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre)", false, "TS");
+        stmt = parseCreateCDCSimple("create cdc foo on bar include (pre)", false);
         assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)),
                 stmt.getIncludeScopes());
-        stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, pre, post)",
-                false, "TS");
+        stmt = parseCreateCDCSimple("create cdc foo on bar include (pre, pre, post)",
+                false);
         assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE,
                 PTable.CDCChangeScope.POST)), stmt.getIncludeScopes());
-        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def",
-                true, "TS");
+        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def",
+                true);
         assertEquals(Arrays.asList(new Pair("ABC", "def")), stmt.getProps().get(""));
-        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def, prop=val",
-                true, "TS");
+        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def, prop=val",
+                true);
         assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", "val")),
                 stmt.getProps().get(""));
     }
@@ -545,10 +536,7 @@ public class QueryParserTest {
     @Test
     public void testCreateCDCWithErrors() throws Exception {
         parseQueryThatShouldFail("create cdc foo");
-        parseQueryThatShouldFail("create cdc foo on bar");
-        parseQueryThatShouldFail("create cdc foo on bar(ts integer)");
-        parseQueryThatShouldFail("create cdc foo on bar(ts1, ts2)");
-        parseQueryThatShouldFail("create cdc foo on bar(ts) include (abc)");
+        parseQueryThatShouldFail("create cdc foo on bar include (abc)");
     }
 
     private void parseInvalidCreateCDC(String sql, int expRrrorCode) throws IOException {
@@ -563,7 +551,6 @@ public class QueryParserTest {
 
     @Test
     public void testInvalidCreateCDC() throws Exception {
-        parseInvalidCreateCDC("create cdc foo on bar", SQLExceptionCode.MISMATCHED_TOKEN.getErrorCode());
         parseInvalidCreateCDC("create cdc foo bar", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
         parseInvalidCreateCDC("create cdc foo bar ts", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
         parseInvalidCreateCDC("create cdc foo bar(ts)", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
index e14fed4810..7feb261dc8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.phoenix.util;
 
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.schema.PTable;
 import org.junit.Test;
 
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashSet;
 
@@ -38,10 +40,15 @@ public class CDCUtilTest {
                 CDCUtil.makeChangeScopeEnumsFromString("PRE,"));
         assertEquals(new HashSet<>(Arrays.asList(PRE)),
                 CDCUtil.makeChangeScopeEnumsFromString("PRE, PRE"));
-        assertEquals(new HashSet<>(Arrays.asList(PRE)),
-                CDCUtil.makeChangeScopeEnumsFromString("PRE,DUMMY"));
-        assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)),
-                CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE,LATEST"));
+        assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST)),
+                CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE"));
+        try {
+            CDCUtil.makeChangeScopeEnumsFromString("DUMMY");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+                    e.getErrorCode());
+            assertTrue(e.getMessage().endsWith("DUMMY"));
+        }
     }
 
     @Test
@@ -49,9 +56,7 @@ public class CDCUtilTest {
         assertEquals(null, CDCUtil.makeChangeScopeStringFromEnums(null));
         assertEquals("", CDCUtil.makeChangeScopeStringFromEnums(
                 new HashSet<PTable.CDCChangeScope>()));
-        assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums(
-                new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST))));
-        assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums(
-                new HashSet<>(Arrays.asList(PRE, LATEST, POST, CHANGE))));
+        assertEquals("CHANGE,PRE,POST", CDCUtil.makeChangeScopeStringFromEnums(
+                new HashSet<>(Arrays.asList(CHANGE, PRE, POST))));
     }
 }