You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2024/01/24 16:29:08 UTC
(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new f07898f1fd PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)
f07898f1fd is described below
commit f07898f1fd91f5777329b65175fd6ba5b643bbce
Author: Hari Krishna Dara <ha...@gmail.com>
AuthorDate: Wed Jan 24 21:59:00 2024 +0530
PHOENIX-7008: Addressing Jira spec and review feedback changes (#1802)
---
.../java/org/apache/phoenix/end2end/CDCMiscIT.java | 231 +++++++++++++--------
phoenix-core/src/main/antlr3/PhoenixSQL.g | 10 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 4 +-
.../apache/phoenix/exception/SQLExceptionInfo.java | 15 ++
.../org/apache/phoenix/index/IndexMaintainer.java | 2 +-
.../phoenix/iterate/RegionScannerFactory.java | 60 +++---
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 11 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 3 +-
.../apache/phoenix/parse/CreateCDCStatement.java | 15 +-
.../java/org/apache/phoenix/parse/HintNode.java | 2 +-
.../org/apache/phoenix/parse/ParseNodeFactory.java | 3 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 26 +--
.../java/org/apache/phoenix/schema/PTable.java | 6 -
.../org/apache/phoenix/schema/TableProperty.java | 3 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 24 +--
.../org/apache/phoenix/parse/QueryParserTest.java | 43 ++--
.../java/org/apache/phoenix/util/CDCUtilTest.java | 21 +-
17 files changed, 245 insertions(+), 234 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index d1f04c02cc..be35bcbf46 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -19,23 +19,27 @@ package org.apache.phoenix.end2end;
import com.google.gson.Gson;
import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import javax.xml.transform.Result;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -48,8 +52,22 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@RunWith(Parameterized.class)
@Category(ParallelStatsDisabledTest.class)
public class CDCMiscIT extends ParallelStatsDisabledIT {
+ private final boolean forView;
+
+ public CDCMiscIT(boolean forView) {
+ this.forView = forView;
+ }
+
+ @Parameterized.Parameters(name = "forVieiw={0}")
+ public static synchronized Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { false}, { true }
+ });
+ }
+
private void assertCDCState(Connection conn, String cdcName, String expInclude,
int idxType) throws SQLException {
try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
@@ -89,19 +107,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertEquals(nbuckets, indexTable.getBucketNum());
}
+ private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql)
+ throws Exception {
+ conn.createStatement().execute(cdc_sql);
+ IndexToolIT.runIndexTool(false, null, tableName,
+ "\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
+ TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE);
+ }
+
@Test
- public void testCreate() throws SQLException {
+ public void testCreate() throws Exception {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
+ if (forView) {
+ String viewName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+ tableName = viewName;
+ }
String cdcName = generateUniqueName();
try {
conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())");
+ + " ON NON_EXISTENT_TABLE");
fail("Expected to fail due to non-existent table");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
@@ -109,42 +141,16 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON " + tableName +"(UNKNOWN_FUNCTION())");
- fail("Expected to fail due to invalid function");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode());
- }
-
- try {
- conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON " + tableName +"(NOW())");
- fail("Expected to fail due to non-deterministic function");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX.
- getErrorCode(), e.getErrorCode());
- }
-
- try {
- conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON " + tableName +"(ROUND(v1))");
- fail("Expected to fail due to non-date expression in the index PK");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
- e.getErrorCode());
- }
-
- try {
- conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON " + tableName +"(v1)");
- fail("Expected to fail due to non-date column in the index PK");
+ + " ON " + tableName + " INCLUDE (abc)");
+ fail("Expected to fail due to invalid INCLUDE");
} catch (SQLException e) {
- assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
+ assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
e.getErrorCode());
+ assertTrue(e.getMessage().endsWith("abc"));
}
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
- conn.createStatement().execute(cdc_sql);
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 3);
try {
@@ -154,43 +160,32 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
assertTrue(e.getMessage().endsWith(cdcName));
}
+
conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
- "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+ " INCLUDE (pre, post) INDEX_TYPE=g");
cdcName = generateUniqueName();
- conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
- "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName +
+ " INCLUDE (pre, post) INDEX_TYPE=g";
+ createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, "PRE,POST", 3);
assertPTable(cdcName, new HashSet<>(
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
cdcName = generateUniqueName();
- conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
- "(v2) INDEX_TYPE=l");
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
+ createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 2);
assertPTable(cdcName, null, tableName);
- String viewName = generateUniqueName();
- conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
- tableName);
- cdcName = generateUniqueName();
- try {
- conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + viewName +
- "(PHOENIX_ROW_TIMESTAMP())");
- fail("Expected to fail on VIEW");
- }
- catch(SQLException e) {
- assertEquals(SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getErrorCode(),
- e.getErrorCode());
- assertTrue(e.getMessage().endsWith(
- SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getMessage() + " tableType=VIEW"));
+ // Indexes on views don't support salt buckets and is currently silently ignored.
+ if (! forView) {
+ cdcName = generateUniqueName();
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4";
+ createAndWait(conn, tableName, cdcName, cdc_sql);
+ assertSaltBuckets(cdcName, 4);
}
- cdcName = generateUniqueName();
- conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP()) SALT_BUCKETS = 4");
- assertSaltBuckets(cdcName, 4);
-
conn.close();
}
@@ -203,8 +198,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
" (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
"CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
String cdcName = generateUniqueName();
- conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
- "(PHOENIX_ROW_TIMESTAMP())");
+ conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
List<PColumn> idxPkColumns = indexTable.getPKColumns();
@@ -214,7 +208,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
- assertEquals(" PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
+ assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}
@@ -260,8 +254,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);
String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
@@ -291,28 +284,33 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
rs.close();
}
+ private Connection newConnection() throws SQLException {
+ Properties props = new Properties();
+ // Use these only for debugging.
+ //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+ //props.put("hbase.client.scanner.timeout.period", "6000000");
+ //props.put("phoenix.query.timeoutMs", "6000000");
+ //props.put("zookeeper.session.timeout", "6000000");
+ //props.put("hbase.rpc.timeout", "6000000");
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
@Test
public void testSelectCDC() throws Exception {
- Properties props = new Properties();
- props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
- props.put("hbase.client.scanner.timeout.period", "6000000");
- props.put("phoenix.query.timeoutMs", "6000000");
- props.put("zookeeper.session.timeout", "6000000");
- props.put("hbase.rpc.timeout", "6000000");
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = newConnection();
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
conn.commit();
- Thread.sleep(1000);
+ Thread.sleep(10);
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
conn.commit();
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
- conn.createStatement().execute(cdc_sql);
+ + " ON " + tableName;
+ createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 3);
// NOTE: To debug the query execution, add the below condition where you need a breakpoint.
// if (<table>.getTableName().getString().equals("N000002") ||
@@ -322,8 +320,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
" WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
- assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
- "FROM " + cdcName));
+ assertResultSet(conn.createStatement().executeQuery("SELECT " +
+ "/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT " +
"PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
@@ -359,16 +357,79 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testSelectCDCBadIncludeSpec() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName;
+ conn.createStatement().execute(cdc_sql);
+ try {
+ conn.createStatement().executeQuery("SELECT " +
+ "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
+ fail("Expected to fail due to invalid CDC INCLUDE hint");
+ }
+ catch (SQLException e) {
+ assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getMessage().endsWith("DUMMY"));
+ }
+ }
+
+ @Test
+ public void testSelectTimeRangeQueries() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName;
+ conn.createStatement().execute(cdc_sql);
+ Timestamp ts1 = new Timestamp(System.currentTimeMillis());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+ conn.commit();
+ Thread.sleep(10);
+ Timestamp ts2 = new Timestamp(System.currentTimeMillis());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
+ conn.commit();
+ Thread.sleep(10);
+ Timestamp ts3 = new Timestamp(System.currentTimeMillis());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+ conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
+ Timestamp ts4 = new Timestamp(System.currentTimeMillis());
+
+ String sel_sql = "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND " +
+ "PHOENIX_ROW_TIMESTAMP() <= ?";
+ Object[] testDataSets = new Object[] {
+ new Object[] {ts1, ts2, new int[] {1, 2}}/*,
+ new Object[] {ts2, ts3, new int[] {1, 3}},
+ new Object[] {ts3, ts4, new int[] {1}}*/
+ };
+ PreparedStatement stmt = conn.prepareStatement(sel_sql);
+ for (int i = 0; i < testDataSets.length; ++i) {
+ Object[] testData = (Object[]) testDataSets[i];
+ stmt.setTimestamp(1, (Timestamp) testData[0]);
+ stmt.setTimestamp(2, (Timestamp) testData[1]);
+ try (ResultSet rs = stmt.executeQuery()) {
+ for (int k: (int[]) testData[2]) {
+ assertEquals(true, rs.next());
+ assertEquals(k, rs.getInt(2));
+ }
+ assertEquals(false, rs.next());
+ }
+ }
+ }
+
// Temporary test case used as a reference for debugging and comparing against the CDC query.
@Test
public void testSelectUncoveredIndex() throws Exception {
- Properties props = new Properties();
- props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
- props.put("hbase.client.scanner.timeout.period", "6000000");
- props.put("phoenix.query.timeoutMs", "6000000");
- props.put("zookeeper.session.timeout", "6000000");
- props.put("hbase.rpc.timeout", "6000000");
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = newConnection();
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index cc0a2f10c8..737ef4aabf 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -566,11 +566,10 @@ create_index_node returns [CreateIndexStatement ret]
create_cdc_node returns [CreateCDCStatement ret]
: CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
- LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN
(INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
(p=fam_properties)?
{
- ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount());
+ ret = factory.createCDC(o, t, v, p, ex != null, getBindCount());
}
;
@@ -578,13 +577,6 @@ cdc_name returns [NamedNode ret]
: name=identifier {$ret = factory.cdcName(name); }
;
-cdc_time_func returns [FunctionParseNode ret]
- : field=identifier LPAREN l=zero_or_more_expressions RPAREN
- {
- ret = factory.function(field, l);
- }
- ;
-
cdc_change_scopes returns [Set<CDCChangeScope> ret]
@init { ret = new HashSet<>(); }
: v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 7842e8f4c0..6785a5da28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -360,7 +360,7 @@ public enum SQLExceptionCode {
VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
+ " only if none of the parents have indexes in the parent hierarchy"),
UNKNOWN_INDEX_TYPE(1098, "44A39", "Unknown INDEX type: "),
- UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for INCLUDE: "),
+ UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for CDC INCLUDE"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
@@ -469,8 +469,6 @@ public enum SQLExceptionCode {
"Missing ENCODED_QUALIFIER."),
EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
- INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52",
- "Invalid table type for creating CDC."),
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
index b7babd5dc6..3e27dc75c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java
@@ -65,6 +65,7 @@ public class SQLExceptionInfo {
private final int phoenixColumnSizeBytes;
private final int maxPhoenixColumnSizeBytes;
private final String haGroupInfo;
+ private final String cdcChangeScope;
public static class Builder {
private Throwable rootCause;
@@ -83,6 +84,7 @@ public class SQLExceptionInfo {
private int maxPhoenixColumnSizeBytes;
private String haGroupInfo;
private PTableType tableType;
+ private String cdcChangeScope;
public Builder(SQLExceptionCode code) {
this.code = code;
@@ -163,6 +165,11 @@ public class SQLExceptionInfo {
return this;
}
+ public Builder setCdcChangeScope(String cdcChangeScope) {
+ this.cdcChangeScope = cdcChangeScope;
+ return this;
+ }
+
public SQLExceptionInfo build() {
return new SQLExceptionInfo(this);
}
@@ -190,6 +197,7 @@ public class SQLExceptionInfo {
maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes;
phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes;
haGroupInfo = builder.haGroupInfo;
+ cdcChangeScope = builder.cdcChangeScope;
}
@Override
@@ -235,6 +243,9 @@ public class SQLExceptionInfo {
if (haGroupInfo != null) {
builder.append(" ").append(HA_GROUP_INFO).append("=").append(haGroupInfo);
}
+ if (cdcChangeScope != null) {
+ builder.append(": ").append(cdcChangeScope);
+ }
return builder.toString();
}
@@ -306,4 +317,8 @@ public class SQLExceptionInfo {
public String getHaGroupInfo() {
return haGroupInfo;
}
+
+ public String getCdcChangeScope() {
+ return cdcChangeScope;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 002857b213..86f49208f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -676,7 +676,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexWhere = index.getIndexWhereExpression(connection);
this.indexWhereColumns = index.getIndexWhereColumns(connection);
}
- this.isCDCIndex = CDCUtil.isACDCIndex(index);
+ this.isCDCIndex = CDCUtil.isCDCIndex(index);
initCachedState();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 171108f87f..a536c6d016 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -135,7 +135,8 @@ public abstract class RegionScannerFactory {
long extraLimit = -1;
{
- if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
+ // for indexes construct the row filter for uncovered columns if it exists
+ if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER);
if (expBytes == null) {
// For older clients
@@ -160,48 +161,45 @@ public abstract class RegionScannerFactory {
if (limitBytes != null) {
extraLimit = Bytes.toLong(limitBytes);
}
- if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
- && (tupleProjector != null
- || (indexMaintainer != null && indexMaintainer.isUncovered()))) {
+ if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
+ && (tupleProjector != null
+ || (indexMaintainer != null && indexMaintainer.isUncovered()))) {
- PTable.ImmutableStorageScheme storageScheme =
- indexMaintainer.getIndexStorageScheme();
+ PTable.ImmutableStorageScheme storageScheme =
+ indexMaintainer.getIndexStorageScheme();
Scan dataTableScan = new Scan();
- if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
- if (dataColumns != null) {
+ if (dataColumns != null) {
for (int i = 0; i < dataColumns.length; i++) {
- if (storageScheme ==
- PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- dataTableScan.addFamily(dataColumns[i].getFamily());
- } else {
- dataTableScan.addColumn(dataColumns[i].getFamily(),
- dataColumns[i].getQualifier());
- }
+ if (storageScheme ==
+ PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ dataTableScan.addFamily(dataColumns[i].getFamily());
+ } else {
+ dataTableScan.addColumn(dataColumns[i].getFamily(),
+ dataColumns[i].getQualifier());
+ }
}
- } else if (indexMaintainer.isUncovered()) {
+ } else if (indexMaintainer.isUncovered()) {
// Indexed columns and the columns in index where clause should also be added
// to the data columns to scan for uncovered global indexes. This is required
// to verify the index row against the data table row.
for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
- dataTableScan.addColumn(column.getFamily(), column.getQualifier());
+ dataTableScan.addColumn(column.getFamily(), column.getQualifier());
}
- }
}
if (ScanUtil.isLocalIndex(scan)) {
- s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
- dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
- pageSizeMs, offset, actualStartKey, extraLimit);
- } else {
- if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
- s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
- dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
- pageSizeMs, extraLimit);
- }
- else {
- s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
- pageSizeMs, extraLimit);
- }
+ pageSizeMs, offset, actualStartKey, extraLimit);
+ } else {
+ if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+ s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+ pageSizeMs, extraLimit);
+ } else {
+ s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+ pageSizeMs, extraLimit);
+ }
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 026095a4b7..244e4d14bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -123,7 +123,6 @@ import org.apache.phoenix.log.QueryLoggerUtil;
import org.apache.phoenix.log.QueryStatus;
import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -146,7 +145,6 @@ import org.apache.phoenix.parse.DeclareCursorStatement;
import org.apache.phoenix.parse.DeleteJarStatement;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.ExplainType;
-import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.parse.ShowCreateTableStatement;
import org.apache.phoenix.parse.ShowCreateTable;
import org.apache.phoenix.parse.DropColumnStatement;
@@ -1082,12 +1080,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
private static class ExecutableCreateCDCStatement extends CreateCDCStatement
implements CompilableStatement {
public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
- ColumnName timeIdxColumn, FunctionParseNode tfunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
- super(cdcObjName, dataTable, timeIdxColumn, tfunc, includeScopes, props, ifNotExists,
- bindCount);
+ super(cdcObjName, dataTable, includeScopes, props, ifNotExists, bindCount);
}
@Override
@@ -1594,7 +1590,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
@Override
public MutationState execute() throws SQLException {
String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
- if (CDCUtil.isACDCIndex(indexName)) {
+ if (CDCUtil.isCDCIndex(indexName)) {
throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
.setTableName(indexName)
.build().buildException();
@@ -1940,11 +1936,10 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
@Override
public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
- ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
- return new ExecutableCreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc,
+ return new ExecutableCreateCDCStatement(cdcObj, dataTable,
includeScopes, props, ifNotExists, bindCount);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 5d81f0b585..48a5734149 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -220,10 +220,9 @@ public class QueryOptimizer {
}
PTable table = dataPlan.getTableRef().getTable();
- // TODO: Need to handle CDC hints.
if (table.getType() == PTableType.CDC) {
Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
- String cdcHint = select.getHint().getHint(Hint.INCLUDE);
+ String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
cdcHint.length() - 1));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
index 9b1468a3ef..5722ab2a20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateCDCStatement.java
@@ -28,21 +28,16 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
public class CreateCDCStatement extends MutableStatement {
private final NamedNode cdcObjName;
private final TableName dataTable;
- private final ColumnName timeIdxColumn;
- private final FunctionParseNode timeIdxFunc;
private final Set<PTable.CDCChangeScope> includeScopes;
private final ListMultimap<String,Pair<String,Object>> props;
private final boolean ifNotExists;
private final int bindCount;
- public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, ColumnName timeIdxColumn,
- FunctionParseNode timeIdxFunc,
+ public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String,
Pair<String, Object>> props, boolean ifNotExists, int bindCount) {
this.cdcObjName = cdcObjName;
this.dataTable = dataTable;
- this.timeIdxColumn = timeIdxColumn;
- this.timeIdxFunc = timeIdxFunc;
this.includeScopes = includeScopes;
this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
this.ifNotExists = ifNotExists;
@@ -57,14 +52,6 @@ public class CreateCDCStatement extends MutableStatement {
return dataTable;
}
- public ColumnName getTimeIdxColumn() {
- return timeIdxColumn;
- }
-
- public FunctionParseNode getTimeIdxFunc() {
- return timeIdxFunc;
- }
-
public Set<PTable.CDCChangeScope> getIncludeScopes() {
return includeScopes;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index a80b092956..6c1f97b7fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -133,7 +133,7 @@ public class HintNode {
/**
* Override the default CDC include scopes.
*/
- INCLUDE,
+ CDC_INCLUDE,
;
};
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 390ae10ade..17ffa6a3f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -377,11 +377,10 @@ public class ParseNodeFactory {
}
public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
- ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
- return new CreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, includeScopes,
+ return new CreateCDCStatement(cdcObj, dataTable, includeScopes,
props, ifNotExists, bindCount);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 76108a6220..9717430a11 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.schema;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
-import static org.apache.phoenix.exception.SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC;
import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
@@ -161,6 +160,7 @@ import java.util.Set;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
import org.apache.phoenix.parse.CreateCDCStatement;
import org.apache.phoenix.parse.DropCDCStatement;
import org.apache.hadoop.hbase.client.Table;
@@ -1731,15 +1731,9 @@ public class MetaDataClient {
}
public MutationState createCDC(CreateCDCStatement statement) throws SQLException {
- // TODO: Do we need to borrow the schema name of the table?
ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
TableRef tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
- // Check if data table is a view and give a not supported error.
- if (dataTable.getType() != TABLE) {
- throw new SQLExceptionInfo.Builder(INVALID_TABLE_TYPE_FOR_CDC).setTableType(
- dataTable.getType()).build().buildException();
- }
Map<String, Object> tableProps = Maps.newHashMapWithExpectedSize(
statement.getProps().size());
@@ -1749,12 +1743,9 @@ public class MetaDataClient {
NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName(
statement.getCdcObjName().getName()));
- String timeIdxColName = statement.getTimeIdxColumn() != null ?
- statement.getTimeIdxColumn().getColumnName() : null;
IndexKeyConstraint indexKeyConstraint =
FACTORY.indexKey(Arrays.asList(new Pair[]{Pair.newPair(
- timeIdxColName != null ? FACTORY.column(statement.getDataTable(),
- timeIdxColName, timeIdxColName) : statement.getTimeIdxFunc(),
+ FACTORY.function(PhoenixRowTimestampFunction.NAME, Collections.emptyList()),
SortOrder.getDefault())}));
IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps);
ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create();
@@ -1763,13 +1754,10 @@ public class MetaDataClient {
TableProperty.SALT_BUCKETS.getPropertyName(),
TableProperty.SALT_BUCKETS.getValue(tableProps)));
}
- // TODO: Transfer TTL and MaxLookback from statement.getProps() to indexProps.
CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null,
statement.getDataTable(), (Double) null), indexKeyConstraint, null, null,
- indexProps, statement.isIfNotExists(), indexType, false, 0,
+ indexProps, statement.isIfNotExists(), indexType, true, 0,
new HashMap<>(), null);
- // TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to
- // protect based on CDCUtil.isACDCIndex().
MutationState indexMutationState;
try {
// TODO: Should we also allow PTimestamp here, in fact PTimestamp is the right type,
@@ -1782,24 +1770,18 @@ public class MetaDataClient {
statement.getCdcObjName().getName()).setRootCause(
e).build().buildException();
}
- // TODO: What about translating other index creation failures? E.g., bad TS column.
throw e;
}
List<PColumn> pkColumns = dataTable.getPKColumns();
List<ColumnDef> columnDefs = new ArrayList<>();
List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
- // TODO: toString() on function will have an extra space at the beginning, but this may
- // be OK as I see exactly the same with an index.
- ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ?
- statement.getTimeIdxColumn() :
- FACTORY.columnName(statement.getTimeIdxFunc().toString());
+ ColumnName timeIdxCol = FACTORY.columnName(PhoenixRowTimestampFunction.NAME + "()");
columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
SortOrder.getDefault(), "", null, false));
pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
for (PColumn pcol : pkColumns) {
- // TODO: Cross check with the ColumnName creation logic in createIndex (line ~1578).
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 64ab453492..c64de2ef16 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -1104,11 +1104,5 @@ public interface PTable extends PMetaDataEntity {
* Include only the post image (state past the change) of the row.
*/
POST,
-
- /**
- * Include only the latest image of the row.
- */
- LATEST,
- ;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 369d349395..681b181283 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -353,7 +353,8 @@ public enum TableProperty {
}
},
- INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
+ INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY,
+ true, false, false) {
@Override
public Object getPTableValue(PTable table) {
return table.getCDCIncludeScopes();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index e80fd4ee09..002da0a9c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -18,9 +18,7 @@
package org.apache.phoenix.util;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.sql.SQLException;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableSet;
@@ -28,15 +26,12 @@ import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.schema.PTable;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
-
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "__CDC__";
public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -48,7 +43,8 @@ public class CDCUtil {
* @param includeScopes Comma-separated scope names.
* @return the set of enums, which can be empty if the string is empty or has no valid names.
*/
- public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes) {
+ public static Set<PTable.CDCChangeScope> makeChangeScopeEnumsFromString(String includeScopes)
+ throws SQLException {
Set<PTable.CDCChangeScope> cdcChangeScopes = new HashSet<>();
if (includeScopes != null) {
StringTokenizer st = new StringTokenizer(includeScopes, ",");
@@ -58,7 +54,9 @@ public class CDCUtil {
cdcChangeScopes.add(PTable.CDCChangeScope.valueOf(tok.trim().toUpperCase()));
}
catch (IllegalArgumentException e) {
- // Just ignore unrecognized scopes.
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE).setCdcChangeScope(
+ tok).build().buildException();
}
}
}
@@ -90,12 +88,12 @@ public class CDCUtil {
return indexName.substring(CDC_INDEX_PREFIX.length());
}
- public static boolean isACDCIndex(String indexName) {
+ public static boolean isCDCIndex(String indexName) {
return indexName.startsWith(CDC_INDEX_PREFIX);
}
- public static boolean isACDCIndex(PTable indexTable) {
- return isACDCIndex(indexTable.getTableName().getString());
+ public static boolean isCDCIndex(PTable indexTable) {
+ return isCDCIndex(indexTable.getTableName().getString());
}
public static Scan initForRawScan(Scan scan) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 829cb73ed0..71020bb659 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -497,47 +497,38 @@ public class QueryParserTest {
}
}
- private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists, String tsCol)
+ private CreateCDCStatement parseCreateCDCSimple(String sql, boolean ifNotExists)
throws Exception {
CreateCDCStatement stmt = parseQuery(sql, CreateCDCStatement.class);
assertEquals("FOO", stmt.getCdcObjName().getName());
assertEquals("BAR", stmt.getDataTable().getTableName());
- if (tsCol != null) {
- assertEquals(tsCol, stmt.getTimeIdxColumn().getColumnName());
- }
- else {
- assertNull(stmt.getTimeIdxColumn());
- }
assertEquals(ifNotExists, stmt.isIfNotExists());
return stmt;
}
@Test
public void testCreateCDCSimple() throws Exception {
+ parseCreateCDCSimple("create cdc foo on bar", false);
+ parseCreateCDCSimple("create cdc foo on s.bar", false);
+ parseCreateCDCSimple("create cdc if not exists foo on bar", true);
+ parseCreateCDCSimple("create cdc foo on bar index_type=g", false);
+ parseCreateCDCSimple("create cdc foo on bar index_type=l", false);
CreateCDCStatement stmt = null;
- parseCreateCDCSimple("create cdc foo on bar(ts)", false, "TS");
- parseCreateCDCSimple("create cdc foo on s.bar(ts)", false, "TS");
- parseCreateCDCSimple("create cdc if not exists foo on bar(ts)", true, "TS");
- parseCreateCDCSimple("create cdc foo on bar(t) index_type=g", false, "T");
- parseCreateCDCSimple("create cdc foo on bar(t) index_type=l", false, "T");
- stmt = parseCreateCDCSimple("create cdc foo on bar(TS_FUNC()) TTL=100, INDEX_TYPE=g",
- false, null);
- assertEquals("TS_FUNC", stmt.getTimeIdxFunc().getName());
- assertEquals(" TS_FUNC()", stmt.getTimeIdxFunc().toString());
+ stmt = parseCreateCDCSimple("create cdc foo on bar TTL=100, INDEX_TYPE=g", false);
assertEquals(Arrays.asList(new Pair("TTL", 100), new Pair("INDEX_TYPE", "g")),
stmt.getProps().get(""));
- stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre)", false, "TS");
+ stmt = parseCreateCDCSimple("create cdc foo on bar include (pre)", false);
assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)),
stmt.getIncludeScopes());
- stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, pre, post)",
- false, "TS");
+ stmt = parseCreateCDCSimple("create cdc foo on bar include (pre, pre, post)",
+ false);
assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE,
PTable.CDCChangeScope.POST)), stmt.getIncludeScopes());
- stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def",
- true, "TS");
+ stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def",
+ true);
assertEquals(Arrays.asList(new Pair("ABC", "def")), stmt.getProps().get(""));
- stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) abc=def, prop=val",
- true, "TS");
+ stmt = parseCreateCDCSimple("create cdc if not exists foo on bar abc=def, prop=val",
+ true);
assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", "val")),
stmt.getProps().get(""));
}
@@ -545,10 +536,7 @@ public class QueryParserTest {
@Test
public void testCreateCDCWithErrors() throws Exception {
parseQueryThatShouldFail("create cdc foo");
- parseQueryThatShouldFail("create cdc foo on bar");
- parseQueryThatShouldFail("create cdc foo on bar(ts integer)");
- parseQueryThatShouldFail("create cdc foo on bar(ts1, ts2)");
- parseQueryThatShouldFail("create cdc foo on bar(ts) include (abc)");
+ parseQueryThatShouldFail("create cdc foo on bar include (abc)");
}
private void parseInvalidCreateCDC(String sql, int expRrrorCode) throws IOException {
@@ -563,7 +551,6 @@ public class QueryParserTest {
@Test
public void testInvalidCreateCDC() throws Exception {
- parseInvalidCreateCDC("create cdc foo on bar", SQLExceptionCode.MISMATCHED_TOKEN.getErrorCode());
parseInvalidCreateCDC("create cdc foo bar", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
parseInvalidCreateCDC("create cdc foo bar ts", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
parseInvalidCreateCDC("create cdc foo bar(ts)", SQLExceptionCode.MISSING_TOKEN.getErrorCode());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
index e14fed4810..7feb261dc8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
@@ -18,9 +18,11 @@
package org.apache.phoenix.util;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.schema.PTable;
import org.junit.Test;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
@@ -38,10 +40,15 @@ public class CDCUtilTest {
CDCUtil.makeChangeScopeEnumsFromString("PRE,"));
assertEquals(new HashSet<>(Arrays.asList(PRE)),
CDCUtil.makeChangeScopeEnumsFromString("PRE, PRE"));
- assertEquals(new HashSet<>(Arrays.asList(PRE)),
- CDCUtil.makeChangeScopeEnumsFromString("PRE,DUMMY"));
- assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)),
- CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE,LATEST"));
+ assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST)),
+ CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE"));
+ try {
+ CDCUtil.makeChangeScopeEnumsFromString("DUMMY");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+ e.getErrorCode());
+ assertTrue(e.getMessage().endsWith("DUMMY"));
+ }
}
@Test
@@ -49,9 +56,7 @@ public class CDCUtilTest {
assertEquals(null, CDCUtil.makeChangeScopeStringFromEnums(null));
assertEquals("", CDCUtil.makeChangeScopeStringFromEnums(
new HashSet<PTable.CDCChangeScope>()));
- assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums(
- new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST))));
- assertEquals("CHANGE,PRE,POST,LATEST", CDCUtil.makeChangeScopeStringFromEnums(
- new HashSet<>(Arrays.asList(PRE, LATEST, POST, CHANGE))));
+ assertEquals("CHANGE,PRE,POST", CDCUtil.makeChangeScopeStringFromEnums(
+ new HashSet<>(Arrays.asList(CHANGE, PRE, POST))));
}
}