Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/12/23 16:29:24 UTC
(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new 7420443d84 PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)
7420443d84 is described below
commit 7420443d84ce6786e2594fd18f5f02c7b742197f
Author: Hari Krishna Dara <ha...@gmail.com>
AuthorDate: Sat Dec 23 21:59:19 2023 +0530
PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (#1766)
---
phoenix-core/pom.xml | 5 +
.../java/org/apache/phoenix/end2end/CDCMiscIT.java | 148 +++++++++++++-
.../index/UncoveredGlobalIndexRegionScannerIT.java | 8 +-
.../apache/phoenix/compile/ProjectionCompiler.java | 2 +-
.../org/apache/phoenix/compile/QueryCompiler.java | 12 ++
.../apache/phoenix/compile/StatementContext.java | 10 +-
.../phoenix/compile/TupleProjectionCompiler.java | 18 +-
.../coprocessor/BaseScannerRegionObserver.java | 7 +-
.../coprocessor/CDCGlobalIndexRegionScanner.java | 226 +++++++++++++++++++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 5 +-
.../coprocessor/UncoveredIndexRegionScanner.java | 29 ++-
.../org/apache/phoenix/execute/BaseQueryPlan.java | 30 ++-
.../phoenix/iterate/RegionScannerFactory.java | 50 +++--
.../phoenix/iterate/TableResultIterator.java | 2 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 18 +-
.../java/org/apache/phoenix/parse/HintNode.java | 177 ++++++++--------
.../java/org/apache/phoenix/schema/ColumnRef.java | 3 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 11 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 4 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 30 +++
.../apache/phoenix/util/EncodedColumnsUtil.java | 8 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 137 ++++++++++++-
phoenix-pherf/pom.xml | 2 -
pom.xml | 6 +
24 files changed, 783 insertions(+), 165 deletions(-)
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 7827a065f0..b1534225ca 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -249,6 +249,11 @@
</build>
<dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
<!-- shaded thirdparty dependencies -->
<dependency>
<groupId>org.apache.phoenix.thirdparty</groupId>
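
The new direct gson dependency backs the CDC change image, which is rendered as JSON. As a minimal self-contained sketch (plain Gson, no Phoenix classes) of the round trip the new CDCMiscIT assertions rely on — Gson maps JSON numbers to Double by default, which is why the tests compare against 100d:

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;

public class GsonRoundTripSketch {
    public static void main(String[] args) {
        Gson gson = new Gson();
        Map<String, Object> change = new HashMap<>();
        change.put("V1", 100d);
        String json = gson.toJson(change);                 // {"V1":100.0}
        Map<?, ?> parsed = gson.fromJson(json, HashMap.class);
        // JSON numbers come back as Double, hence 100d in the test assertions.
        System.out.println(json + " -> " + parsed);
    }
}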
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index 7e081419e2..d1f04c02cc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -17,7 +17,10 @@
*/
package org.apache.phoenix.end2end;
+import com.google.gson.Gson;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
@@ -27,13 +30,16 @@ import org.apache.phoenix.util.SchemaUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -60,7 +66,8 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
}
}
- private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes)
+ private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
+ String datatableName)
throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -68,6 +75,9 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
assertNull(table.getIndexState()); // Index state should be null for CDC.
+ assertNull(table.getIndexType()); // This is not an index.
+ assertEquals(datatableName, table.getParentName().getString());
+ assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
}
private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
@@ -117,7 +127,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(ROUND(v1))");
- fail("Expected to fail due to non-timestamp expression in the index PK");
+ fail("Expected to fail due to non-date expression in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
@@ -126,7 +136,7 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(v1)");
- fail("Expected to fail due to non-timestamp column in the index PK");
+ fail("Expected to fail due to non-date column in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
@@ -152,13 +162,13 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
"(v2) INCLUDE (pre, post) INDEX_TYPE=g");
assertCDCState(conn, cdcName, "PRE,POST", 3);
assertPTable(cdcName, new HashSet<>(
- Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)));
+ Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(v2) INDEX_TYPE=l");
assertCDCState(conn, cdcName, null, 2);
- assertPTable(cdcName, null);
+ assertPTable(cdcName, null, tableName);
String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
@@ -209,7 +219,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}
- @Test
public void testDropCDC () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -219,11 +228,6 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
+ " v2 DATE)");
String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
- conn.createStatement().execute(cdc_sql);
- assertCDCState(conn, cdcName, null, 3);
-
String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(drop_cdc_sql);
@@ -269,4 +273,126 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
}
}
+ private void assertResultSet(ResultSet rs) throws Exception {
+ Gson gson = new Gson();
+ assertEquals(true, rs.next());
+ assertEquals(1, rs.getInt(2));
+ assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3),
+ HashMap.class));
+ assertEquals(true, rs.next());
+ assertEquals(2, rs.getInt(2));
+ assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3),
+ HashMap.class));
+ assertEquals(true, rs.next());
+ assertEquals(1, rs.getInt(2));
+ assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3),
+ HashMap.class));
+ assertEquals(false, rs.next());
+ rs.close();
+ }
+
+ @Test
+ public void testSelectCDC() throws Exception {
+ Properties props = new Properties();
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.put("hbase.client.scanner.timeout.period", "6000000");
+ props.put("phoenix.query.timeoutMs", "6000000");
+ props.put("zookeeper.session.timeout", "6000000");
+ props.put("hbase.rpc.timeout", "6000000");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+ conn.commit();
+ Thread.sleep(1000);
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+ conn.commit();
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+ conn.createStatement().execute(cdc_sql);
+ assertCDCState(conn, cdcName, null, 3);
+ // NOTE: To debug query execution, add the condition below wherever you need a breakpoint.
+ // if (<table>.getTableName().getString().equals("N000002") ||
+ // <table>.getTableName().getString().equals("__CDC__N000002")) {
+ // "".isEmpty();
+ // }
+ assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
+ assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
+ " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
+ assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
+ "FROM " + cdcName));
+ assertResultSet(conn.createStatement().executeQuery("SELECT " +
+ "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
+
+ HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
+ put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
+ put("SELECT * FROM " + cdcName +
+ " ORDER BY k ASC", new int [] {1, 1, 2});
+ put("SELECT * FROM " + cdcName +
+ " ORDER BY k DESC", new int [] {2, 1, 1});
+ put("SELECT * FROM " + cdcName +
+ " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
+ }};
+ for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
+ try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
+ for (int k: testQuery.getValue()) {
+ assertEquals(true, rs.next());
+ assertEquals(k, rs.getInt(2));
+ }
+ assertEquals(false, rs.next());
+ }
+ }
+
+ try (ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
+ assertEquals(false, rs.next());
+ }
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
+ assertEquals(true, rs.next());
+ assertEquals("abc", rs.getString(1));
+ assertEquals(true, rs.next());
+ assertEquals("abc", rs.getString(1));
+ assertEquals(false, rs.next());
+ }
+ }
+
+ // Temporary test case used as a reference for debugging and comparing against the CDC query.
+ @Test
+ public void testSelectUncoveredIndex() throws Exception {
+ Properties props = new Properties();
+ props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+ props.put("hbase.client.scanner.timeout.period", "6000000");
+ props.put("phoenix.query.timeoutMs", "6000000");
+ props.put("zookeeper.session.timeout", "6000000");
+ props.put("hbase.rpc.timeout", "6000000");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
+ " (1, 100)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
+ " (2, 200)");
+ conn.commit();
+ String indexName = generateUniqueName();
+ String index_sql = "CREATE UNCOVERED INDEX " + indexName
+ + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+ conn.createStatement().execute(index_sql);
+ //ResultSet rs =
+ // conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
+ // " " + indexName + ") */ * FROM " + tableName);
+ ResultSet rs =
+ conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
+ " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName);
+ assertEquals(true, rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals(100, rs.getInt(2));
+ assertEquals(true, rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals(200, rs.getInt(2));
+ assertEquals(false, rs.next());
+ }
}
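
For context, a hedged client-side sketch of consuming a CDC stream the way the new tests do. The JDBC URL and the MY_CDC/MY_TABLE names are illustrative assumptions; PHOENIX_ROW_TIMESTAMP(), the INCLUDE hint, and the "CDC JSON" column mirror the queries exercised above:

import com.google.gson.Gson;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.HashMap;

public class CdcReadSketch {
    public static void main(String[] args) throws Exception {
        // Assumed URL and CDC object; created with e.g.
        // CREATE CDC MY_CDC ON MY_TABLE (PHOENIX_ROW_TIMESTAMP())
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             ResultSet rs = conn.createStatement().executeQuery(
                     "SELECT /*+ INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\""
                             + " FROM MY_CDC")) {
            Gson gson = new Gson();
            while (rs.next()) {
                Timestamp changeTs = rs.getTimestamp(1);
                int pk = rs.getInt(2);
                HashMap<?, ?> change = gson.fromJson(rs.getString(3), HashMap.class);
                System.out.println(changeTs + " k=" + pk + " change=" + change);
            }
        }
    }
}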
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
index e29284a951..aae8e48108 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
@@ -175,7 +175,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
String timeZoneID = Calendar.getInstance().getTimeZone().getID();
// Write a query to get the val2 = 'bc' with a time range query
String query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
- + "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+ + "val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName
+ " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
+ before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
+ timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
@@ -186,8 +186,10 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
assertTrue(rs.next());
assertEquals("bc", rs.getString(1));
assertEquals("bcd", rs.getString(2));
+ assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
+ assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Count the number of index rows
rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
@@ -206,10 +208,11 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
+ assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Write a time range query to get the last row with val2 ='bc'
query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
- +"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+ +"val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName +
" WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
// Verify that we will read from the index table
@@ -219,6 +222,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
assertEquals("bc", rs.getString(1));
assertEquals("ccc", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(after));
+ assertEquals("cccc", rs.getString(4));
assertFalse(rs.next());
// Verify that we can execute the same query without using the index
String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 9e6b90cded..8dc6678f24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -237,7 +237,7 @@ public class ProjectionCompiler {
ColumnRef ref = null;
try {
indexColumn = index.getColumnForColumnName(indexColName);
- //TODO could should we do this more efficiently than catching the expcetion ?
+ // TODO: Should we do this more efficiently than catching the exception?
} catch (ColumnNotFoundException e) {
if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) {
//Projected columns have the same name as in the data table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 41459dc133..9981e31219 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -28,6 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
@@ -82,6 +85,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -698,6 +702,14 @@ public class QueryCompiler {
if (projectedTable != null) {
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
}
+
+ if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
+ // This gets the CDC JSON data column added to the context so that the
+ // projection gets serialized.
+ context.getDataColumnPosition(
+ context.getCurrentTable().getTable().getColumnForColumnName(
+ QueryConstants.CDC_JSON_COL_NAME));
+ }
}
ColumnResolver resolver = context.getResolver();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index f795254a36..c12cd62d56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TimeZone;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -46,6 +45,7 @@ import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -84,6 +84,7 @@ public class StatementContext {
private QueryLogger queryLogger;
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
+ private String cdcIncludeScopes;
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -378,4 +379,11 @@ public class StatementContext {
return retrying;
}
}
+ public String getEncodedCdcIncludeScopes() {
+ return cdcIncludeScopes;
+ }
+
+ public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
+ this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 99392f6d29..faa940a5e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -16,12 +16,15 @@
* limitations under the License.
*/
package org.apache.phoenix.compile;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -61,18 +64,21 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
public class TupleProjectionCompiler {
public static final PName PROJECTED_TABLE_SCHEMA = PNameFactory.newName(".");
+ public static final EnumSet<PTableType> PROJECTED_TABLE_TYPES = EnumSet.of(PTableType.TABLE,
+ PTableType.INDEX, PTableType.VIEW, PTableType.CDC);
private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
public static PTable createProjectedTable(SelectStatement select, StatementContext context) throws SQLException {
Preconditions.checkArgument(!select.isJoin());
// Non-group-by or group-by aggregations will create its own projected result.
- if (select.getInnerSelectStatement() != null
+ if (select.getInnerSelectStatement() != null
|| select.getFrom() == null
|| select.isAggregate()
|| select.isDistinct()
- || (context.getResolver().getTables().get(0).getTable().getType() != PTableType.TABLE
- && context.getResolver().getTables().get(0).getTable().getType() != PTableType.INDEX && context.getResolver().getTables().get(0).getTable().getType() != PTableType.VIEW))
+ || ! PROJECTED_TABLE_TYPES.contains(
+ context.getResolver().getTables().get(0).getTable().getType())) {
return null;
+ }
List<PColumn> projectedColumns = new ArrayList<PColumn>();
boolean isWildcard = false;
@@ -86,7 +92,7 @@ public class TupleProjectionCompiler {
if (node instanceof WildcardParseNode) {
if (((WildcardParseNode) node).isRewrite()) {
TableRef parentTableRef = FromCompiler.getResolver(
- NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
+ NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
table.getParentTableName().getString())), context.getConnection()).resolveTable(
table.getSchemaName().getString(),
table.getParentTableName().getString());
@@ -162,8 +168,8 @@ public class TupleProjectionCompiler {
// add IndexUncoveredDataColumnRef
position = projectedColumns.size() + (hasSaltingColumn ? 1 : 0);
for (IndexUncoveredDataColumnRef sourceColumnRef : visitor.indexColumnRefSet) {
- PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(),
- sourceColumnRef.getColumn().getFamilyName(), position++,
+ PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(),
+ sourceColumnRef.getColumn().getFamilyName(), position++,
sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes());
projectedColumns.add(column);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 7493acceac..d76046d3b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -150,7 +150,12 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
public static final String INDEX_ROW_KEY = "_IndexRowKey";
public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
-
+ public static final String CDC_DATA_TABLE_NAME = "_CdcDataTableName";
+ public static final String CDC_JSON_COL_QUALIFIER = "_CdcJsonColumn_Qualifier";
+ public static final String CDC_INCLUDE_SCOPES = "_CdcIncludeScopes";
+ public static final String DATA_COL_QUALIFIER_TO_NAME_MAP = "_DataColQualToNameMap";
+ public static final String DATA_COL_QUALIFIER_TO_TYPE_MAP = "_DataColQualToTypeMap";
+
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
// In case of Index Write failure, we need to determine that Index mutation
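
These constants are plain HBase Scan attribute names used to ferry CDC metadata from the client to the server-side scanner. A minimal sketch of that attribute plumbing with stock HBase client APIs (the value shown is illustrative):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanAttributeSketch {
    public static void main(String[] args) {
        Scan scan = new Scan();
        // Client side: tag the scan for CDC handling.
        scan.setAttribute("_CdcDataTableName", Bytes.toBytes("MY_TABLE"));
        // Server side: the region observer reads the tag back.
        byte[] v = scan.getAttribute("_CdcDataTableName");
        System.out.println(v == null ? "not a CDC scan" : Bytes.toString(v));
    }
}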
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
new file mode 100644
index 0000000000..178bb1d705
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
+
+public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
+
+ private Map<ImmutableBytesPtr, String> dataColQualNameMap;
+ private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap;
+ // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>>
+ private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> dataRowChanges =
+ new HashMap<>();
+
+ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
+ final Region region,
+ final Scan scan,
+ final RegionCoprocessorEnvironment env,
+ final Scan dataTableScan,
+ final TupleProjector tupleProjector,
+ final IndexMaintainer indexMaintainer,
+ final byte[][] viewConstants,
+ final ImmutableBytesWritable ptr,
+ final long pageSizeMs,
+ final long queryLimit) throws IOException {
+ super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
+ viewConstants, ptr, pageSizeMs, queryLimit);
+ CDCUtil.initForRawScan(dataTableScan);
+ dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
+ scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
+ dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
+ scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ }
+
+ @Override
+ protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+ return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
+ }
+
+ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
+ if (indexRowIterator.hasNext()) {
+ List<Cell> indexRow = indexRowIterator.next();
+ for (Cell c: indexRow) {
+ if (c.getType() == Cell.Type.Put) {
+ result.add(c);
+ }
+ }
+ try {
+ Result dataRow = null;
+ if (! result.isEmpty()) {
+ Cell firstCell = result.get(0);
+ byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+ firstCell.getRowOffset(), firstCell.getRowLength())
+ .copyBytesIfNecessary();
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ dataRow = dataRows.get(dataRowKey);
+ Long indexRowTs = result.get(0).getTimestamp();
+ Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get(
+ dataRowKey);
+ if (changeTimeline == null) {
+ List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
+ Collections.sort(resultCells, CellComparator.getInstance().reversed());
+ List<Cell> deleteMarkers = new ArrayList<>();
+ List<List<Cell>> columns = new LinkedList<>();
+ Cell currentColumnCell = null;
+ Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo(
+ EncodedColumnsUtil.getQualifierEncodingScheme(scan));
+ List<Cell> currentColumn = null;
+ Set<Long> uniqueTimeStamps = new HashSet<>();
+ // TODO: From CompactionScanner.formColumns(), see if this can be refactored.
+ for (Cell cell : resultCells) {
+ uniqueTimeStamps.add(cell.getTimestamp());
+ if (cell.getType() != Cell.Type.Put) {
+ deleteMarkers.add(cell);
+ }
+ if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ emptyKV.getFirst())) {
+ continue;
+ }
+ if (currentColumnCell == null) {
+ currentColumn = new LinkedList<>();
+ currentColumnCell = cell;
+ currentColumn.add(cell);
+ } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
+ columns.add(currentColumn);
+ currentColumn = new LinkedList<>();
+ currentColumnCell = cell;
+ currentColumn.add(cell);
+ } else {
+ currentColumn.add(cell);
+ }
+ }
+ if (currentColumn != null) {
+ columns.add(currentColumn);
+ }
+ List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect(
+ Collectors.toList());
+ // FIXME: Does this need to be Concurrent?
+ Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>();
+ int[] columnPointers = new int[columns.size()];
+ changeTimeline = new TreeMap<>();
+ dataRowChanges.put(dataRowKey, changeTimeline);
+ for (Long ts : sortedTimestamps) {
+ for (int i = 0; i < columns.size(); ++i) {
+ Cell cell = columns.get(i).get(columnPointers[i]);
+ if (cell.getTimestamp() == ts) {
+ rollingRow.put(new ImmutableBytesPtr(
+ cell.getQualifierArray(),
+ cell.getQualifierOffset(),
+ cell.getQualifierLength()),
+ cell);
+ ++columnPointers[i];
+ }
+ }
+ Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap();
+ rowOfCells.putAll(rollingRow);
+ changeTimeline.put(ts, rowOfCells);
+ }
+ }
+
+ Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs);
+ if (mapOfCells != null) {
+ Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size());
+ for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) {
+ String colName = dataColQualNameMap.get(entry.getKey());
+ Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject(
+ entry.getValue().getValueArray());
+ rowValueMap.put(colName, colVal);
+ }
+ byte[] value =
+ new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(),
+ firstCell.getFamilyOffset(), firstCell.getFamilyLength());
+ dataRow = Result.create(Arrays.asList(builder.
+ setRow(dataRowKey.copyBytesIfNecessary()).
+ setFamily(family.copyBytesIfNecessary()).
+ setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
+ setTimestamp(firstCell.getTimestamp()).
+ setValue(value).
+ setType(Cell.Type.Put).
+ build()));
+ }
+ }
+ if (dataRow != null && tupleProjector != null) {
+ IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
+ tupleProjector, ptr);
+ }
+ else {
+ result.clear();
+ }
+ return true;
+ } catch (Throwable e) {
+ LOGGER.error("Exception in CDCGlobalIndexRegionScanner for region "
+ + region.getRegionInfo().getRegionNameAsString(), e);
+ throw e;
+ }
+ }
+ return false;
+ }
+}
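
The core of getNextCoveredIndexRow() above builds a per-row change timeline: cells are grouped by column, then the row image is rolled forward through each distinct timestamp so that the image at time T holds the latest value of every column as of T. A simplified, self-contained sketch of that rolling-row construction using plain strings instead of HBase Cells:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class TimelineSketch {
    public static void main(String[] args) {
        // qualifier -> (timestamp -> value), mirroring the sorted cell columns.
        Map<String, TreeMap<Long, String>> columns = new HashMap<>();
        columns.computeIfAbsent("V1", k -> new TreeMap<>()).put(100L, "100");
        columns.get("V1").put(300L, "101");
        columns.computeIfAbsent("V2", k -> new TreeMap<>()).put(100L, "a");

        TreeSet<Long> timestamps = new TreeSet<>();
        columns.values().forEach(m -> timestamps.addAll(m.keySet()));

        Map<String, String> rollingRow = new HashMap<>();
        Map<Long, Map<String, String>> changeTimeline = new TreeMap<>();
        for (long ts : timestamps) {
            for (Map.Entry<String, TreeMap<Long, String>> col : columns.entrySet()) {
                String v = col.getValue().get(ts);
                if (v != null) {
                    rollingRow.put(col.getKey(), v);   // column changed at ts
                }
            }
            changeTimeline.put(ts, new HashMap<>(rollingRow)); // snapshot row image
        }
        // e.g. {100={V1=100, V2=a}, 300={V1=101, V2=a}}
        System.out.println(changeTimeline);
    }
}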
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index f4a0dcb7d0..efdf19a364 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -86,6 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES;
import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+import static org.apache.phoenix.schema.PTableType.CDC;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
@@ -1465,8 +1466,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = Lists.newArrayList();
List<PName> physicalTables = Lists.newArrayList();
- PName parentTableName = tableType == INDEX ? dataTableName : null;
- PName parentSchemaName = tableType == INDEX ? schemaName : null;
+ PName parentTableName = tableType == INDEX || tableType == CDC ? dataTableName : null;
+ PName parentSchemaName = tableType == INDEX || tableType == CDC ? schemaName : null;
PName parentLogicalName = null;
EncodedCQCounter cqCounter = null;
if (oldTable != null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 438c014ca3..d010c33dff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -51,14 +51,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.util.ScanUtil.getDummyResult;
import static org.apache.phoenix.util.ScanUtil.isDummy;
@@ -66,6 +72,7 @@ import static org.apache.phoenix.util.ScanUtil.isDummy;
public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(UncoveredIndexRegionScanner.class);
+
/**
* The states of the processing a page of index rows
*/
@@ -170,6 +177,11 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
protected abstract void scanDataTableRows(long startTime) throws IOException;
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+ return prepareDataTableScan(dataRowKeys, false);
+ }
+
+ protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys,
+ boolean includeMultipleVersions) throws IOException {
List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
for (byte[] dataRowKey : dataRowKeys) {
// If the data table scan was interrupted because of paging we retry the scan
@@ -185,7 +197,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
scanRanges.initializeScan(dataScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+ dataScan.setFilter(new SkipScanFilter(skipScanFilter, includeMultipleVersions));
dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
Bytes.toBytes(Long.valueOf(pageSizeMs)));
return dataScan;
@@ -244,9 +256,15 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
}
Cell firstCell = row.get(0);
byte[] indexRowKey = firstCell.getRowArray();
- ptr.set(indexRowKey, firstCell.getRowOffset() + offset,
- firstCell.getRowLength() - offset);
- lastIndexRowKey = ptr.copyBytes();
+ // Avoid unnecessary byte copy and garbage when the row key is what we need.
+ if (firstCell.getRowOffset() + offset == 0 && firstCell.getRowLength() - offset == indexRowKey.length) {
+ lastIndexRowKey = indexRowKey;
+ }
+ else {
+ ptr.set(indexRowKey, firstCell.getRowOffset() + offset,
+ firstCell.getRowLength() - offset);
+ lastIndexRowKey = ptr.copyBytes();
+ }
indexToDataRowKeyMap.put(offset == 0 ? lastIndexRowKey :
CellUtil.cloneRow(firstCell), indexMaintainer.buildDataRowKey(
new ImmutableBytesWritable(lastIndexRowKey),
@@ -300,7 +318,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
return false;
}
- private boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
+ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
result.addAll(indexRow);
@@ -308,6 +326,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
byte[] indexRowKey = CellUtil.cloneRow(indexRow.get(0));
Result dataRow = dataRows.get(new ImmutableBytesPtr(
indexToDataRowKeyMap.get(indexRowKey)));
+
if (dataRow != null) {
long ts = indexRow.get(0).getTimestamp();
if (!indexMaintainer.isUncovered()
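
The row-key change above avoids a byte copy when the cell's row key spans its backing array exactly. A tiny standalone sketch of that copy-avoidance pattern:

import java.util.Arrays;

public class RowKeySliceSketch {
    // Reuse the backing array when the row key spans it exactly,
    // otherwise copy just the slice.
    static byte[] rowKey(byte[] rowArray, int offset, int length) {
        if (offset == 0 && length == rowArray.length) {
            return rowArray; // no copy, no garbage
        }
        return Arrays.copyOfRange(rowArray, offset, offset + length);
    }

    public static void main(String[] args) {
        byte[] exact = {1, 2, 3};
        System.out.println(rowKey(exact, 0, 3) == exact);          // true
        System.out.println(Arrays.toString(rowKey(exact, 1, 2)));  // [2, 3]
    }
}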
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ded8f17c67..ca0259203a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -79,6 +80,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.trace.TracingIterator;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
@@ -314,30 +316,38 @@ public abstract class BaseQueryPlan implements QueryPlan {
ScanUtil.setCustomAnnotations(scan,
customAnnotations == null ? null : customAnnotations.getBytes());
// Set index related scan attributes.
- if (table.getType() == PTableType.INDEX) {
+ if (table.getType() == PTableType.INDEX || table.getType() == PTableType.CDC) {
if (table.getIndexType() == IndexType.LOCAL) {
ScanUtil.setLocalIndex(scan);
} else if (context.isUncoveredIndex()) {
ScanUtil.setUncoveredGlobalIndex(scan);
}
+ PTable dataTable = null;
Set<PColumn> dataColumns = context.getDataColumns();
// If any data columns to join back from data table are present then we set following attributes
// 1. data columns to be projected and their key value schema.
- // 2. index maintainer and view constants if exists to build data row key from index row key.
+ // 2. index maintainer and view constants, if they exist, to build the data row key from the index row key.
// TODO: can have an hint to skip joining back to data table, in that case if any column to
// project is not present in the index then we need to skip this plan.
if (!dataColumns.isEmpty()) {
// Set data columns to be join back from data table.
PTable parentTable = context.getCurrentTable().getTable();
String parentSchemaName = parentTable.getParentSchemaName().getString();
- String parentTableName = parentTable.getParentTableName().getString();
- final ParseNodeFactory FACTORY = new ParseNodeFactory();
- TableRef dataTableRef =
- FromCompiler.getResolver(
- FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
- context.getConnection()).resolveTable(parentSchemaName, parentTableName);
- PTable dataTable = dataTableRef.getTable();
+ if (parentTable.getType() == PTableType.CDC) {
+ dataTable = parentTable;
+ }
+ else {
+ String parentTableName = parentTable.getParentTableName().getString();
+ final ParseNodeFactory FACTORY = new ParseNodeFactory();
+ TableRef dataTableRef =
+ FromCompiler.getResolver(
+ FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)),
+ context.getConnection()).resolveTable(parentSchemaName, parentTableName);
+ dataTable = dataTableRef.getTable();
+ }
+ }
+ if (! dataColumns.isEmpty()) {
// Set data columns to be join back from data table.
serializeDataTableColumnsToJoin(scan, dataColumns, dataTable);
KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
@@ -579,4 +589,4 @@ public abstract class BaseQueryPlan implements QueryPlan {
ResultIterator iterator = iterator();
iterator.close();
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 7877e71ca3..171108f87f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -18,10 +18,12 @@
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.phoenix.coprocessor.CDCGlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.UncoveredLocalIndexRegionScanner;
import org.apache.phoenix.schema.KeyValueSchema;
@@ -133,8 +135,7 @@ public abstract class RegionScannerFactory {
long extraLimit = -1;
{
- // for indexes construct the row filter for uncovered columns if it exists
- if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
+ if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER);
if (expBytes == null) {
// For older clients
@@ -166,22 +167,24 @@ public abstract class RegionScannerFactory {
PTable.ImmutableStorageScheme storageScheme =
indexMaintainer.getIndexStorageScheme();
Scan dataTableScan = new Scan();
- if (dataColumns != null) {
- for (int i = 0; i < dataColumns.length; i++) {
- if (storageScheme ==
- PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- dataTableScan.addFamily(dataColumns[i].getFamily());
- } else {
- dataTableScan.addColumn(dataColumns[i].getFamily(),
- dataColumns[i].getQualifier());
+ if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+ if (dataColumns != null) {
+ for (int i = 0; i < dataColumns.length; i++) {
+ if (storageScheme ==
+ PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ dataTableScan.addFamily(dataColumns[i].getFamily());
+ } else {
+ dataTableScan.addColumn(dataColumns[i].getFamily(),
+ dataColumns[i].getQualifier());
+ }
+ }
+ } else if (indexMaintainer.isUncovered()) {
+ // Indexed columns and the columns in index where clause should also be added
+ // to the data columns to scan for uncovered global indexes. This is required
+ // to verify the index row against the data table row.
+ for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
+ dataTableScan.addColumn(column.getFamily(), column.getQualifier());
}
- }
- } else if (indexMaintainer.isUncovered()) {
- // Indexed columns and the columns in index where clause should also be added
- // to the data columns to scan for uncovered global indexes. This is required
- // to verify the index row against the data table row.
- for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
- dataTableScan.addColumn(column.getFamily(), column.getQualifier());
}
}
if (ScanUtil.isLocalIndex(scan)) {
@@ -189,9 +192,16 @@ public abstract class RegionScannerFactory {
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, offset, actualStartKey, extraLimit);
} else {
- s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
- dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
- pageSizeMs, extraLimit);
+ if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+ s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+ pageSizeMs, extraLimit);
+ }
+ else {
+ s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+ dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
+ pageSizeMs, extraLimit);
+ }
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9087411c4f..a38273d97e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -152,7 +152,7 @@ public class TableResultIterator implements ResultIterator {
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
this.isMapReduceContext = isMapReduceContext;
this.maxQueryEndTime = maxQueryEndTime;
- ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection());
+ ScanUtil.setScanAttributesForClient(scan, table, plan.getContext());
}
// Constructors without maxQueryEndTime to maintain API compatibility for phoenix-connectors
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 504ae47e91..5d81f0b585 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -19,6 +19,7 @@
package org.apache.phoenix.optimize;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -72,6 +73,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -216,6 +218,20 @@ public class QueryOptimizer {
&& stopAtBestPlan && dataPlan.isApplicable()) {
return Collections.<QueryPlan> singletonList(dataPlan);
}
+
+ PTable table = dataPlan.getTableRef().getTable();
+ // TODO: Need to handle CDC hints.
+ if (table.getType() == PTableType.CDC) {
+ Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
+ String cdcHint = select.getHint().getHint(Hint.INCLUDE);
+ if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
+ cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
+ cdcHint.length() - 1));
+ }
+ dataPlan.getContext().setCDCIncludeScopes(cdcIncludeScopes);
+ return Arrays.asList(dataPlan);
+ }
+
List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
if (dataPlan.isApplicable() && (indexes.isEmpty()
|| dataPlan.isDegenerate()
@@ -236,8 +252,8 @@ public class QueryOptimizer {
targetColumns = targetDatums;
}
- SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
+ SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
plans.add(dataPlan);
QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
if (hintedPlan != null) {
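
The INCLUDE hint value arrives wrapped in HintNode.PREFIX/SUFFIX parentheses, which the code above strips before handing the inner string to CDCUtil.makeChangeScopeEnumsFromString(). An illustrative stand-in for that decoding — the real CDCUtil parsing may differ in detail, and the enum here is a local substitute for PTable.CDCChangeScope:

import java.util.EnumSet;
import java.util.Set;

public class IncludeScopeSketch {
    // Stand-in for PTable.CDCChangeScope.
    enum CDCChangeScope { PRE, POST, CHANGE }

    static Set<CDCChangeScope> fromHint(String hintValue) {
        // Mirrors the substring(1, length - 1) stripping of "(" and ")" above.
        String inner = hintValue.substring(1, hintValue.length() - 1);
        Set<CDCChangeScope> scopes = EnumSet.noneOf(CDCChangeScope.class);
        for (String token : inner.split("[,\\s]+")) {
            if (!token.isEmpty()) {
                scopes.add(CDCChangeScope.valueOf(token.toUpperCase()));
            }
        }
        return scopes;
    }

    public static void main(String[] args) {
        System.out.println(fromHint("(PRE POST)")); // [PRE, POST]
    }
}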
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index baba60bc91..a80b092956 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -17,9 +17,14 @@
*/
package org.apache.phoenix.parse;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
@@ -36,9 +41,11 @@ public class HintNode {
public static final char SEPARATOR = ' ';
public static final String PREFIX = "(";
public static final String SUFFIX = ")";
- // Split on whitespace and parenthesis, keeping the parenthesis in the token array
- private static final String SPLIT_REGEXP = "\\s+|((?<=\\" + PREFIX + ")|(?=\\" + PREFIX + "))|((?<=\\" + SUFFIX + ")|(?=\\" + SUFFIX + "))";
-
+ // Each hint has the generic syntax hintWord(hintArgs), where the parenthesized hintArgs are optional.
+ private static final Pattern HINT_PATTERN = Pattern.compile(
+ "(?<hintWord>\\w+)\\s*(?:\\s*\\(\\s*(?<hintArgs>[^)]+)\\s*\\))?");
+ private static final Pattern HINT_ARG_PATTERN = Pattern.compile("(?<hintArg>\"[^\"]+\"|\\S+)");
+
public enum Hint {
/**
* Forces a range scan to be used to process the query.
@@ -53,62 +60,62 @@ public class HintNode {
*/
NO_CHILD_PARENT_JOIN_OPTIMIZATION,
/**
- * Prevents the usage of indexes, forcing usage
- * of the data table for a query.
- */
- NO_INDEX,
- /**
- * Hint of the form {@code INDEX(<table_name> <index_name>...) }
- * to suggest usage of the index if possible. The first
- * usable index in the list of indexes will be choosen.
- * Table and index names may be surrounded by double quotes
- * if they are case sensitive.
- */
- INDEX,
- /**
- * All things being equal, use the data table instead of
- * the index table when optimizing.
- */
- USE_DATA_OVER_INDEX_TABLE,
- /**
- * All things being equal, use the index table instead of
- * the data table when optimizing.
- */
- USE_INDEX_OVER_DATA_TABLE,
- /**
- * Avoid caching any HBase blocks loaded by this query.
- */
- NO_CACHE,
- /**
- * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
- */
- USE_SORT_MERGE_JOIN,
- /**
- * Persist the RHS results of a hash join.
- */
- USE_PERSISTENT_CACHE,
- /**
- * Avoid using star-join optimization. Used for broadcast join (hash join) only.
- */
- NO_STAR_JOIN,
- /**
- * Avoid using the no seek optimization. When there are many columns which are not selected coming in between 2
- * selected columns and/or versions of columns, this should be used.
- */
- SEEK_TO_COLUMN,
- /**
- * Avoid seeks to select specified columns. When there are very less number of columns which are not selected in
- * between 2 selected columns this will be give better performance.
- */
- NO_SEEK_TO_COLUMN,
- /**
- * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
- */
- SMALL,
- /**
- * Enforces a serial scan.
- */
- SERIAL,
+ * Prevents the usage of indexes, forcing usage
+ * of the data table for a query.
+ */
+ NO_INDEX,
+ /**
+ * Hint of the form {@code INDEX(<table_name> <index_name>...) }
+ * to suggest usage of the index if possible. The first
+ * usable index in the list of indexes will be chosen.
+ * Table and index names may be surrounded by double quotes
+ * if they are case sensitive.
+ */
+ INDEX,
+ /**
+ * All things being equal, use the data table instead of
+ * the index table when optimizing.
+ */
+ USE_DATA_OVER_INDEX_TABLE,
+ /**
+ * All things being equal, use the index table instead of
+ * the data table when optimizing.
+ */
+ USE_INDEX_OVER_DATA_TABLE,
+ /**
+ * Avoid caching any HBase blocks loaded by this query.
+ */
+ NO_CACHE,
+ /**
+ * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
+ */
+ USE_SORT_MERGE_JOIN,
+ /**
+ * Persist the RHS results of a hash join.
+ */
+ USE_PERSISTENT_CACHE,
+ /**
+ * Avoid using star-join optimization. Used for broadcast join (hash join) only.
+ */
+ NO_STAR_JOIN,
+ /**
+ * Avoid using the no-seek optimization. Use this when many unselected columns
+ * and/or column versions lie between two selected columns.
+ */
+ SEEK_TO_COLUMN,
+ /**
+ * Avoid seeks to select specified columns. This gives better performance when
+ * only a few unselected columns lie between two selected columns.
+ */
+ NO_SEEK_TO_COLUMN,
+ /**
+ * Saves an RPC call on the scan. See Scan.setSmall(true) in HBase documentation.
+ */
+ SMALL,
+ /**
+ * Enforces a serial scan.
+ */
+ SERIAL,
/**
* Enforces a forward scan.
*/
@@ -121,7 +128,13 @@ public class HintNode {
/**
* Do not use server merge for hinted uncovered indexes
*/
- NO_INDEX_SERVER_MERGE
+ NO_INDEX_SERVER_MERGE,
+
+ /**
+ * Override the default CDC include scopes.
+ */
+ INCLUDE,
+ ;
};
private final Map<Hint,String> hints;
@@ -160,41 +173,33 @@ public class HintNode {
public HintNode(String hint) {
Map<Hint,String> hints = new HashMap<Hint,String>();
- // Split on whitespace or parenthesis. We do not need to handle escaped or
- // embedded whitespace/parenthesis, since we are parsing what will be HBase
- // table names which are not allowed to contain whitespace or parenthesis.
- String[] hintWords = hint.split(SPLIT_REGEXP);
- for (int i = 0; i < hintWords.length; i++) {
- String hintWord = hintWords[i];
- if (hintWord.isEmpty()) {
- continue;
- }
+ Matcher hintMatcher = HINT_PATTERN.matcher(hint);
+ while (hintMatcher.find()) {
try {
- Hint key = Hint.valueOf(hintWord.toUpperCase());
- String hintValue = "";
- if (i+1 < hintWords.length && PREFIX.equals(hintWords[i+1])) {
- StringBuffer hintValueBuf = new StringBuffer(hint.length());
- hintValueBuf.append(PREFIX);
- i+=2;
- while (i < hintWords.length && !SUFFIX.equals(hintWords[i])) {
- hintValueBuf.append(SchemaUtil.normalizeIdentifier(hintWords[i++]));
- hintValueBuf.append(SEPARATOR);
+ Hint hintWord = Hint.valueOf(hintMatcher.group("hintWord").toUpperCase());
+ String hintArgsStr = hintMatcher.group("hintArgs");
+ List<String> hintArgs = new ArrayList<>();
+ if (hintArgsStr != null) {
+ Matcher hintArgMatcher = HINT_ARG_PATTERN.matcher(hintArgsStr);
+ while (hintArgMatcher.find()) {
+ hintArgs.add(SchemaUtil.normalizeIdentifier(hintArgMatcher.group()));
}
- // Replace trailing separator with suffix
- hintValueBuf.replace(hintValueBuf.length()-1, hintValueBuf.length(), SUFFIX);
- hintValue = hintValueBuf.toString();
}
- String oldValue = hints.put(key, hintValue);
- // Concatenate together any old value with the new value
- if (oldValue != null) {
- hints.put(key, oldValue + hintValue);
+ hintArgsStr = String.join(" ", hintArgs);
+ hintArgsStr = hintArgsStr.equals("") ? "" : "(" + hintArgsStr + ")";
+ if (hints.containsKey(hintWord)) {
+ // Concatenate together any old value with the new value
+ hints.put(hintWord, hints.get(hintWord) + hintArgsStr);
+ }
+ else {
+ hints.put(hintWord, hintArgsStr);
}
} catch (IllegalArgumentException e) { // Ignore unknown/invalid hints
}
}
this.hints = ImmutableMap.copyOf(hints);
}
-
+
public boolean isEmpty() {
return hints.isEmpty();
}
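The rewritten constructor above tokenizes hints with two precompiled patterns instead of the old whitespace/parenthesis split. Neither pattern is part of this hunk; a sketch that is consistent with the named groups the constructor references (the exact regexes are assumptions):

    // Assumed shape of the patterns used by the constructor (not in this hunk):
    // a hint word, optionally followed by a parenthesized argument list.
    private static final Pattern HINT_PATTERN = Pattern.compile(
            "(?<hintWord>\\w+)\\s*(?:\\((?<hintArgs>[^)]*)\\))?");
    // Individual arguments: double-quoted (case-sensitive) or bare identifiers.
    private static final Pattern HINT_ARG_PATTERN = Pattern.compile(
            "\"[^\"]*\"|[^\\s,()]+");

Under this reading, an input such as "INDEX(t1 idx1) NO_CACHE" would parse to INDEX -> "(T1 IDX1)" (arguments pass through SchemaUtil.normalizeIdentifier) and NO_CACHE -> "", with repeated hint words concatenating their argument lists as before.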
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index c7fdbf6a20..2618e0fa6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -122,7 +122,8 @@ public class ColumnRef {
displayName);
}
- if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY) {
+ if (table.getType() == PTableType.PROJECTED || table.getType() == PTableType.SUBQUERY ||
+ table.getType() == PTableType.CDC) {
return new ProjectedColumnExpression(column, table, displayName);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8755b0ed96..76108a6220 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1770,14 +1770,17 @@ public class MetaDataClient {
new HashMap<>(), null);
// TODO: Currently index can be dropped, leaving the CDC dangling, DROP INDEX needs to
// protect based on CDCUtil.isACDCIndex().
- // TODO: Should we also allow PTimestamp here?
MutationState indexMutationState;
try {
+ // TODO: Should we also allow PTimestamp here? In fact, PTimestamp is the
+ // right type, but we are forced to use PDate because of the incorrect type
+ // of PHOENIX_ROW_TIMESTAMP (see PHOENIX-6807).
indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE);
} catch (SQLException e) {
if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
- statement.getCdcObjName().getName()).build().buildException();
+ statement.getCdcObjName().getName()).setRootCause(
+ e).build().buildException();
}
// TODO: What about translating other index creation failures? E.g., bad TS column.
throw e;
@@ -1791,8 +1794,8 @@ public class MetaDataClient {
ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ?
statement.getTimeIdxColumn() :
FACTORY.columnName(statement.getTimeIdxFunc().toString());
- columnDefs.add(FACTORY.columnDef(timeIdxCol, PTimestamp.INSTANCE.getSqlTypeName(), false, null, false,
- PTimestamp.INSTANCE.getMaxLength(null), PTimestamp.INSTANCE.getScale(null), false,
+ columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
+ PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
SortOrder.getDefault(), "", null, false));
pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
for (PColumn pcol : pkColumns) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 533f6dc2d9..0b3beb885b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1795,7 +1795,9 @@ public class PTableImpl implements PTable {
return SchemaUtil.getPhysicalHBaseTableName(schemaName,
physicalTableNameColumnInSyscat, isNamespaceMapped);
}
- return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped);
+ return SchemaUtil.getPhysicalHBaseTableName(schemaName, getType() == PTableType.CDC ?
+ PNameFactory.newName(CDCUtil.getCDCIndexName(tableName.getString())) :
+ tableName, isNamespaceMapped);
} else {
return PNameFactory.newName(physicalNames.get(0).getBytes());
}
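For a CDC PTable the physical HBase table is therefore the hidden CDC index, not the CDC object itself. Assuming getCDCIndexName simply prepends CDC_INDEX_PREFIX (defined as "__CDC__" in the next file), the mapping presumably behaves like:

    // Assumed behavior of the name mapping; getCDCIndexName is defined in
    // CDCUtil and CDC_INDEX_PREFIX is "__CDC__".
    String physical = CDCUtil.getCDCIndexName("MY_CDC"); // presumably "__CDC__MY_CDC"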
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index d015aaf422..e80fd4ee09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -18,14 +18,25 @@
package org.apache.phoenix.util;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.phoenix.schema.PTable;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
+
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "__CDC__";
public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -82,4 +93,23 @@ public class CDCUtil {
public static boolean isACDCIndex(String indexName) {
return indexName.startsWith(CDC_INDEX_PREFIX);
}
+
+ public static boolean isACDCIndex(PTable indexTable) {
+ return isACDCIndex(indexTable.getTableName().getString());
+ }
+
+ public static Scan initForRawScan(Scan scan) {
+ scan.setRaw(true);
+ scan.readAllVersions();
+ scan.setCacheBlocks(false);
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+ if (!familyMap.isEmpty()) {
+ // Clear any per-family qualifier filters so that the raw scan
+ // returns every column of each row.
+ familyMap.values().stream()
+ .filter(quals -> quals != null)
+ .forEach(NavigableSet::clear);
+ }
+ return scan;
+ }
}
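initForRawScan configures a scan the way the CDC region scanner needs it: raw mode (so delete markers come back), all cell versions, block caching off, and any per-family qualifier filters cleared. A minimal usage sketch, where the caller bounds the change window with a time range (the window bounds are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;

    // Minimal sketch: a raw, all-versions, uncached scan over a change window.
    static Scan changeWindowScan(long lowerTs, long upperTs) throws IOException {
        Scan scan = CDCUtil.initForRawScan(new Scan());
        scan.setTimeRange(lowerTs, upperTs); // hypothetical window bounds
        return scan;
    }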
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index f1c0b1d6e0..e02abf6720 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -84,9 +84,7 @@ public class EncodedColumnsUtil {
* part is the value to use for it.
*/
public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) {
- return usesEncodedColumnNames(table) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
- QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
- QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ return getEmptyKeyValueInfo(usesEncodedColumnNames(table));
}
/**
@@ -104,9 +102,7 @@ public class EncodedColumnsUtil {
* part is the value to use for it.
*/
public static Pair<byte[], byte[]> getEmptyKeyValueInfo(QualifierEncodingScheme encodingScheme) {
- return usesEncodedColumnNames(encodingScheme) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
- QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
- QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ return getEmptyKeyValueInfo(usesEncodedColumnNames(encodingScheme));
}
public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan scan) {
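Both overloads now delegate to a boolean-keyed variant that is not part of this hunk; presumably it is just the consolidation of the two removed bodies:

    // Assumed shape of the shared overload (not shown in this hunk):
    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
        return usesEncodedColumnNames
                ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
                        QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES)
                : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
    }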
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index fdafd1f33b..f2b6b911fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -19,20 +19,32 @@ package org.apache.phoenix.util;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,6 +68,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
@@ -788,7 +801,7 @@ public class ScanUtil {
* that the slot at index 2 has a slot index of 2 but a row key index of 3.
* To calculate the "adjusted position" index, we simply add up the number of extra slots spanned and offset
* the slotPosition by that much.
- * @param slotSpan the extra span per skip scan slot. corresponds to {@link ScanRanges#slotSpan}
+ * @param slotSpan the extra span per skip scan slot; corresponds to {@link ScanRanges#getSlotSpans()}
* @param slotPosition the index of a slot in the SkipScan slots list.
* @return the equivalent row key position in the RowKeySchema
*/
@@ -1167,7 +1180,8 @@ public class ScanUtil {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
} else {
- if (table.getType() != PTableType.INDEX || !IndexUtil.isGlobalIndex(indexTable)) {
+ if (table.getType() != PTableType.CDC && (table.getType() != PTableType.INDEX ||
+ !IndexUtil.isGlobalIndex(indexTable))) {
return;
}
if (table.isTransactional() && table.getIndexType() == IndexType.UNCOVERED_GLOBAL) {
@@ -1180,7 +1194,13 @@ public class ScanUtil {
}
// MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
// view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
- if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+ if (table.getType() == PTableType.CDC) {
+ indexTable = PhoenixRuntime.getTable(phoenixConnection,
+ CDCUtil.getCDCIndexName(table.getName().getString()));
+ }
+ else if (indexTable.getViewIndexId() != null &&
+ indexTable.getName().getString().contains(
+ QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
@@ -1280,7 +1300,8 @@ public class ScanUtil {
}
public static void setScanAttributesForClient(Scan scan, PTable table,
- PhoenixConnection phoenixConnection) throws SQLException {
+ StatementContext context) throws SQLException {
+ PhoenixConnection phoenixConnection = context.getConnection();
setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
@@ -1298,6 +1319,114 @@ public class ScanUtil {
}
setScanAttributeForPaging(scan, phoenixConnection);
+
+ if (table.getType() == PTableType.CDC) {
+ PTable dataTable = PhoenixRuntime.getTable(phoenixConnection,
+ SchemaUtil.getTableName(table.getSchemaName().getString(),
+ table.getParentTableName().getString()));
+ scan.setAttribute(CDC_DATA_TABLE_NAME,
+ table.getParentName().getBytes());
+
+ PColumn cdcJsonCol = table.getColumnForColumnName(CDC_JSON_COL_NAME);
+ scan.setAttribute(CDC_JSON_COL_QUALIFIER, cdcJsonCol.getColumnQualifierBytes());
+ scan.setAttribute(CDC_INCLUDE_SCOPES,
+ context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8));
+ CDCUtil.initForRawScan(scan);
+ List<PColumn> columns = dataTable.getColumns();
+ Map<byte[], String> dataColQualNameMap = new HashMap<>(columns.size());
+ Map<byte[], PDataType> dataColTypeMap = new HashMap<>();
+ for (PColumn col: columns) {
+ if (col.getColumnQualifierBytes() != null) {
+ dataColQualNameMap.put(col.getColumnQualifierBytes(), col.getName().getString());
+ dataColTypeMap.put(col.getColumnQualifierBytes(), col.getDataType());
+ }
+ }
+ scan.setAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP,
+ serializeColumnQualifierToNameMap(dataColQualNameMap));
+ scan.setAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP,
+ serializeColumnQualifierToTypeMap(dataColTypeMap));
+ }
+ }
+
+ public static byte[] serializeColumnQualifierToNameMap(Map<byte[], String> colQualNameMap) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(stream);
+ try {
+ output.writeInt(colQualNameMap.size());
+ for (Map.Entry<byte[], String> entry: colQualNameMap.entrySet()) {
+ output.writeInt(entry.getKey().length);
+ output.write(entry.getKey());
+ WritableUtils.writeString(output, entry.getValue());
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Map<ImmutableBytesPtr, String> deserializeColumnQualifierToNameMap(
+ byte[] mapBytes) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(mapBytes);
+ DataInputStream input = new DataInputStream(stream);
+ try {
+ Map<ImmutableBytesPtr, String> colQualNameMap = new HashMap<>();
+ int size = input.readInt();
+ for (int i = 0; i < size; ++i) {
+ int qualLength = input.readInt();
+ byte[] qualBytes = new byte[qualLength];
+ int bytesRead = input.read(qualBytes);
+ if (bytesRead != qualLength) {
+ throw new IOException("Expected number of bytes: " + qualLength + " but got " +
+ "only: " + bytesRead);
+ }
+ String colName = WritableUtils.readString(input);
+ colQualNameMap.put(new ImmutableBytesPtr(qualBytes), colName);
+ }
+ return colQualNameMap;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static byte[] serializeColumnQualifierToTypeMap(
+ Map<byte[], PDataType> pkColNamesAndTypes) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ DataOutputStream output = new DataOutputStream(stream);
+ try {
+ output.writeInt(pkColNamesAndTypes.size());
+ for (Map.Entry<byte[], PDataType> entry: pkColNamesAndTypes.entrySet()) {
+ output.writeInt(entry.getKey().length);
+ output.write(entry.getKey());
+ WritableUtils.writeString(output, entry.getValue().getSqlTypeName());
+ }
+ return stream.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Map<ImmutableBytesPtr, PDataType> deserializeColumnQualifierToTypeMap(
+ byte[] pkColInfoBytes) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(pkColInfoBytes);
+ DataInputStream input = new DataInputStream(stream);
+ try {
+ Map<ImmutableBytesPtr, PDataType> colQualTypeMap = new HashMap<>();
+ int colCnt = input.readInt();
+ for (int i = 0; i < colCnt; ++i) {
+ int qualLength = input.readInt();
+ byte[] qualBytes = new byte[qualLength];
+ int bytesRead = input.read(qualBytes);
+ if (bytesRead != qualLength) {
+ throw new IOException("Expected number of bytes: " + qualLength + " but got " +
+ "only: " + bytesRead);
+ }
+ colQualTypeMap.put(new ImmutableBytesPtr(qualBytes),
+ PDataType.fromSqlTypeName(WritableUtils.readString(input)));
+ }
+ return colQualTypeMap;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
public static void setScanAttributeForPaging(Scan scan, PhoenixConnection phoenixConnection) {
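The two qualifier maps travel on the scan as flat byte arrays: an int entry count, then per entry a length-prefixed qualifier followed by a WritableUtils string (the column name, or the SQL type name for the type map). An illustrative round trip of the name map (the qualifier bytes and column names are made up):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    Map<byte[], String> in = new HashMap<>();
    in.put(new byte[] { 11 }, "FIRST_NAME"); // illustrative entries
    in.put(new byte[] { 12 }, "LAST_NAME");
    byte[] wire = ScanUtil.serializeColumnQualifierToNameMap(in);
    Map<ImmutableBytesPtr, String> out =
            ScanUtil.deserializeColumnQualifierToNameMap(wire);
    // Keys come back as ImmutableBytesPtr, which hashes on the wrapped bytes.
    assert "FIRST_NAME".equals(out.get(new ImmutableBytesPtr(new byte[] { 11 })));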
diff --git a/phoenix-pherf/pom.xml b/phoenix-pherf/pom.xml
index 981f04add0..dc3c5aec41 100644
--- a/phoenix-pherf/pom.xml
+++ b/phoenix-pherf/pom.xml
@@ -34,7 +34,6 @@
<!-- Versions for pherf-only dependencies -->
<diffutils.version>1.2.1</diffutils.version>
- <gson.version>2.9.1</gson.version>
<commons-math3.version>3.3</commons-math3.version>
<activation.version>1.1</activation.version>
<jcabi-jdbc.version>0.15</jcabi-jdbc.version>
@@ -151,7 +150,6 @@
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
- <version>${gson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
diff --git a/pom.xml b/pom.xml
index 248273a4ba..0c629b1ac8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,6 +213,7 @@
<protoc.arch>${os.detected.classifier}</protoc.arch>
<!-- Keeping the version in sync with hbase javax.el version -->
<glassfish.el.version>3.0.1-b08</glassfish.el.version>
+ <gson.version>2.9.1</gson.version>
</properties>
<build>
@@ -1548,6 +1549,11 @@
<artifactId>javax.el</artifactId>
<version>${glassfish.el.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>