You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2024/03/19 15:28:34 UTC
(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new e395780e9d PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)
e395780e9d is described below
commit e395780e9d6fe3c61c3ed3e3be0387f79bb0cd5b
Author: TheNamesRai <sa...@outlook.com>
AuthorDate: Tue Mar 19 20:58:27 2024 +0530
PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (#1813)
---
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 655 ++++++++++++++++++++
.../apache/phoenix/end2end/CDCDefinitionIT.java | 328 ++++++++++
.../java/org/apache/phoenix/end2end/CDCMiscIT.java | 449 --------------
.../org/apache/phoenix/end2end/CDCQueryIT.java | 667 +++++++++++++++++++++
.../phoenix/end2end/index/SingleCellIndexIT.java | 11 +-
.../phoenix/compile/CreateIndexCompiler.java | 2 +-
.../phoenix/compile/IndexStatementRewriter.java | 8 +-
.../org/apache/phoenix/compile/QueryCompiler.java | 78 ++-
.../apache/phoenix/compile/StatementContext.java | 20 +-
.../phoenix/compile/TupleProjectionCompiler.java | 9 +-
.../coprocessor/BaseScannerRegionObserver.java | 10 +-
.../coprocessor/CDCGlobalIndexRegionScanner.java | 301 ++++++----
.../coprocessor/GlobalIndexRegionScanner.java | 2 +-
.../coprocessor/UncoveredIndexRegionScanner.java | 2 -
.../org/apache/phoenix/execute/BaseQueryPlan.java | 15 +-
.../org/apache/phoenix/execute/MutationState.java | 74 +++
.../org/apache/phoenix/execute/TupleProjector.java | 4 +-
.../expression/SingleCellColumnExpression.java | 17 +-
.../hbase/index/util/ImmutableBytesPtr.java | 37 +-
.../org/apache/phoenix/index/CDCTableInfo.java | 276 +++++++++
.../org/apache/phoenix/index/IndexMaintainer.java | 23 +-
.../phoenix/iterate/RegionScannerFactory.java | 4 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 142 +++--
.../phoenix/parse/FamilyWildcardParseNode.java | 12 +-
.../apache/phoenix/parse/ParseNodeRewriter.java | 11 +-
.../phoenix/parse/TableWildcardParseNode.java | 10 +
.../apache/phoenix/parse/TerminalParseNode.java | 8 +
.../apache/phoenix/parse/WildcardParseNode.java | 13 +-
.../phoenix/query/ConnectionQueryServicesImpl.java | 4 +-
.../org/apache/phoenix/query/QueryConstants.java | 6 +
.../org/apache/phoenix/schema/DelegateTable.java | 6 +
.../org/apache/phoenix/schema/KeyValueSchema.java | 26 +
.../org/apache/phoenix/schema/MetaDataClient.java | 116 ++--
.../java/org/apache/phoenix/schema/PTable.java | 3 +-
.../java/org/apache/phoenix/schema/PTableImpl.java | 13 +-
.../org/apache/phoenix/schema/SaltingUtil.java | 3 -
.../org/apache/phoenix/util/CDCChangeBuilder.java | 151 +++++
.../main/java/org/apache/phoenix/util/CDCUtil.java | 31 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 4 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 147 +----
.../CDCInfo.proto} | 34 +-
41 files changed, 2830 insertions(+), 902 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
new file mode 100644
index 0000000000..8f4bcf17a2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.ToNumberPolicy;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableProperty;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+import static org.apache.phoenix.util.MetaDataUtil.getViewIndexPhysicalName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shared helpers for CDC integration tests: creating data tables and CDC objects, generating a
+ * fixed sequence of upserts/deletes at controlled timestamps via an injected clock, and verifying
+ * the rows returned by a CDC query against the expected change images.
+ */
+public class CDCBaseIT extends ParallelStatsDisabledIT {
+    static final HashSet<PTable.CDCChangeScope> CHANGE_IMG =
+            new HashSet<>(Arrays.asList(PTable.CDCChangeScope.CHANGE));
+    static final HashSet<PTable.CDCChangeScope> PRE_POST_IMG = new HashSet<>(
+            Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST));
+
+    // Clock injected into EnvironmentEdgeManager while generating changes; subclasses set it up.
+    protected ManualEnvironmentEdge injectEdge;
+    // Parses CDC JSON; integral numbers come back as Long so they compare equal to expectations.
+    protected Gson gson = new GsonBuilder()
+            .setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
+            .create();
+    protected Calendar cal = Calendar.getInstance();
+
+    /** Creates a table from {@code table_sql} without any extra table properties. */
+    protected void createTable(Connection conn, String table_sql)
+            throws Exception {
+        createTable(conn, table_sql, null, false, null, false, null);
+    }
+
+    /** Creates a table with the given column encoding scheme (null means server default). */
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, false, null, false, null);
+    }
+
+    /** Creates a table with the given encoding scheme and multi-tenancy flag. */
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, multitenant, null, false, null);
+    }
+
+    /** Creates a table with encoding, multi-tenancy, salting and immutability options. */
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+                               Integer nSaltBuckets, boolean immutable,
+                               PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        createTable(conn, table_sql, encodingScheme, multitenant, nSaltBuckets, null, immutable,
+                immutableStorageScheme);
+    }
+
+    /**
+     * Collects the given options into a property map and delegates to
+     * {@link #createTable(Connection, String, Map)}. Null arguments mean "not specified".
+     */
+    protected void createTable(Connection conn, String table_sql,
+                               PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+                               Integer nSaltBuckets, PTable.IndexType indexType, boolean immutable,
+                               PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        Map<String, Object> tableProps = new HashMap<>();
+        tableProps.put(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName(),
+                encodingScheme != null
+                        ? Byte.valueOf(encodingScheme.getSerializedMetadataValue()) : null);
+        tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), multitenant);
+        tableProps.put(TableProperty.SALT_BUCKETS.getPropertyName(), nSaltBuckets);
+        tableProps.put(TableProperty.INDEX_TYPE.getPropertyName(), indexType);
+        tableProps.put(TableProperty.IMMUTABLE_ROWS.getPropertyName(), immutable);
+        tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
+                immutableStorageScheme != null ? immutableStorageScheme.name() : null);
+        createTable(conn, table_sql, tableProps);
+    }
+
+    /**
+     * Appends the non-default properties found in {@code tableProps} to {@code table_sql} as a
+     * comma-separated {@code NAME=value} list and executes the resulting DDL.
+     *
+     * @param conn       connection used to execute the DDL
+     * @param table_sql  CREATE statement without trailing table properties
+     * @param tableProps property values keyed by {@link TableProperty} name; missing or null
+     *                   entries are treated as "not specified"
+     */
+    protected void createTable(Connection conn, String table_sql,
+                               Map<String,Object> tableProps) throws Exception {
+        List<String> props = new ArrayList<>();
+        Byte encodingScheme = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+        if (encodingScheme != null && encodingScheme !=
+                QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES) {
+            props.add(TableProperty.COLUMN_ENCODED_BYTES.getPropertyName() + "=" + encodingScheme);
+        }
+        Boolean multitenant = (Boolean) TableProperty.MULTI_TENANT.getValue(tableProps);
+        if (Boolean.TRUE.equals(multitenant)) {
+            props.add(TableProperty.MULTI_TENANT.getPropertyName() + "=" + multitenant);
+        }
+        Integer nSaltBuckets = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
+        if (nSaltBuckets != null) {
+            props.add(TableProperty.SALT_BUCKETS.getPropertyName() + "=" + nSaltBuckets);
+        }
+        PTable.IndexType indexType = (PTable.IndexType) TableProperty.INDEX_TYPE.getValue(
+                tableProps);
+        // Only LOCAL is emitted explicitly (as "l"); any other value is left to the server's
+        // default. INDEX_TYPE must not be emitted based on salting (previously it was, yielding
+        // "INDEX_TYPE=null" or a duplicate INDEX_TYPE property).
+        if (indexType == PTable.IndexType.LOCAL) {
+            props.add(TableProperty.INDEX_TYPE.getPropertyName() + "=l");
+        }
+        Boolean immutableTable = (Boolean) TableProperty.IMMUTABLE_ROWS.getValue(tableProps);
+        // Null-safe: a caller-supplied map may omit IMMUTABLE_ROWS entirely.
+        if (Boolean.TRUE.equals(immutableTable)) {
+            props.add(TableProperty.IMMUTABLE_ROWS.getPropertyName() + "=true");
+        }
+        PTable.ImmutableStorageScheme immutableStorageScheme =
+                (PTable.ImmutableStorageScheme) TableProperty
+                        .IMMUTABLE_STORAGE_SCHEME.getValue(tableProps);
+        if (immutableStorageScheme != null) {
+            props.add(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName() + "="
+                    + immutableStorageScheme.name());
+        }
+        table_sql += " " + String.join(", ", props);
+        conn.createStatement().execute(table_sql);
+    }
+
+    /** Creates a CDC with default options and waits for its backing index to become ACTIVE. */
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql) throws Exception {
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, null);
+    }
+
+    /** Creates a CDC with the given index type and waits for its index to become ACTIVE. */
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql, PTable.IndexType indexType) throws Exception {
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, indexType);
+    }
+
+    /** Creates a CDC with the given encoding/salting and waits for its index to become ACTIVE. */
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
+                                    Integer nSaltBuckets) throws Exception {
+        createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, nSaltBuckets, null);
+    }
+
+    /**
+     * Executes {@code cdc_sql}, builds the CDC's index with IndexTool and blocks until that
+     * index reaches {@link PIndexState#ACTIVE}.
+     */
+    protected void createCDCAndWait(Connection conn, String tableName, String cdcName,
+                                    String cdc_sql, PTable.QualifierEncodingScheme encodingScheme,
+                                    Integer nSaltBuckets, PTable.IndexType indexType)
+            throws Exception {
+        // For CDC, multitenancy gets derived automatically via the parent table.
+        createTable(conn, cdc_sql, encodingScheme, false, nSaltBuckets, indexType, false, null);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
+        tableName = SchemaUtil.getTableNameFromFullName(tableName);
+        IndexToolIT.runIndexTool(false, schemaName, tableName,
+                "\"" + CDCUtil.getCDCIndexName(cdcName) + "\"");
+        String indexFullName = SchemaUtil.getTableName(schemaName,
+                CDCUtil.getCDCIndexName(cdcName));
+        TestUtil.waitForIndexState(conn, indexFullName, PIndexState.ACTIVE);
+    }
+
+    /**
+     * Asserts the CDC's CDC_INCLUDE value and the INDEX_TYPE of its backing index as recorded
+     * in SYSTEM.CATALOG.
+     */
+    protected void assertCDCState(Connection conn, String cdcName, String expInclude,
+                                  int idxType) throws SQLException {
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertTrue(rs.next());
+            assertEquals(expInclude, rs.getString(1));
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertTrue(rs.next());
+            assertEquals(idxType, rs.getInt(1));
+        }
+    }
+
+    /**
+     * Asserts the PTable metadata of a CDC: include scopes, parent name, absence of index
+     * state/type, and its physical name (the CDC index when {@code tableName} is the data table
+     * itself, otherwise the view-index physical table of {@code datatableName}).
+     */
+    protected void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
+                                String tableName, String datatableName)
+            throws SQLException {
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+            PTable cdcTable = PhoenixRuntime.getTable(conn, cdcFullName);
+            assertEquals(expIncludeScopes, cdcTable.getCDCIncludeScopes());
+            assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(cdcTable));
+            assertNull(cdcTable.getIndexState()); // Index state should be null for CDC.
+            assertNull(cdcTable.getIndexType()); // This is not an index.
+            assertEquals(tableName, cdcTable.getParentName().getString());
+            String indexFullName = SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName));
+            // Compare names by value, not by reference.
+            assertEquals(cdcTable.getPhysicalName().getString(),
+                    Objects.equals(tableName, datatableName)
+                            ? indexFullName : getViewIndexPhysicalName(datatableName));
+        }
+    }
+
+    /** Asserts the salt bucket count of {@code tableName}; null or 0 means "not salted". */
+    protected void assertSaltBuckets(Connection conn, String tableName, Integer nbuckets)
+            throws SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        assertSaltBuckets(table, nbuckets);
+    }
+
+    /** Asserts the salt bucket count of {@code table}; null or 0 means "not salted". */
+    protected void assertSaltBuckets(PTable table, Integer nbuckets) {
+        if (nbuckets == null || nbuckets == 0) {
+            assertNull(table.getBucketNum());
+        } else {
+            assertEquals(nbuckets, table.getBucketNum());
+        }
+    }
+
+    /** Asserts that {@code SELECT * FROM cdcName} returns no rows. */
+    protected void assertNoResults(Connection conn, String cdcName) throws SQLException {
+        try (Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery("select * from " + cdcName)) {
+            assertFalse(rs.next());
+        }
+    }
+
+    /** Opens a non-tenant connection to the test cluster. */
+    protected Connection newConnection() throws SQLException {
+        return newConnection(null);
+    }
+
+    /** Opens a connection to the test cluster, scoped to {@code tenantId} when non-null. */
+    protected Connection newConnection(String tenantId) throws SQLException {
+        Properties props = new Properties();
+        // Uncomment these only while debugging.
+        //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+        //props.put("hbase.client.scanner.timeout.period", "6000000");
+        //props.put("phoenix.query.timeoutMs", "6000000");
+        //props.put("zookeeper.session.timeout", "6000000");
+        //props.put("hbase.rpc.timeout", "6000000");
+        if (tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    /**
+     * Optionally applies one mutation and returns the CDC change it is expected to produce.
+     *
+     * @param conn       when non-null, the DELETE/UPSERT is executed on it with the injected
+     *                   clock set to {@code changeTS}; when null only the expectation is built
+     * @param preImage   the row image before this change
+     * @param changeType {@code CDC_DELETE_EVENT_TYPE} or {@code CDC_UPSERT_EVENT_TYPE}
+     * @param pks        primary key column values identifying the row
+     * @param values     non-PK column values to upsert (ignored for deletes)
+     * @return a map with the event type, pre image and, for upserts, change and post images
+     */
+    private Map<String, Object> addChange(Connection conn, Map preImage,
+                                          long changeTS, String changeType, String tableName,
+                                          Map<String, Object> pks, Map<String, Object> values)
+            throws SQLException {
+        if (conn != null) {
+            String sql;
+            if (CDC_DELETE_EVENT_TYPE.equals(changeType)) {
+                // Every PK column must match, so the predicates are AND-ed together.
+                String predicates = pks.entrySet().stream()
+                        .map(e -> e.getKey() + "=" + e.getValue())
+                        .collect(Collectors.joining(" AND "));
+                sql = "DELETE FROM " + tableName + " WHERE " + predicates;
+            } else {
+                String columnList = Stream.concat(pks.keySet().stream(),
+                        values.keySet().stream()).collect(Collectors.joining(", "));
+                String valueList =
+                        Stream.concat(pks.values().stream(), values.values().stream())
+                                .map(v -> String.valueOf(v)).collect(Collectors.joining(", "));
+                sql = "UPSERT INTO " + tableName + " (" + columnList + ") VALUES (" + valueList
+                        + ")";
+            }
+            cal.setTimeInMillis(changeTS);
+            injectEdge.setValue(changeTS);
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute(sql);
+            }
+        }
+        Map<String, Object> cdcChange = new HashMap<>();
+        cdcChange.put(CDC_EVENT_TYPE, changeType);
+        cdcChange.put(CDC_PRE_IMAGE, preImage);
+        if (CDC_UPSERT_EVENT_TYPE.equals(changeType)) {
+            Map<String, Object> changeImage = new HashMap<>();
+            changeImage.putAll(values);
+            cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
+            Map<String, Object> postImage = new HashMap<>();
+            postImage.putAll(preImage);
+            postImage.putAll(changeImage);
+            cdcChange.put(CDC_POST_IMAGE, postImage);
+        }
+        return cdcChange;
+    }
+
+    // FIXME: Add the following with consecutive upserts on the same PK (no delete in between):
+    //  - with different values
+    //  - with a null
+    //  - missing columns
+    /**
+     * For each tenant, applies a fixed sequence of upserts and deletes on keys K=1..3 of
+     * {@code tableName}, starting at {@code startTS} and advancing the injected clock in steps
+     * of 100, and returns the expected CDC change for every mutation in order.
+     *
+     * @param datatableNameForDDL when non-null, column V3 is dropped from this table once,
+     *                            after the first tenant's first four changes
+     * @param committer           decides how (and whether successfully) each batch is committed
+     */
+    protected List<ChangeRow> generateChanges(long startTS, String[] tenantids, String tableName,
+                                              String datatableNameForDDL, CommitAdapter committer)
+            throws Exception {
+        List<ChangeRow> changes = new ArrayList<>();
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        injectEdge.setValue(startTS);
+        boolean dropV3Done = false;
+        committer.init();
+        Map<String, Object> pk1 = new HashMap() {{ put("K", 1); }};
+        Map<String, Object> pk2 = new HashMap() {{ put("K", 2); }};
+        Map<String, Object> pk3 = new HashMap() {{ put("K", 3); }};
+        Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
+        for (String tid: tenantids) {
+            try (Connection conn = committer.getConnection(tid)) {
+                c1 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
+                            put("V1", 100L);
+                            put("V2", 1000L);
+                            put("B.VB", 10000L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c1));
+                c2 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
+                            put("V1", 200L);
+                            put("V2", 2000L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk2, c2));
+                committer.commit(conn);
+
+                c3 = addChange(conn, new HashMap(), startTS += 100,
+                        CDC_UPSERT_EVENT_TYPE,
+                        tableName, pk3, new TreeMap<String, Object>() {{
+                            put("V1", 300L);
+                            put("V2", null);
+                            put("B.VB", null);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk3, c3));
+                committer.commit(conn);
+
+                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 101L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c4));
+                committer.commit(conn);
+            }
+            if (datatableNameForDDL != null && !dropV3Done) {
+                try (Connection conn = newConnection()) {
+                    conn.createStatement().execute("ALTER TABLE " + datatableNameForDDL +
+                            " DROP COLUMN v3");
+                }
+                dropV3Done = true;
+            }
+            // NOTE(review): this phase uses newConnection(tid) rather than
+            // committer.getConnection(tid); confirm the asymmetry is intended.
+            try (Connection conn = newConnection(tid)) {
+                c5 = addChange(conn, (Map) c4.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c5));
+                committer.commit(conn);
+
+                c6 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 102L);
+                            put("V2", 1002L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c6));
+                committer.commit(conn);
+
+                c7 = addChange(conn, (Map) c6.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c7));
+                committer.commit(conn);
+
+                c8 = addChange(conn, (Map) c2.get(CDC_POST_IMAGE),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk2,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 201L);
+                            put("V2", null);
+                            put("B.VB", 20001L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk2, c8));
+                committer.commit(conn);
+
+                c9 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 103L);
+                            put("V2", 1003L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c9));
+                committer.commit(conn);
+
+                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c10));
+                committer.commit(conn);
+
+                c11 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 104L);
+                            put("V2", 1004L);
+                        }});
+                changes.add(new ChangeRow(tid, startTS, pk1, c11));
+                committer.commit(conn);
+
+                c12 = addChange(conn, (Map) c11.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                changes.add(new ChangeRow(tid, startTS, pk1, c12));
+                committer.commit(conn);
+            }
+        }
+        committer.reset();
+        return changes;
+    }
+
+    /** Copies {@code scopeKeyName} from {@code change} when that scope was requested. */
+    private void _copyScopeIfRelevant(Set<PTable.CDCChangeScope> changeScopes,
+                                      PTable.CDCChangeScope changeScope,
+                                      Map<String, Object> change, Map<String, Object> expChange,
+                                      String scopeKeyName) {
+        if (changeScopes.contains(changeScope) && change.containsKey(scopeKeyName)) {
+            expChange.put(scopeKeyName, change.get(scopeKeyName));
+        }
+    }
+
+    /**
+     * Asserts that {@code rs} contains exactly the changes of {@code tenantId}, in order.
+     * Column 1 is the change timestamp, PK columns are looked up by name, and column 3 is
+     * the CDC JSON, compared against the images selected by {@code changeScopes}.
+     * For immutable tables the expected images are stripped of null values in place.
+     */
+    protected void verifyChanges(String tenantId, ResultSet rs, List<ChangeRow> changes,
+                                 Set<PTable.CDCChangeScope> changeScopes,
+                                 boolean mutableTable) throws Exception {
+        int changenr = 0;
+        for (ChangeRow changeRow : changes) {
+            // Value comparison: tenant IDs may be equal strings held by different references.
+            if (!Objects.equals(changeRow.getTenantID(), tenantId)) {
+                continue;
+            }
+            Map<String, Object> expChange = new HashMap<>();
+            Map<String, Object> change = changeRow.change;
+            expChange.put(CDC_EVENT_TYPE, change.get(CDC_EVENT_TYPE));
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.PRE, change, expChange,
+                    CDC_PRE_IMAGE);
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.POST, change, expChange,
+                    CDC_POST_IMAGE);
+            _copyScopeIfRelevant(changeScopes, PTable.CDCChangeScope.CHANGE, change, expChange,
+                    CDC_CHANGE_IMAGE);
+            String changeDesc = "Change " + (changenr + 1) + ": " + changeRow;
+            assertTrue(changeDesc, rs.next());
+            Map cdcObj = gson.fromJson(rs.getString(3), HashMap.class);
+            // This is needed because for immutable tables, CDC can't distinguish a null value
+            // from that of a missing cell. The images are purged in place, which also updates
+            // expChange since it references the same map objects.
+            if (!mutableTable) {
+                _purgeNulls(changeRow.getPreImage());
+                _purgeNulls(changeRow.getChangeImage());
+                _purgeNulls(changeRow.getPostImage());
+            }
+            assertEquals(changeDesc, changeRow.getChangeTimestamp(),
+                    rs.getDate(1).getTime());
+            for (Map.Entry<String, Object> pk: changeRow.getPrimaryKeys().entrySet()) {
+                assertEquals(changeDesc, pk.getValue(), rs.getObject(pk.getKey()));
+            }
+            assertEquals(changeDesc, expChange, cdcObj);
+            ++changenr;
+        }
+        assertFalse(rs.next());
+    }
+
+    /** Removes every entry whose value is null from {@code image}; a null map is ignored. */
+    private void _purgeNulls(Map<?, ?> image) {
+        if (image != null) {
+            image.values().removeIf(Objects::isNull);
+        }
+    }
+
+    /**
+     * Immutable-table variant of {@link #generateChanges}: for each tenant applies a fixed
+     * sequence of upserts and deletes (each committed individually), advancing the injected
+     * clock in steps of 100, and returns the expected CDC change for every mutation in order.
+     */
+    protected List<ChangeRow> generateChangesImmutableTable(long startTS, String[] tenantids,
+                                                            String tableName,
+                                                            CommitAdapter committer)
+            throws Exception {
+        List<ChangeRow> changes = new ArrayList<>();
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        injectEdge.setValue(startTS);
+        committer.init();
+        Map<String, Object> pk1 = new HashMap() {{ put("K", 1); }};
+        Map<String, Object> pk2 = new HashMap() {{ put("K", 2); }};
+        Map<String, Object> pk3 = new HashMap() {{ put("K", 3); }};
+        Map<String, Object> c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12;
+        for (String tid: tenantids) {
+            try (Connection conn = newConnection(tid)) {
+                c1 = addChange(conn, new HashMap(), startTS,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk1, new TreeMap<String, Object>() {{
+                            put("V1", 100L);
+                            put("V2", 1000L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c1));
+                c2 = addChange(conn, new HashMap(), startTS += 100,
+                        CDC_UPSERT_EVENT_TYPE, tableName, pk2, new TreeMap<String, Object>() {{
+                            put("V1", 200L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk2, c2));
+                c3 = addChange(conn, new HashMap(), startTS += 100,
+                        CDC_UPSERT_EVENT_TYPE,
+                        tableName, pk3, new TreeMap<String, Object>() {{
+                            put("V1", 300L);
+                            put("V2", null);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk3, c3));
+                c4 = addChange(conn, (Map) c1.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c4));
+                c5 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 102L);
+                            put("V2", 1002L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c5));
+                c6 = addChange(conn, (Map) c5.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c6));
+                c7 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 103L);
+                            put("V2", 1003L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c7));
+                c8 = addChange(conn, (Map) c7.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c8));
+                c9 = addChange(conn, new HashMap(),
+                        startTS += 100, CDC_UPSERT_EVENT_TYPE, tableName, pk1,
+                        new TreeMap<String, Object>() {{
+                            put("V1", 104L);
+                            put("V2", 1004L);
+                        }});
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c9));
+                c10 = addChange(conn, (Map) c9.get(CDC_POST_IMAGE), startTS += 100,
+                        CDC_DELETE_EVENT_TYPE, tableName, pk1, null);
+                committer.commit(conn);
+                changes.add(new ChangeRow(tid, startTS, pk1, c10));
+            }
+        }
+        committer.reset();
+        return changes;
+    }
+
+    /** One expected CDC row: tenant, change timestamp, primary key values and change images. */
+    protected class ChangeRow {
+        private final String tenantid;
+        private final long changeTS;
+        private final Map<String, Object> pks;
+        // Map produced by addChange(): event type plus pre/change/post images.
+        private final Map<String, Object> change;
+
+        ChangeRow(String tenantid, long changeTS, Map<String, Object> pks,
+                  Map<String, Object> change) {
+            this.tenantid = tenantid;
+            this.changeTS = changeTS;
+            this.pks = pks;
+            this.change = change;
+        }
+
+        public String getTenantID() {
+            return tenantid;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Map<String, Object> getPreImage() {
+            return (Map<String, Object>) change.get(CDC_PRE_IMAGE);
+        }
+
+        @SuppressWarnings("unchecked")
+        public Map<String, Object> getChangeImage() {
+            return (Map<String, Object>) change.get(CDC_CHANGE_IMAGE);
+        }
+
+        @SuppressWarnings("unchecked")
+        public Map<String, Object> getPostImage() {
+            return (Map<String, Object>) change.get(CDC_POST_IMAGE);
+        }
+
+        @Override
+        public String toString() {
+            return gson.toJson(this);
+        }
+
+        public Map<String, Object> getPrimaryKeys() {
+            return pks;
+        }
+
+        public long getChangeTimestamp() {
+            return changeTS;
+        }
+    }
+
+    /** Strategy for committing generated changes and for setting up/tearing down around them. */
+    protected abstract class CommitAdapter {
+        abstract void commit(Connection conn) throws SQLException;
+
+        void init() {
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+        }
+
+        public void reset() {
+            EnvironmentEdgeManager.reset();
+        }
+
+        public Connection getConnection(String tid) throws SQLException {
+            return newConnection(tid);
+        }
+    }
+
+    /** Commits normally. */
+    protected final CommitAdapter COMMIT_SUCCESS = new CommitAdapter() {
+        @Override
+        public void commit(Connection conn) throws SQLException {
+            conn.commit();
+        }
+    };
+
+    /**
+     * Forces data-table updates to fail (via IndexRegionObserver) and asserts that every commit
+     * throws SQLException.
+     */
+    protected final CommitAdapter COMMIT_FAILURE_EXPECTED = new CommitAdapter() {
+        @Override
+        public void commit(Connection conn) throws SQLException {
+            try {
+                conn.commit();
+                // If the commit did not fail, the failure injection is misconfigured.
+                fail("Commit expected to fail");
+            } catch (SQLException expected) {
+                // This is expected.
+            }
+        }
+
+        @Override
+        void init() {
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+        }
+
+        @Override
+        public void reset() {
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+        }
+    };
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
new file mode 100644
index 0000000000..62001ebaac
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for CDC DDL. They cover:
+ * <ul>
+ *   <li>CREATE CDC: plain, schema-qualified, salted, multi-tenant, and with a non-default
+ *   column encoding;</li>
+ *   <li>DROP CDC;</li>
+ *   <li>rejection of a direct DROP INDEX on a CDC index;</li>
+ *   <li>rejection of an unknown CDC_INCLUDE hint.</li>
+ * </ul>
+ * In the {@code forView} parameterization, tests that honour that flag target a view over the
+ * data table.
+ */
+@RunWith(Parameterized.class)
+@Category(ParallelStatsDisabledTest.class)
+public class CDCDefinitionIT extends CDCBaseIT {
+    private final boolean forView;
+
+    public CDCDefinitionIT(boolean forView) {
+        this.forView = forView;
+    }
+
+    @Parameterized.Parameters(name = "forView={0}")
+    public static synchronized Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                { false }, { true }
+        });
+    }
+
+    /**
+     * In the {@code forView} variant, creates {@code viewName} as a {@code SELECT *} view over
+     * {@code tableName} and returns the view name. Otherwise returns {@code tableName} unchanged.
+     */
+    private String maybeCreateView(Connection conn, String tableName, String viewName)
+            throws SQLException {
+        if (!forView) {
+            return tableName;
+        }
+        conn.createStatement().execute(
+                "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+        return viewName;
+    }
+
+    /** Asserts that creating a CDC on a table that does not exist fails with TABLE_UNDEFINED. */
+    private static void assertCreateCDCOnMissingTableFails(Connection conn, String cdcName) {
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName + " ON NON_EXISTENT_TABLE");
+            fail("Expected to fail due to non-existent table");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        try (Connection conn = newConnection()) {
+            String tableName = generateUniqueName();
+            String datatableName = tableName;
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                            + " v2 DATE)");
+            tableName = maybeCreateView(conn, tableName, generateUniqueName());
+            String cdcName = generateUniqueName();
+
+            assertCreateCDCOnMissingTableFails(conn, cdcName);
+
+            // Index type 3 pairs with UNCOVERED_GLOBAL and 2 with LOCAL in the checks below.
+            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, null, null);
+            assertCDCState(conn, cdcName, null, 3);
+            assertNoResults(conn, cdcName);
+
+            try {
+                conn.createStatement().execute(cdc_sql);
+                fail("Expected to fail due to duplicate index");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(),
+                        e.getErrorCode());
+                assertTrue(e.getMessage().endsWith(cdcName));
+            }
+
+            // IF NOT EXISTS on an existing CDC must not throw, even with different options.
+            conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON "
+                    + tableName + " INCLUDE (pre, post) INDEX_TYPE=g");
+
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (pre, post)";
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql,
+                    PTable.IndexType.UNCOVERED_GLOBAL);
+            assertCDCState(conn, cdcName, "PRE,POST", 3);
+            assertPTable(cdcName, new HashSet<>(
+                    Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)),
+                    tableName, datatableName);
+            assertNoResults(conn, cdcName);
+
+            cdcName = generateUniqueName();
+            cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql, PTable.IndexType.LOCAL);
+            assertCDCState(conn, cdcName, null, 2);
+            assertPTable(cdcName, null, tableName, datatableName);
+            assertNoResults(conn, cdcName);
+        }
+    }
+
+    @Test
+    public void testCreateWithSalt() throws Exception {
+        // Indexes on views don't support salt buckets; the option is currently silently ignored.
+        if (forView) {
+            return;
+        }
+
+        // {data table bucket count, CDC bucket count}
+        Integer[][] saltingConfigs = new Integer[][] {
+                new Integer[]{null, 2},
+                new Integer[]{0, 2},
+                new Integer[]{4, null},
+                new Integer[]{4, 1},
+                new Integer[]{4, 0},
+                new Integer[]{4, 2}
+        };
+
+        for (Integer[] saltingConfig : saltingConfigs) {
+            Integer tableSaltBuckets = saltingConfig[0];
+            Integer cdcSaltBuckets = saltingConfig[1];
+            try (Connection conn = newConnection()) {
+                String tableName = generateUniqueName();
+                createTable(conn, "CREATE TABLE " + tableName +
+                        " ( k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)",
+                        null, false, tableSaltBuckets, false, null);
+                assertSaltBuckets(conn, tableName, tableSaltBuckets);
+
+                String cdcName = generateUniqueName();
+                String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+                createCDCAndWait(conn, tableName, cdcName, cdc_sql, null, cdcSaltBuckets, null);
+                try {
+                    assertCDCState(conn, cdcName, null, 3);
+                    // The CDC entity itself is expected to report no salt buckets.
+                    assertSaltBuckets(conn, cdcName, null);
+                    // The index inherits the table's salt buckets unless the CDC sets its own.
+                    assertSaltBuckets(conn, CDCUtil.getCDCIndexName(cdcName),
+                            cdcSaltBuckets != null ? cdcSaltBuckets : tableSaltBuckets);
+                    assertNoResults(conn, cdcName);
+                } catch (Exception | AssertionError error) {
+                    // Assertion failures are Errors, not Exceptions. Catch both so every failure
+                    // is annotated with the salting config that produced it.
+                    throw new AssertionError("{tableSaltBuckets=" + tableSaltBuckets + ", " +
+                            "cdcSaltBuckets=" + cdcSaltBuckets + "} " + error.getMessage(),
+                            error);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testCreateWithSchemaName() throws Exception {
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+            String datatableName = tableName;
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," +
+                            " v1 INTEGER, v2 DATE)");
+            tableName = maybeCreateView(conn, tableName,
+                    SchemaUtil.getTableName(schemaName, generateUniqueName()));
+            String cdcName = generateUniqueName();
+
+            assertCreateCDCOnMissingTableFails(conn, cdcName);
+
+            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+            assertCDCState(conn, cdcName, null, 3);
+            assertPTable(cdcName, null, tableName, datatableName);
+        }
+    }
+
+    @Test
+    public void testCreateCDCMultitenant() throws Exception {
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName +
+                    " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
+                    "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
+            String cdcName = generateUniqueName();
+            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+
+            // The index PK starts with the tenant column, then the row timestamp, then the rest
+            // of the data table PK.
+            PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
+            assertTrue(indexTable.isMultiTenant());
+            List<PColumn> idxPkColumns = indexTable.getPKColumns();
+            assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
+            assertEquals(": PHOENIX_ROW_TIMESTAMP()", idxPkColumns.get(1).getName().getString());
+            assertEquals(":K", idxPkColumns.get(2).getName().getString());
+
+            PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
+            assertTrue(cdcTable.isMultiTenant());
+            List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
+            assertEquals("TENANTID", cdcPkColumns.get(0).getName().getString());
+            assertEquals("K", cdcPkColumns.get(1).getName().getString());
+        }
+    }
+
+    @Test
+    public void testCreateWithNonDefaultColumnEncoding() throws Exception {
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                            + " v2 DATE)");
+            tableName = maybeCreateView(conn, tableName, generateUniqueName());
+            String cdcName = generateUniqueName();
+
+            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
+                    " COLUMN_ENCODED_BYTES=" +
+                    String.valueOf(NON_ENCODED_QUALIFIERS.getSerializedMetadataValue()));
+            PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
+            assertEquals(NON_ENCODED_QUALIFIERS, indexTable.getEncodingScheme());
+        }
+    }
+
+    @Test
+    public void testDropCDC() throws SQLException {
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                            + " v2 DATE)");
+            String cdcName = generateUniqueName();
+            // Create the CDC first. Without it, the first DROP would only exercise the
+            // missing-CDC path.
+            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+            assertCDCState(conn, cdcName, null, 3);
+
+            String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+            conn.createStatement().execute(drop_cdc_sql);
+
+            // Neither the CDC nor its index may remain in SYSTEM.CATALOG.
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+                    "system.catalog WHERE table_name = '" + cdcName +
+                    "' AND column_name IS NULL and column_family IS NULL")) {
+                assertFalse(rs.next());
+            }
+            try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+                    "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+                    "' AND column_name IS NULL and column_family IS NULL")) {
+                assertFalse(rs.next());
+            }
+
+            try {
+                conn.createStatement().execute(drop_cdc_sql);
+                fail("Expected to fail as cdc table doesn't exist");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+                assertTrue(e.getMessage().endsWith(cdcName));
+            }
+        }
+    }
+
+    @Test
+    public void testDropCDCIndex() throws SQLException {
+        Properties props = new Properties();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                            + " v2 DATE)");
+            String cdcName = generateUniqueName();
+            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            conn.createStatement().execute(cdc_sql);
+            assertCDCState(conn, cdcName, null, 3);
+            String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName)
+                    + "\" ON " + tableName;
+            try {
+                conn.createStatement().execute(drop_cdc_index_sql);
+                // Without this, the test would pass even if the CDC index could be dropped.
+                fail("Expected to fail as the CDC index can't be dropped directly");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(),
+                        e.getErrorCode());
+                assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
+            }
+        }
+    }
+
+    @Test
+    public void testSelectCDCBadIncludeSpec() throws Exception {
+        try (Connection conn = newConnection()) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName
+                    + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
+            tableName = maybeCreateView(conn, tableName, generateUniqueName());
+            String cdcName = generateUniqueName();
+            String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+            createCDCAndWait(conn, tableName, cdcName, cdc_sql);
+            try {
+                conn.createStatement().executeQuery("SELECT " +
+                        "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
+                fail("Expected to fail due to invalid CDC INCLUDE hint");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
+                        e.getErrorCode());
+                assertTrue(e.getMessage().endsWith("DUMMY"));
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
deleted file mode 100644
index c8cff782d7..0000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import com.google.gson.Gson;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableProperty;
-import org.apache.phoenix.util.CDCUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Integration tests for CDC DDL (CREATE/DROP CDC) and basic CDC SELECT queries. They are
- * parameterized on whether the CDC is created on a table or on a view over it ({@code forView}).
- */
-@RunWith(Parameterized.class)
-@Category(ParallelStatsDisabledTest.class)
-public class CDCMiscIT extends ParallelStatsDisabledIT {
- private final boolean forView;
-
- public CDCMiscIT(boolean forView) {
- this.forView = forView;
- }
-
- // NOTE(review): "forVieiw" in the display name below is a typo for "forView".
- @Parameterized.Parameters(name = "forVieiw={0}")
- public static synchronized Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false}, { true }
- });
- }
-
- // Asserts the CDC_INCLUDE value recorded in SYSTEM.CATALOG for the CDC row, and the
- // INDEX_TYPE recorded for its backing CDC index.
- private void assertCDCState(Connection conn, String cdcName, String expInclude,
- int idxType) throws SQLException {
- try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
- "system.catalog WHERE table_name = '" + cdcName +
- "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(true, rs.next());
- assertEquals(expInclude, rs.getString(1));
- }
- try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
- "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
- "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(true, rs.next());
- assertEquals(idxType, rs.getInt(1));
- }
- }
-
- // Asserts the CDC's PTable metadata: the include scopes, no index state or type, the expected
- // parent name, and the CDC index as the physical table.
- // NOTE(review): the connection opened here is never closed.
- private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
- String datatableName)
- throws SQLException {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- PTable table = PhoenixRuntime.getTable(conn, cdcName);
- assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
- assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
- assertNull(table.getIndexState()); // Index state should be null for CDC.
- assertNull(table.getIndexType()); // This is not an index.
- assertEquals(datatableName, table.getParentName().getString());
- assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
- }
-
- // Asserts that both the CDC and its index report nbuckets salt buckets.
- // NOTE(review): the connection opened here is never closed.
- private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
- assertEquals(nbuckets, cdcTable.getBucketNum());
- PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
- assertEquals(nbuckets, indexTable.getBucketNum());
- }
-
- // Creates the CDC, builds its index with IndexToolIT.runIndexTool, and waits until the index
- // is ACTIVE.
- private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql)
- throws Exception {
- conn.createStatement().execute(cdc_sql);
- IndexToolIT.runIndexTool(false, null, tableName,
- "\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
- TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE);
- }
-
- @Test
- public void testCreate() throws Exception {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
- + " v2 DATE)");
- if (forView) {
- String viewName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
- tableName = viewName;
- }
- String cdcName = generateUniqueName();
-
- try {
- conn.createStatement().execute("CREATE CDC " + cdcName
- + " ON NON_EXISTENT_TABLE");
- fail("Expected to fail due to non-existent table");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
- }
-
- String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- createAndWait(conn, tableName, cdcName, cdc_sql);
- assertCDCState(conn, cdcName, null, 3);
-
- try {
- conn.createStatement().execute(cdc_sql);
- fail("Expected to fail due to duplicate index");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
- assertTrue(e.getMessage().endsWith(cdcName));
- }
-
- conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
- " INCLUDE (pre, post) INDEX_TYPE=g");
-
- cdcName = generateUniqueName();
- cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName +
- " INCLUDE (pre, post) INDEX_TYPE=g";
- createAndWait(conn, tableName, cdcName, cdc_sql);
- assertCDCState(conn, cdcName, "PRE,POST", 3);
- assertPTable(cdcName, new HashSet<>(
- Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);
-
- cdcName = generateUniqueName();
- cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
- createAndWait(conn, tableName, cdcName, cdc_sql);
- assertCDCState(conn, cdcName, null, 2);
- assertPTable(cdcName, null, tableName);
-
- // Indexes on views don't support salt buckets; the option is currently silently ignored.
- if (! forView) {
- cdcName = generateUniqueName();
- cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4";
- createAndWait(conn, tableName, cdcName, cdc_sql);
- assertSaltBuckets(cdcName, 4);
- }
-
- conn.close();
- }
-
- @Test
- public void testCreateCDCMultitenant() throws Exception {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String tableName = generateUniqueName();
- conn.createStatement().execute("CREATE TABLE " + tableName +
- " (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
- "CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
- String cdcName = generateUniqueName();
- conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
-
- PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
- List<PColumn> idxPkColumns = indexTable.getPKColumns();
- assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
- assertEquals(": PHOENIX_ROW_TIMESTAMP()", idxPkColumns.get(1).getName().getString());
- assertEquals(":K", idxPkColumns.get(2).getName().getString());
-
- PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
- List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
- assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
- assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
- assertEquals("K", cdcPkColumns.get(2).getName().getString());
- }
-
- // NOTE(review): there is no @Test annotation, so JUnit never runs this method. It also drops
- // a CDC that was never created.
- public void testDropCDC () throws SQLException {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
- + " v2 DATE)");
- String cdcName = generateUniqueName();
-
- String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
- conn.createStatement().execute(drop_cdc_sql);
-
- try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
- "system.catalog WHERE table_name = '" + cdcName +
- "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(false, rs.next());
- }
- try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
- "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
- "' AND column_name IS NULL and column_family IS NULL")) {
- assertEquals(false, rs.next());
- }
-
- try {
- conn.createStatement().execute(drop_cdc_sql);
- fail("Expected to fail as cdc table doesn't exist");
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
- assertTrue(e.getMessage().endsWith(cdcName));
- }
- }
-
- @Test
- public void testDropCDCIndex () throws SQLException {
- Properties props = new Properties();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
- + " v2 DATE)");
- String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
- conn.createStatement().execute(cdc_sql);
- assertCDCState(conn, cdcName, null, 3);
- String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
- // NOTE(review): there is no fail() after the DROP, so this test passes even if the drop
- // succeeds.
- try {
- conn.createStatement().execute(drop_cdc_index_sql);
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
- assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
- }
- }
-
- // Expects exactly three rows, in this order: (k=1, V1=100), (k=2, V1=200), (k=1, V1=101).
- // k is read from column 2 and the change JSON from column 3. Closes rs when done.
- private void assertResultSet(ResultSet rs) throws Exception{
- Gson gson = new Gson();
- assertEquals(true, rs.next());
- assertEquals(1, rs.getInt(2));
- assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3),
- HashMap.class));
- assertEquals(true, rs.next());
- assertEquals(2, rs.getInt(2));
- assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3),
- HashMap.class));
- assertEquals(true, rs.next());
- assertEquals(1, rs.getInt(2));
- assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3),
- HashMap.class));
- assertEquals(false, rs.next());
- rs.close();
- }
-
- private Connection newConnection() throws SQLException {
- Properties props = new Properties();
- // Use these only for debugging.
- //props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
- //props.put("hbase.client.scanner.timeout.period", "6000000");
- //props.put("phoenix.query.timeoutMs", "6000000");
- //props.put("zookeeper.session.timeout", "6000000");
- //props.put("hbase.rpc.timeout", "6000000");
- return DriverManager.getConnection(getUrl(), props);
- }
-
- @Test
- public void testSelectCDC() throws Exception {
- Connection conn = newConnection();
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
- conn.commit();
- // Presumably here so the second write to k=1 gets a strictly later timestamp.
- Thread.sleep(10);
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
- conn.commit();
- String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName;
- createAndWait(conn, tableName, cdcName, cdc_sql);
- assertCDCState(conn, cdcName, null, 3);
- // NOTE: To debug the query execution, add the below condition where you need a breakpoint.
- // if (<table>.getTableName().getString().equals("N000002") ||
- // <table>.getTableName().getString().equals("__CDC__N000002")) {
- // "".isEmpty();
- // }
- assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
- assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
- " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
- assertResultSet(conn.createStatement().executeQuery("SELECT " +
- "/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName));
- assertResultSet(conn.createStatement().executeQuery("SELECT " +
- "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));
-
- HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
- put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
- put("SELECT * FROM " + cdcName +
- " ORDER BY k ASC", new int [] {1, 1, 2});
- put("SELECT * FROM " + cdcName +
- " ORDER BY k DESC", new int [] {2, 1, 1});
- put("SELECT * FROM " + cdcName +
- " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
- }};
- for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
- try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
- for (int k: testQuery.getValue()) {
- assertEquals(true, rs.next());
- assertEquals(k, rs.getInt(2));
- }
- assertEquals(false, rs.next());
- }
- }
-
- try (ResultSet rs = conn.createStatement().executeQuery(
- "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
- assertEquals(false, rs.next());
- }
- try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
- assertEquals(true, rs.next());
- assertEquals("abc", rs.getString(1));
- assertEquals(true, rs.next());
- assertEquals("abc", rs.getString(1));
- assertEquals(false, rs.next());
- }
- }
-
- @Test
- public void testSelectCDCBadIncludeSpec() throws Exception {
- Connection conn = newConnection();
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
- String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName;
- conn.createStatement().execute(cdc_sql);
- try {
- conn.createStatement().executeQuery("SELECT " +
- "/*+ CDC_INCLUDE(DUMMY) */ * FROM " + cdcName);
- fail("Expected to fail due to invalid CDC INCLUDE hint");
- }
- catch (SQLException e) {
- assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
- e.getErrorCode());
- assertTrue(e.getMessage().endsWith("DUMMY"));
- }
- }
-
- @Test
- public void testSelectTimeRangeQueries() throws Exception {
- Connection conn = newConnection();
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
- String cdcName = generateUniqueName();
- String cdc_sql = "CREATE CDC " + cdcName
- + " ON " + tableName;
- conn.createStatement().execute(cdc_sql);
- Timestamp ts1 = new Timestamp(System.currentTimeMillis());
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
- conn.commit();
- Thread.sleep(10);
- Timestamp ts2 = new Timestamp(System.currentTimeMillis());
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
- conn.commit();
- Thread.sleep(10);
- Timestamp ts3 = new Timestamp(System.currentTimeMillis());
- // NOTE(review): there is no commit after the two writes below, so they are never committed
- // (assuming auto-commit is off). The data sets that would cover them are also commented
- // out, so only the [ts1, ts2] window is verified.
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
- conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
- Timestamp ts4 = new Timestamp(System.currentTimeMillis());
-
- String sel_sql = "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND " +
- "PHOENIX_ROW_TIMESTAMP() <= ?";
- Object[] testDataSets = new Object[] {
- new Object[] {ts1, ts2, new int[] {1, 2}}/*,
- new Object[] {ts2, ts3, new int[] {1, 3}},
- new Object[] {ts3, ts4, new int[] {1}}*/
- };
- PreparedStatement stmt = conn.prepareStatement(sel_sql);
- for (int i = 0; i < testDataSets.length; ++i) {
- Object[] testData = (Object[]) testDataSets[i];
- stmt.setTimestamp(1, (Timestamp) testData[0]);
- stmt.setTimestamp(2, (Timestamp) testData[1]);
- try (ResultSet rs = stmt.executeQuery()) {
- for (int k: (int[]) testData[2]) {
- assertEquals(true, rs.next());
- assertEquals(k, rs.getInt(2));
- }
- assertEquals(false, rs.next());
- }
- }
- }
-
- // Temporary test case used as a reference for debugging and comparing against the CDC query.
- @Test
- public void testSelectUncoveredIndex() throws Exception {
- Connection conn = newConnection();
- String tableName = generateUniqueName();
- conn.createStatement().execute(
- "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
- " (1, 100)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
- " (2, 200)");
- conn.commit();
- String indexName = generateUniqueName();
- String index_sql = "CREATE UNCOVERED INDEX " + indexName
- + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
- conn.createStatement().execute(index_sql);
- //ResultSet rs =
- // conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
- // " " + indexName + ") */ * FROM " + tableName);
- ResultSet rs =
- conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
- " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName);
- assertEquals(true, rs.next());
- assertEquals(1, rs.getInt(1));
- assertEquals(100, rs.getInt(2));
- assertEquals(true, rs.next());
- assertEquals(2, rs.getInt(1));
- assertEquals(200, rs.getInt(2));
- assertEquals(false, rs.next());
- }
-}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
new file mode 100644
index 0000000000..09f3af45a4
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+// NOTE: To debug the query execution, add the below condition or the equivalent where you need a
+// breakpoint.
+// if (<table>.getTableName().getString().equals("N000002") ||
+// <table>.getTableName().getString().equals("__CDC__N000002")) {
+// "".isEmpty();
+// }
+@RunWith(Parameterized.class)
+@Category(ParallelStatsDisabledTest.class)
+public class CDCQueryIT extends CDCBaseIT {
+    // Whether the CDC is created on a view over the data table rather than on the table itself.
+    private final boolean forView;
+    // Whether the test changes are written before the CDC (and therefore its index) is created.
+    private final boolean dataBeforeCDC;
+    private final PTable.QualifierEncodingScheme encodingScheme;
+    // Whether the data table has a leading TENANT_ID primary key column; when set, changes are
+    // generated for two tenants and the queries run as tenant "1000".
+    private final boolean multitenant;
+    private final Integer indexSaltBuckets;
+    private final Integer tableSaltBuckets;
+    // Whether the tables and the CDC are created under a freshly generated schema name.
+    private final boolean withSchemaName;
+
+    public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC,
+            PTable.QualifierEncodingScheme encodingScheme, boolean multitenant,
+            Integer indexSaltBuckets, Integer tableSaltBuckets, boolean withSchemaName) {
+        this.forView = forView;
+        this.dataBeforeCDC = dataBeforeCDC;
+        this.encodingScheme = encodingScheme;
+        this.multitenant = multitenant;
+        this.indexSaltBuckets = indexSaltBuckets;
+        this.tableSaltBuckets = tableSaltBuckets;
+        this.withSchemaName = withSchemaName;
+    }
+
+    /**
+     * Parameter combinations for this class. Each row maps, in order, to the constructor
+     * arguments forView, dataBeforeCDC, encodingScheme, multitenant, indexSaltBuckets,
+     * tableSaltBuckets and withSchemaName.
+     */
+    @Parameterized.Parameters(name = "forView={0}, dataBeforeCDC={1}, encodingScheme={2}, "
+            + "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5}, "
+            + "withSchemaName={6}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.FALSE },
+                { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.TRUE },
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1,
+                        Boolean.FALSE },
+                // Once PHOENIX-7239 is fixed, change this to have different salt buckets for
+                // the data table and the index.
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1,
+                        Boolean.TRUE },
+                { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null,
+                        Boolean.FALSE },
+                { Boolean.TRUE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null,
+                        Boolean.FALSE },
+        });
+    }
+
+    /**
+     * Resets the clock and prepares a fresh manual edge, seeded with the current time. The edge
+     * is not injected here; tests that need it inject it themselves.
+     */
+    @Before
+    public void beforeTest() {
+        EnvironmentEdgeManager.reset();
+        injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+    }
+
+    /**
+     * Verifies the CDC output for several CDC_INCLUDE hints and projections. Also verifies that
+     * a query on the data table hinted to use the CDC index still works.
+     */
+    @Test
+    public void testSelectCDC() throws Exception {
+        String cdcName, cdcSql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String dataTableName = tableName;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, "
+                    + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " : "(k)")
+                    + ")", encodingScheme, multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = multitenant
+                ? new String[] {tenantId, "2000"}
+                : new String[] {tenantId};
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChanges(startTS, tenantids, tableName, null,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(dataTableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+            // Existence of CDC shouldn't cause the regular query path to fail.
+            String uncoveredSql = "SELECT /*+ INDEX(" + tableName + " "
+                    + CDCUtil.getCDCIndexName(cdcName) + ") */ k, v1 FROM " + tableName;
+            try (ResultSet rs = conn.createStatement().executeQuery(uncoveredSql)) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertEquals(300, rs.getInt(2));
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertEquals(201, rs.getInt(2));
+                assertFalse(rs.next());
+            }
+
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName), changes,
+                    CHANGE_IMG, true);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ PHOENIX_ROW_TIMESTAMP(), K,"
+                            + "\"CDC JSON\" FROM " + cdcFullName), changes,
+                    CHANGE_IMG, true);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName),
+                    changes, PRE_POST_IMG, true);
+            verifyChanges(tenantId,
+                    conn.createStatement().executeQuery("SELECT * FROM " + cdcFullName),
+                    changes, new HashSet<>(), true);
+
+            // Query -> expected value of the second projected column (k), row by row.
+            Map<String, int[]> testQueries = new HashMap<>();
+            testQueries.put("SELECT 'dummy', k, \"CDC JSON\" FROM " + cdcFullName,
+                    new int[] {1, 2, 3, 1, 1, 1, 1, 2, 1, 1, 1, 1});
+            testQueries.put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName
+                    + " ORDER BY k ASC", new int[] {1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3});
+            testQueries.put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName
+                    + " ORDER BY k DESC", new int[] {3, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1});
+            testQueries.put("SELECT PHOENIX_ROW_TIMESTAMP(), k, \"CDC JSON\" FROM " + cdcFullName
+                    + " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC",
+                    new int[] {1, 1, 1, 1, 2, 1, 1, 1, 1, 3, 2, 1});
+
+            // With no change scopes requested, the JSON should carry only the event type, which
+            // is overwritten with a fixed value before comparing.
+            Map<String, Object> dummyChange = new HashMap<>();
+            dummyChange.put(CDC_EVENT_TYPE, "dummy");
+            for (Map.Entry<String, int[]> testQuery : testQueries.entrySet()) {
+                String query = testQuery.getKey();
+                int[] expectedKeys = testQuery.getValue();
+                try (ResultSet rs = conn.createStatement().executeQuery(query)) {
+                    for (int i = 0; i < expectedKeys.length; ++i) {
+                        String msg = "Index: " + i + " for query: " + query;
+                        assertTrue(msg, rs.next());
+                        assertEquals(msg, expectedKeys[i], rs.getInt(2));
+                        Map<String, Object> change =
+                                gson.fromJson(rs.getString(3), HashMap.class);
+                        change.put(CDC_EVENT_TYPE, "dummy");
+                        assertEquals(msg, dummyChange, change);
+                    }
+                    assertFalse("Unexpected extra row for query: " + query, rs.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates an immutable table with the given storage scheme. Writes changes to it and checks
+     * the CDC output for PRE/POST and CHANGE images.
+     */
+    private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme)
+            throws Exception {
+        String cdcName, cdcSql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String dataTableName = tableName;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, CONSTRAINT PK PRIMARY KEY "
+                    + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme,
+                    multitenant, tableSaltBuckets, true, immutableStorageScheme);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = multitenant
+                ? new String[] {tenantId, "2000"}
+                : new String[] {tenantId};
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChangesImmutableTable(startTS, tenantids, tableName,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(dataTableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+            // For debug: uncomment to see the exact results logged to console.
+            //try (Statement stmt = conn.createStatement()) {
+            //    try (ResultSet rs = stmt.executeQuery(
+            //            "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," +
+            //                    "\"CDC JSON\" FROM " + cdcFullName)) {
+            //        while (rs.next()) {
+            //            System.out.println("----- " + rs.getString(1) + " " +
+            //                    rs.getInt(2) + " " + rs.getString(3));
+            //        }
+            //    }
+            //}
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName),
+                    changes, PRE_POST_IMG, false);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + cdcFullName), changes,
+                    CHANGE_IMG, false);
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ "
+                            + "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcFullName),
+                    changes, CHANGE_IMG, false);
+        }
+    }
+
+    @Test
+    public void testSelectCDCImmutableOneCellPerColumn() throws Exception {
+        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+    }
+
+    @Test
+    public void testSelectCDCImmutableSingleCell() throws Exception {
+        _testSelectCDCImmutable(PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS);
+    }
+
+    /**
+     * Writes changes at controlled timestamps using the manual environment edge. Then verifies
+     * that CDC queries bounded by PHOENIX_ROW_TIMESTAMP() return exactly the expected rows.
+     */
+    @Test
+    public void testSelectTimeRangeQueries() throws Exception {
+        String cdcName, cdcSql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY "
+                    + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme,
+                    multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = multitenant
+                ? new String[] {tenantId, "2000"}
+                : new String[] {tenantId};
+
+        // ts1..ts4 are query boundaries tracked via `cal`, kept in step with the manual edge.
+        Timestamp ts1, ts2, ts3, ts4;
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        try {
+            ts1 = new Timestamp(System.currentTimeMillis());
+            cal.setTimeInMillis(ts1.getTime());
+            injectEdge.setValue(ts1.getTime());
+
+            for (String tid : tenantids) {
+                try (Connection conn = newConnection(tid)) {
+                    conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
+                    conn.commit();
+                }
+            }
+
+            injectEdge.incrementValue(100);
+
+            for (String tid : tenantids) {
+                try (Connection conn = newConnection(tid)) {
+                    conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
+                    conn.commit();
+                }
+            }
+
+            injectEdge.incrementValue(100);
+            cal.add(Calendar.MILLISECOND, 200);
+            ts2 = new Timestamp(cal.getTime().getTime());
+            injectEdge.incrementValue(100);
+
+            for (String tid : tenantids) {
+                try (Connection conn = newConnection(tid)) {
+                    conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+                    conn.commit();
+                    injectEdge.incrementValue(100);
+                    conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)");
+                    conn.commit();
+                }
+            }
+
+            injectEdge.incrementValue(100);
+            cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
+            ts3 = new Timestamp(cal.getTime().getTime());
+            injectEdge.incrementValue(100);
+
+            for (String tid : tenantids) {
+                try (Connection conn = newConnection(tid)) {
+                    conn.createStatement().execute(
+                            "UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
+                    conn.commit();
+                    injectEdge.incrementValue(100);
+                    conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2");
+                    conn.commit();
+                }
+            }
+
+            injectEdge.incrementValue(100);
+            cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length);
+            ts4 = new Timestamp(cal.getTime().getTime());
+        } finally {
+            // Always restore the real clock, even when a write above fails, so the manual edge
+            // does not stay injected.
+            EnvironmentEdgeManager.reset();
+        }
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+        try (Connection conn = newConnection(tenantId)) {
+            String selSql =
+                    "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\" FROM " + cdcFullName
+                            + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() <= ?";
+            // Each entry: {lower bound, upper bound, expected k values in result order}.
+            Object[] testDataSets = new Object[] {
+                    new Object[] {ts1, ts2, new int[] {1, 2}},
+                    new Object[] {ts2, ts3, new int[] {1, 3}},
+                    new Object[] {ts3, ts4, new int[] {1, 2}},
+                    new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}},
+            };
+            try (PreparedStatement stmt = conn.prepareStatement(selSql)) {
+                // For debug: uncomment to see the exact results logged to console.
+                //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3: " + ts3 +
+                //        " ts4: " + ts4);
+                //for (int i = 0; i < testDataSets.length; ++i) {
+                //    Object[] testData = (Object[]) testDataSets[i];
+                //    stmt.setTimestamp(1, (Timestamp) testData[0]);
+                //    stmt.setTimestamp(2, (Timestamp) testData[1]);
+                //    try (ResultSet rs = stmt.executeQuery()) {
+                //        System.out.println("----- Test data set: " + i);
+                //        while (rs.next()) {
+                //            System.out.println("----- " + rs.getString(1) + " " +
+                //                    rs.getInt(2) + " " + rs.getString(3));
+                //        }
+                //    }
+                //}
+                for (int i = 0; i < testDataSets.length; ++i) {
+                    Object[] testData = (Object[]) testDataSets[i];
+                    int[] expectedKeys = (int[]) testData[2];
+                    stmt.setTimestamp(1, (Timestamp) testData[0]);
+                    stmt.setTimestamp(2, (Timestamp) testData[1]);
+                    try (ResultSet rs = stmt.executeQuery()) {
+                        for (int j = 0; j < expectedKeys.length; ++j) {
+                            String msg = " Index: " + j + " Test data set: " + i;
+                            assertTrue(msg, rs.next());
+                            assertEquals(msg, expectedKeys[j], rs.getInt(2));
+                        }
+                        assertFalse("Test data set: " + i, rs.next());
+                    }
+                }
+            }
+
+            try (PreparedStatement pstmt = conn.prepareStatement(
+                    "SELECT * FROM " + cdcFullName + " WHERE PHOENIX_ROW_TIMESTAMP() > ?")) {
+                pstmt.setTimestamp(1, ts4);
+                try (ResultSet rs = pstmt.executeQuery()) {
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Drops a data-table column after the table (and possibly the CDC) exists. Then checks
+     * that the CHANGE image of the CDC still matches the generated changes.
+     */
+    @Test
+    public void testSelectCDCWithDDL() throws Exception {
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String dataTableName = tableName;
+        String cdcName, cdcSql;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v0 INTEGER, v1 INTEGER, v1v2 INTEGER, v2 INTEGER, "
+                    + "B.vb INTEGER, v3 INTEGER, CONSTRAINT PK PRIMARY KEY "
+                    + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme,
+                    multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            conn.createStatement().execute("ALTER TABLE " + dataTableName + " DROP COLUMN v0");
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = multitenant
+                ? new String[] {tenantId, "2000"}
+                : new String[] {tenantId};
+
+        long startTS = System.currentTimeMillis();
+        List<ChangeRow> changes = generateChanges(startTS, tenantids, tableName, dataTableName,
+                COMMIT_SUCCESS);
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+            // Testing with flushed data adds more coverage.
+            getUtility().getAdmin().flush(TableName.valueOf(dataTableName));
+            getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName,
+                    CDCUtil.getCDCIndexName(cdcName))));
+        }
+
+        try (Connection conn = newConnection(tenantId)) {
+            verifyChanges(tenantId, conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROM " + SchemaUtil.getTableName(
+                            schemaName, cdcName)),
+                    changes, CHANGE_IMG, true);
+        }
+    }
+
+    /**
+     * Checks the first two CDC rows written by {@link #testCDCBinaryAndDateColumn()}.
+     *
+     * For k=1 the full JSON (empty pre-image; post and change images with the Base64-encoded
+     * binary, the date and the timestamp) is compared. For k=2 only the binary value in the
+     * change image is checked.
+     */
+    private void assertCDCBinaryAndDateColumn(ResultSet rs,
+                                              List<byte []> byteColumnValues,
+                                              List<Date> dateColumnValues,
+                                              Timestamp timestamp) throws Exception {
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(2));
+
+        Map<String, Object> postImage = new HashMap<>();
+        postImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        postImage.put("D", dateColumnValues.get(0).toString());
+        postImage.put("T", timestamp.toString());
+        Map<String, Object> changeImage = new HashMap<>();
+        changeImage.put("A_BINARY",
+                Base64.getEncoder().encodeToString(byteColumnValues.get(0)));
+        changeImage.put("D", dateColumnValues.get(0).toString());
+        changeImage.put("T", timestamp.toString());
+
+        Map<String, Object> row1 = new HashMap<>();
+        row1.put(CDC_EVENT_TYPE, CDC_UPSERT_EVENT_TYPE);
+        row1.put(CDC_POST_IMAGE, postImage);
+        row1.put(CDC_CHANGE_IMAGE, changeImage);
+        row1.put(CDC_PRE_IMAGE, new HashMap<String, Object>());
+        assertEquals(row1, gson.fromJson(rs.getString(3), HashMap.class));
+
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(2));
+        HashMap<String, Object> row2Json = gson.fromJson(rs.getString(3), HashMap.class);
+        String row2BinaryColStr =
+                (String) ((Map<?, ?>) row2Json.get(CDC_CHANGE_IMAGE)).get("A_BINARY");
+        byte[] row2BinaryCol = Base64.getDecoder().decode(row2BinaryColStr);
+
+        assertEquals(0, DescVarLengthFastByteComparisons.compareTo(byteColumnValues.get(1),
+                0, byteColumnValues.get(1).length, row2BinaryCol, 0, row2BinaryCol.length));
+    }
+
+    /**
+     * Upserts two rows with BINARY, DATE and TIMESTAMP columns. Then verifies how those values
+     * appear in the CDC JSON.
+     */
+    @Test
+    public void testCDCBinaryAndDateColumn() throws Exception {
+        List<byte []> byteColumnValues = new ArrayList<>();
+        byteColumnValues.add(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 1});
+        byteColumnValues.add(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 2});
+        List<Date> dateColumnValues = new ArrayList<>();
+        dateColumnValues.add(Date.valueOf("2024-02-01"));
+        dateColumnValues.add(Date.valueOf("2024-01-31"));
+        Timestamp timestampColumnValue = Timestamp.valueOf("2024-01-31 12:12:14");
+        String cdcName, cdcSql;
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, a_binary binary(10), d Date, t TIMESTAMP, "
+                    + "CONSTRAINT PK PRIMARY KEY "
+                    + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme,
+                    multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            if (!dataBeforeCDC) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        try (Connection conn = newConnection(tenantId)) {
+            String upsertQuery =
+                    "UPSERT INTO " + tableName + " (k, a_binary, d, t) VALUES (?, ?, ?, ?)";
+            try (PreparedStatement stmt = conn.prepareStatement(upsertQuery)) {
+                // Row i + 1 gets the i-th binary and date value; each row is committed alone.
+                for (int i = 0; i < byteColumnValues.size(); ++i) {
+                    stmt.setInt(1, i + 1);
+                    stmt.setBytes(2, byteColumnValues.get(i));
+                    stmt.setDate(3, dateColumnValues.get(i));
+                    stmt.setTimestamp(4, timestampColumnValue);
+                    stmt.execute();
+                    conn.commit();
+                }
+            }
+        }
+
+        if (dataBeforeCDC) {
+            try (Connection conn = newConnection()) {
+                createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme,
+                        indexSaltBuckets, null);
+            }
+        }
+
+        try (Connection conn = newConnection(tenantId)) {
+            assertCDCBinaryAndDateColumn(conn.createStatement().executeQuery(
+                    "SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * FROM "
+                            + SchemaUtil.getTableName(schemaName, cdcName)),
+                    byteColumnValues, dateColumnValues, timestampColumnValue);
+        }
+    }
+
+    /**
+     * Generates changes whose commit is expected to fail, and verifies that the CDC then
+     * returns no rows.
+     */
+    @Test
+    public void testSelectCDCFailDataTableUpdate() throws Exception {
+        if (dataBeforeCDC) {
+            // In this case, index will not exist at the time of upsert, so we can't simulate the
+            // index failure.
+            return;
+        }
+        String schemaName = withSchemaName ? generateUniqueName() : null;
+        String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+        String cdcName, cdcSql;
+        try (Connection conn = newConnection()) {
+            createTable(conn, "CREATE TABLE " + tableName + " ("
+                    + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+                    + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, B.vb INTEGER, "
+                    + "CONSTRAINT PK PRIMARY KEY "
+                    + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme,
+                    multitenant, tableSaltBuckets, false, null);
+            if (forView) {
+                String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName());
+                createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+                        encodingScheme);
+                tableName = viewName;
+            }
+            cdcName = generateUniqueName();
+            cdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+            createCDCAndWait(conn, tableName, cdcName, cdcSql, encodingScheme, indexSaltBuckets,
+                    null);
+        }
+
+        String tenantId = multitenant ? "1000" : null;
+        String[] tenantids = multitenant
+                ? new String[] {tenantId, "2000"}
+                : new String[] {tenantId};
+
+        long startTS = System.currentTimeMillis();
+        generateChanges(startTS, tenantids, tableName, null,
+                COMMIT_FAILURE_EXPECTED);
+
+        try (Connection conn = newConnection(tenantId);
+             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM "
+                     + SchemaUtil.getTableName(schemaName, cdcName))) {
+            assertFalse(rs.next());
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
index d21d00ad2f..2dbbabe76b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java
@@ -48,11 +48,14 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
+import java.util.TimeZone;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
@@ -500,12 +503,18 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT {
hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName.getBytes());
Scan scan = new Scan();
scan.setRaw(true);
+ scan.readAllVersions();
LOGGER.info("***** Table Name : " + tableName);
ResultScanner scanner = hTable.getScanner(scan);
+ // This is the default format of to_char(timestamp)
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
String cellString = cell.toString();
- LOGGER.info(cellString + " ****** value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+ LOGGER.info(cellString + " ****** timestamp : " +
+ sdf.format(new Date(cell.getTimestamp())) + " ****** value : " +
+ Bytes.toStringBinary(CellUtil.cloneValue(cell)));
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 520c6d9ad1..2d0429281f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -260,7 +260,7 @@ public class CreateIndexCompiler {
return new BaseMutationPlan(context, operation) {
@Override
public MutationState execute() throws SQLException {
- return client.createIndex(create, splits, null);
+ return client.createIndex(create, splits);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
index 1985d0f822..90fb9cb192 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
@@ -84,8 +84,12 @@ public class IndexStatementRewriter extends ParseNodeRewriter {
* @return new select statement or the same one if nothing was rewritten.
* @throws SQLException
*/
- public static SelectStatement translate(SelectStatement statement, ColumnResolver dataResolver, Map<TableRef, TableRef> multiTableRewriteMap) throws SQLException {
- return rewrite(statement, new IndexStatementRewriter(dataResolver, multiTableRewriteMap, false));
+ public static SelectStatement translate(SelectStatement statement,
+ ColumnResolver dataResolver,
+ Map<TableRef, TableRef> multiTableRewriteMap)
+ throws SQLException {
+ return rewrite(statement, new IndexStatementRewriter(dataResolver, multiTableRewriteMap,
+ false));
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 9981e31219..abd8626f89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -28,9 +28,12 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
import org.apache.phoenix.parse.HintNode;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.TerminalParseNode;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
@@ -240,8 +243,41 @@ public class QueryCompiler {
return plan;
}
+    /**
+     * Finds a plan over a CDC table among the data plans already known to this compiler.
+     *
+     * @return the first plan in {@code dataPlans.values()} whose table type is
+     *         {@link PTableType#CDC}, or {@code null} when {@code dataPlans} is null or has no
+     *         such plan.
+     */
+    private QueryPlan getExistingDataPlanForCDC() {
+        if (dataPlans == null) {
+            return null;
+        }
+        for (QueryPlan candidate : dataPlans.values()) {
+            PTableType candidateType = candidate.getTableRef().getTable().getType();
+            if (candidateType == PTableType.CDC) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
public QueryPlan compileSelect(SelectStatement select) throws SQLException{
StatementContext context = new StatementContext(statement, resolver, bindManager, scan, sequenceManager);
+ QueryPlan dataPlanForCDC = getExistingDataPlanForCDC();
+ if (dataPlanForCDC != null) {
+ TableRef cdcTableRef = dataPlanForCDC.getTableRef();
+ PTable cdcTable = cdcTableRef.getTable();
+ NamedTableNode cdcDataTableName = NODE_FACTORY.namedTable(null,
+ NODE_FACTORY.table(cdcTable.getSchemaName().getString(),
+ cdcTable.getParentTableName().getString()),
+ select.getTableSamplingRate());
+ ColumnResolver dataTableResolver = FromCompiler.getResolver(cdcDataTableName,
+ statement.getConnection());
+ TableRef cdcDataTableRef = dataTableResolver.getTables().get(0);
+ Set<PTable.CDCChangeScope> cdcIncludeScopes =
+ cdcTable.getCDCIncludeScopes();
+ String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
+ if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
+ cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
+ cdcHint.length() - 1));
+ }
+ context.setCDCDataTableRef(cdcDataTableRef);
+ context.setCDCTableRef(cdcTableRef);
+ context.setCDCIncludeScopes(cdcIncludeScopes);
+ }
if (select.isJoin()) {
JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
return compileJoinQuery(context, joinTable, false, false, null);
@@ -702,20 +738,44 @@ public class QueryCompiler {
if (projectedTable != null) {
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
}
-
- if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
- // This will get the data column added to the context so that projection can get
- // serialized..
- context.getDataColumnPosition(
- context.getCurrentTable().getTable().getColumnForColumnName(
- QueryConstants.CDC_JSON_COL_NAME));
- }
}
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
PTable table = tableRef.getTable();
+ if (table.getType() == PTableType.CDC) {
+ List<AliasedNode> selectNodes = select.getSelect();
+ // For CDC queries, if a single wildcard projection is used, automatically insert
+ // PHOENIX_ROW_TIMESTAMP() as a project at the beginning.
+ ParseNode selectNode = selectNodes.size() == 1 ? selectNodes.get(0).getNode() : null;
+ if (selectNode instanceof TerminalParseNode
+ && ((TerminalParseNode) selectNode).isWildcardNode()) {
+ List<AliasedNode> tmpSelectNodes = Lists.newArrayListWithExpectedSize(
+ selectNodes.size() + 1);
+ tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
+ NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
+ Collections.emptyList())));
+ tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
+ ((TerminalParseNode) selectNode).getRewritten()));
+ selectNodes = tmpSelectNodes;
+ }
+ List<OrderByNode> orderByNodes = select.getOrderBy();
+ // For CDC queries, if no ORDER BY is specified, add default ordering.
+ if (orderByNodes.size() == 0) {
+ orderByNodes = Lists.newArrayListWithExpectedSize(1);
+ orderByNodes.add(NODE_FACTORY.orderBy(
+ NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
+ Collections.emptyList()),
+ false, SortOrder.getDefault() == SortOrder.ASC));
+ }
+ select = NODE_FACTORY.select(select.getFrom(),
+ select.getHint(), select.isDistinct(), selectNodes, select.getWhere(),
+ select.getGroupBy(), select.getHaving(), orderByNodes, select.getLimit(),
+ select.getOffset(), select.getBindCount(), select.isAggregate(),
+ select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+ }
+
ParseNode viewWhere = null;
if (table.getViewStatement() != null) {
viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index c12cd62d56..5d2ec718e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -85,7 +85,9 @@ public class StatementContext {
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
private String cdcIncludeScopes;
-
+ private TableRef cdcTableRef;
+ private TableRef cdcDataTableRef;
+
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
}
@@ -386,4 +388,20 @@ public class StatementContext {
public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
}
+
+    /**
+     * Returns the data (parent) table of the CDC table being queried.
+     *
+     * @return the CDC data table reference, or {@code null} if it has not been set
+     */
+    public TableRef getCDCDataTableRef() {
+        return this.cdcDataTableRef;
+    }
+
+    /**
+     * Records the data (parent) table of the CDC table being queried.
+     *
+     * @param cdcDataTableRef the CDC data table reference; {@code null} clears it
+     */
+    public void setCDCDataTableRef(TableRef cdcDataTableRef) {
+        this.cdcDataTableRef = cdcDataTableRef;
+    }
+
+    /**
+     * Returns the CDC table being queried.
+     *
+     * @return the CDC table reference, or {@code null} if it has not been set
+     */
+    public TableRef getCDCTableRef() {
+        return this.cdcTableRef;
+    }
+
+    /**
+     * Records the CDC table being queried.
+     *
+     * @param cdcTableRef the CDC table reference; {@code null} clears it
+     */
+    public void setCDCTableRef(TableRef cdcTableRef) {
+        this.cdcTableRef = cdcTableRef;
+    }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index faa940a5e7..c99672e624 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -16,8 +16,6 @@
* limitations under the License.
*/
package org.apache.phoenix.compile;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
-import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
@@ -57,7 +55,6 @@ import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
@@ -92,8 +89,10 @@ public class TupleProjectionCompiler {
if (node instanceof WildcardParseNode) {
if (((WildcardParseNode) node).isRewrite()) {
TableRef parentTableRef = FromCompiler.getResolver(
- NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
- table.getParentTableName().getString())), context.getConnection()).resolveTable(
+ NODE_FACTORY.namedTable(null,
+ TableName.create(table.getSchemaName().getString(),
+ table.getParentTableName().getString())),
+ context.getConnection()).resolveTable(
table.getSchemaName().getString(),
table.getParentTableName().getString());
for (PColumn column : parentTableRef.getTable().getColumns()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d76046d3b4..18cfebdff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -150,11 +150,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
public static final String INDEX_ROW_KEY = "_IndexRowKey";
public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
- public static final String CDC_DATA_TABLE_NAME = "_CdcDataTableName";
- public static final String CDC_JSON_COL_QUALIFIER = "_CdcJsonColumn_Qualifier";
- public static final String CDC_INCLUDE_SCOPES = "_CdcIncludeScopes";
- public static final String DATA_COL_QUALIFIER_TO_NAME_MAP = "_DataColQualToNameMap";
- public static final String DATA_COL_QUALIFIER_TO_TYPE_MAP = "_DataColQualToTypeMap";
+ public static final String CDC_DATA_TABLE_DEF = "_CdcDataTableDef";
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
@@ -237,7 +233,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
- if(isLocalIndex) {
+ if (isLocalIndex) {
ScanUtil.setupLocalIndexScan(scan);
}
}
@@ -414,7 +410,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
// If the exception is NotServingRegionException then throw it as
// StaleRegionBoundaryCacheException to handle it by phoenix client other wise hbase
// client may recreate scans with wrong region boundaries.
- if(t instanceof NotServingRegionException) {
+ if (t instanceof NotServingRegionException) {
Exception cause = new StaleRegionBoundaryCacheException(c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 178bb1d705..f9f5b4a4a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -18,63 +18,50 @@
package org.apache.phoenix.coprocessor;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.CDCChangeBuilder;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.sql.Types;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
-
- private Map<ImmutableBytesPtr, String> dataColQualNameMap;
- private Map<ImmutableBytesPtr, PDataType> dataColQualTypeMap;
- // Map<dataRowKey: Map<TS: Map<qualifier: Cell>>>
- private Map<ImmutableBytesPtr, Map<Long, Map<ImmutableBytesPtr, Cell>>> dataRowChanges =
- new HashMap<>();
+ private CDCTableInfo cdcDataTableInfo;
+ private CDCChangeBuilder changeBuilder;
public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner,
final Region region,
@@ -90,130 +77,152 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer,
viewConstants, ptr, pageSizeMs, queryLimit);
CDCUtil.initForRawScan(dataTableScan);
- dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
- dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
- scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+ cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+ .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+ changeBuilder = new CDCChangeBuilder(cdcDataTableInfo);
}
@Override
protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
+ //TODO: Get Timerange from the start row and end row of the index scan object
+ // and set it in the datatable scan object.
+// if (scan.getStartRow().length == 8) {
+// startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStartRow(), 0, SortOrder.getDefault());
+// }
+// if (scan.getStopRow().length == 8) {
+// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+// scan.getStopRow(), 0, SortOrder.getDefault());
+// }
return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException {
if (indexRowIterator.hasNext()) {
List<Cell> indexRow = indexRowIterator.next();
- for (Cell c: indexRow) {
- if (c.getType() == Cell.Type.Put) {
- result.add(c);
- }
- }
+ // firstCell: Picking the earliest cell in the index row so that
+ // timestamp of the cell and the row will be same.
+ Cell firstIndexCell = indexRow.get(indexRow.size() - 1);
+ byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(firstIndexCell);
+ ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+ indexToDataRowKeyMap.get(indexRowKey));
+ Result dataRow = dataRows.get(dataRowKey);
+ Long changeTS = firstIndexCell.getTimestamp();
+ TupleProjector dataTableProjector = cdcDataTableInfo.getDataTableProjector();
+ Expression[] expressions = dataTableProjector != null ?
+ dataTableProjector.getExpressions() : null;
+ boolean isSingleCell = dataTableProjector != null;
+ byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(
+ cdcDataTableInfo.getQualifierEncodingScheme()).getFirst();
+ changeBuilder.initChange(changeTS);
try {
- Result dataRow = null;
- if (! result.isEmpty()) {
- Cell firstCell = result.get(0);
- byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
- firstCell.getRowOffset(), firstCell.getRowLength())
- .copyBytesIfNecessary();
- ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
- indexToDataRowKeyMap.get(indexRowKey));
- dataRow = dataRows.get(dataRowKey);
- Long indexRowTs = result.get(0).getTimestamp();
- Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get(
- dataRowKey);
- if (changeTimeline == null) {
- List<Cell> resultCells = Arrays.asList(dataRow.rawCells());
- Collections.sort(resultCells, CellComparator.getInstance().reversed());
- List<Cell> deleteMarkers = new ArrayList<>();
- List<List<Cell>> columns = new LinkedList<>();
- Cell currentColumnCell = null;
- Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo(
- EncodedColumnsUtil.getQualifierEncodingScheme(scan));
- List<Cell> currentColumn = null;
- Set<Long> uniqueTimeStamps = new HashSet<>();
- // TODO: From CompactionScanner.formColumns(), see if this can be refactored.
- for (Cell cell : resultCells) {
- uniqueTimeStamps.add(cell.getTimestamp());
- if (cell.getType() != Cell.Type.Put) {
- deleteMarkers.add(cell);
+ if (dataRow != null) {
+ int curColumnNum = 0;
+ List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+ this.cdcDataTableInfo.getColumnInfoList();
+ cellLoop:
+ for (Cell cell : dataRow.rawCells()) {
+ if (! changeBuilder.isChangeRelevant(cell)) {
+ continue;
+ }
+ byte[] cellFam = ImmutableBytesPtr.cloneCellFamilyIfNecessary(cell);
+ byte[] cellQual = ImmutableBytesPtr.cloneCellQualifierIfNecessary(cell);
+ if (cell.getType() == Cell.Type.DeleteFamily) {
+ if (changeTS == cell.getTimestamp()) {
+ changeBuilder.markAsDeletionEvent();
+ } else if (changeTS > cell.getTimestamp()
+ && changeBuilder.getLastDeletedTimestamp() == 0L) {
+ // Cells with timestamp less than the lowerBoundTsForPreImage
+ // can not be part of the PreImage as there is a Delete Family
+ // marker after that.
+ changeBuilder.setLastDeletedTimestamp(cell.getTimestamp());
}
- if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- emptyKV.getFirst())) {
+ } else if ((cell.getType() == Cell.Type.DeleteColumn
+ || cell.getType() == Cell.Type.Put)
+ && !Arrays.equals(cellQual, emptyCQ)) {
+ if (! changeBuilder.isChangeRelevant(cell)) {
+ // We don't need to build the change image, just skip it.
continue;
}
- if (currentColumnCell == null) {
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
- columns.add(currentColumn);
- currentColumn = new LinkedList<>();
- currentColumnCell = cell;
- currentColumn.add(cell);
- } else {
- currentColumn.add(cell);
+ // In this case, cell is the row, meaning we loop over rows..
+ if (isSingleCell) {
+ while (curColumnNum < cdcColumnInfoList.size()) {
+ boolean hasValue = dataTableProjector.getSchema().extractValue(
+ cell, (SingleCellColumnExpression)
+ expressions[curColumnNum], ptr);
+ if (hasValue) {
+ Object cellValue = getColumnValue(ptr.get(),
+ ptr.getOffset(), ptr.getLength(),
+ cdcColumnInfoList.get(curColumnNum).getColumnType());
+ changeBuilder.registerChange(cell, curColumnNum, cellValue);
+ }
+ ++curColumnNum;
+ }
+ break cellLoop;
}
- }
- if (currentColumn != null) {
- columns.add(currentColumn);
- }
- List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect(
- Collectors.toList());
- // FIXME: Does this need to be Concurrent?
- Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>();
- int[] columnPointers = new int[columns.size()];
- changeTimeline = new TreeMap<>();
- dataRowChanges.put(dataRowKey, changeTimeline);
- for (Long ts : sortedTimestamps) {
- for (int i = 0; i < columns.size(); ++i) {
- Cell cell = columns.get(i).get(columnPointers[i]);
- if (cell.getTimestamp() == ts) {
- rollingRow.put(new ImmutableBytesPtr(
- cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength()),
- cell);
- ++columnPointers[i];
+ while (true) {
+ CDCTableInfo.CDCColumnInfo currentColumnInfo =
+ cdcColumnInfoList.get(curColumnNum);
+ int columnComparisonResult = CDCUtil.compareCellFamilyAndQualifier(
+ cellFam, cellQual,
+ currentColumnInfo.getColumnFamily(),
+ currentColumnInfo.getColumnQualifier());
+ if (columnComparisonResult > 0) {
+ if (++curColumnNum >= cdcColumnInfoList.size()) {
+ // Have no more column definitions, so the rest of the cells
+ // must be for dropped columns and so can be ignored.
+ break cellLoop;
+ }
+ // Continue looking for the right column definition
+ // for this cell.
+ continue;
+ } else if (columnComparisonResult < 0) {
+ // We didn't find a column definition for this cell, ignore the
+ // current cell but continue working on the rest of the cells.
+ continue cellLoop;
}
+
+ // else, found the column definition.
+ Object cellValue = cell.getType() == Cell.Type.DeleteColumn ? null
+ : getColumnValue(cell, cdcColumnInfoList.get(curColumnNum)
+ .getColumnType());
+ changeBuilder.registerChange(cell, curColumnNum, cellValue);
+ // Done processing the current cell, check the next cell.
+ break;
}
- Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap();
- rowOfCells.putAll(rollingRow);
- changeTimeline.put(ts, rowOfCells);
}
}
-
- Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs);
- if (mapOfCells != null) {
- Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size());
- for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) {
- String colName = dataColQualNameMap.get(entry.getKey());
- Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject(
- entry.getValue().getValueArray());
- rowValueMap.put(colName, colVal);
+ if (changeBuilder.isNonEmptyEvent()) {
+ Result cdcRow = getCDCImage(indexRowKey, firstIndexCell);
+ if (cdcRow != null && tupleProjector != null) {
+ if (firstIndexCell.getType() == Cell.Type.DeleteFamily) {
+ // result is of type EncodedColumnQualiferCellsList for queries with
+ // Order by clause. It fails when Delete Family cell is added to it
+ // as it expects column qualifier bytes which is not available.
+ // Adding empty PUT cell as a placeholder.
+ result.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(indexRowKey)
+ .setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(
+ firstIndexCell))
+ .setQualifier(indexMaintainer.getEmptyKeyValueQualifier())
+ .setTimestamp(firstIndexCell.getTimestamp())
+ .setType(Cell.Type.Put)
+ .setValue(EMPTY_BYTE_ARRAY).build());
+ } else {
+ result.add(firstIndexCell);
+ }
+ IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow),
+ tupleProjector, ptr);
+ } else {
+ result.clear();
}
- byte[] value =
- new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8);
- CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
- ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(),
- firstCell.getFamilyOffset(), firstCell.getFamilyLength());
- dataRow = Result.create(Arrays.asList(builder.
- setRow(dataRowKey.copyBytesIfNecessary()).
- setFamily(family.copyBytesIfNecessary()).
- setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))).
- setTimestamp(firstCell.getTimestamp()).
- setValue(value).
- setType(Cell.Type.Put).
- build()));
+ } else {
+ result.clear();
}
- }
- if (dataRow != null && tupleProjector != null) {
- IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
- tupleProjector, ptr);
- }
- else {
+ } else {
result.clear();
}
+
return true;
} catch (Throwable e) {
LOGGER.error("Exception in UncoveredIndexRegionScanner for region "
@@ -223,4 +232,44 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann
}
return false;
}
+
+ private Result getCDCImage(byte[] indexRowKey, Cell firstCell) {
+ Gson gson = new GsonBuilder().serializeNulls().create();
+ byte[] value = gson.toJson(changeBuilder.buildCDCEvent()).getBytes(StandardCharsets.UTF_8);
+ CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ Result cdcRow = Result.create(Arrays.asList(builder
+ .setRow(indexRowKey)
+ .setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell))
+ .setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes())
+ .setTimestamp(changeBuilder.getChangeTimestamp())
+ .setValue(value)
+ .setType(Cell.Type.Put)
+ .build()));
+ return cdcRow;
+ }
+
+    /**
+     * Converts the value bytes of {@code cell} using the offset/length overload.
+     *
+     * @param cell cell whose value slice is converted
+     * @param dataType Phoenix type used to decode the value
+     * @return the converted value for the CDC image
+     */
+    private Object getColumnValue(Cell cell, PDataType dataType) {
+        byte[] valueArray = cell.getValueArray();
+        int valueOffset = cell.getValueOffset();
+        int valueLength = cell.getValueLength();
+        return getColumnValue(valueArray, valueOffset, valueLength, dataType);
+    }
+
+    /**
+     * Converts a raw column value slice into the object placed in the CDC JSON image.
+     * BINARY values are Base64-encoded. DATE, TIME and TIMESTAMP values (including the
+     * WITH_TIMEZONE variants) are rendered with {@code toString()}. All other values are
+     * returned exactly as decoded by {@code dataType}.
+     *
+     * @param cellValue backing array holding the value
+     * @param offset start of the value within {@code cellValue}
+     * @param length length of the value
+     * @param dataType Phoenix type used to decode the value
+     * @return the converted value, or {@code null} if {@code dataType} decodes it to null
+     */
+    private Object getColumnValue(byte[] cellValue, int offset, int length, PDataType dataType) {
+        int sqlType = dataType.getSqlType();
+        if (sqlType == Types.BINARY) {
+            // Base64.Encoder has no offset/length overload, so the bytes must be copied.
+            return Base64.getEncoder().encodeToString(
+                    ImmutableBytesPtr.copyBytesIfNecessary(cellValue, offset, length));
+        }
+        Object value = dataType.toObject(cellValue, offset, length);
+        // Guard before stringifying: toObject can presumably yield null (e.g. for a
+        // zero-length value), which previously caused an NPE for temporal types.
+        if (value != null && isTemporalSqlType(sqlType)) {
+            return value.toString();
+        }
+        return value;
+    }
+
+    /** Returns true for the java.sql.Types codes whose values are emitted as strings. */
+    private static boolean isTemporalSqlType(int sqlType) {
+        switch (sqlType) {
+            case Types.DATE:
+            case Types.TIMESTAMP:
+            case Types.TIME:
+            case Types.TIME_WITH_TIMEZONE:
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 874b6669c1..cbe79cef1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1433,7 +1433,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
return indexMutations.size();
}
- static boolean adjustScanFilter(Scan scan) {
+ public static boolean adjustScanFilter(Scan scan) {
// For rebuilds we use count (*) as query for regular tables which ends up setting the FirstKeyOnlyFilter on scan
// This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
// For rebuilds we need all columns and all versions
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index d010c33dff..a302f84e19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -59,8 +59,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index ca0259203a..7987a616a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -48,10 +47,8 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
@@ -77,10 +74,8 @@ import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.trace.TracingIterator;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
@@ -316,7 +311,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
ScanUtil.setCustomAnnotations(scan,
customAnnotations == null ? null : customAnnotations.getBytes());
// Set index related scan attributes.
- if (table.getType() == PTableType.INDEX || table.getType() == PTableType.CDC) {
+ if (table.getType() == PTableType.INDEX) {
if (table.getIndexType() == IndexType.LOCAL) {
ScanUtil.setLocalIndex(scan);
} else if (context.isUncoveredIndex()) {
@@ -334,8 +329,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
// Set data columns to be join back from data table.
PTable parentTable = context.getCurrentTable().getTable();
String parentSchemaName = parentTable.getParentSchemaName().getString();
- if (parentTable.getType() == PTableType.CDC) {
- dataTable = parentTable;
+ if (context.getCDCTableRef() != null) {
+ dataTable = context.getCDCTableRef().getTable();
}
else {
String parentTableName = parentTable.getParentTableName().getString();
@@ -386,7 +381,9 @@ public abstract class BaseQueryPlan implements QueryPlan {
PName name = context.getCurrentTable().getTable().getName();
List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
for (PTable index : dataTable.getIndexes()) {
- if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) {
+ if (index.getName().equals(name) && (
+ index.getIndexType() == IndexType.LOCAL
+ || dataTable.getType() == PTableType.CDC)) {
indexes.add(index);
break;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1cf23d7e0f..91503cf863 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -51,6 +51,7 @@ import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.Immutable;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
@@ -68,7 +69,10 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
@@ -112,6 +116,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
@@ -595,6 +600,61 @@ public class MutationState implements SQLCloseable {
return ptr;
}
+    /**
+     * Generates CDC index row deletions for the {@link Delete}s in {@code mutationList}.
+     * For each data-table Delete, the CDC index row key is built by the index maintainer at
+     * {@code mutationTimestamp}, and a row delete for that index key is produced. Mutations
+     * that are not Deletes are ignored.
+     *
+     * @param table the data table the mutations belong to
+     * @param index the CDC index of {@code table}
+     * @param mutationTimestamp timestamp used to build the index row key and for the
+     *                          generated delete
+     * @param mutationList data table mutations to scan for deletes
+     * @return the generated index mutations; empty when {@code mutationList} has no Delete
+     * @throws SQLException if building the index mutations fails
+     */
+    private List<Mutation> getCDCDeleteMutations(PTable table, PTable index,
+                                                 Long mutationTimestamp,
+                                                 List<Mutation> mutationList) throws
+            SQLException {
+        final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(mutationList.size());
+        for (final Mutation mutation : mutationList) {
+            // Only generate extra row mutations for DELETE.
+            if (!(mutation instanceof Delete)) {
+                continue;
+            }
+            ptr.set(mutation.getRow());
+            ValueGetter getter = new AbstractValueGetter() {
+                @Override
+                public byte[] getRowKey() {
+                    return mutation.getRow();
+                }
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
+                    // Never report a value for the empty key value column.
+                    if (IndexUtil.isEmptyKeyValue(table, ref)) {
+                        return null;
+                    }
+                    byte[] family = ref.getFamily();
+                    byte[] qualifier = ref.getQualifier();
+                    Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
+                    List<Cell> kvs = familyMap.get(family);
+                    if (kvs == null) {
+                        return null;
+                    }
+                    for (Cell kv : kvs) {
+                        if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(),
+                                kv.getFamilyLength(), family, 0, family.length) == 0
+                                && Bytes.compareTo(kv.getQualifierArray(),
+                                kv.getQualifierOffset(), kv.getQualifierLength(),
+                                qualifier, 0, qualifier.length) == 0) {
+                            ImmutableBytesPtr valuePtr = new ImmutableBytesPtr();
+                            connection.getKeyValueBuilder().getValueAsPtr(kv, valuePtr);
+                            return valuePtr;
+                        }
+                    }
+                    return null;
+                }
+            };
+            ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey(
+                    getter, ptr, null, null, mutationTimestamp));
+            // NOTE: `key` is an index row key, so the row is created from `index` rather than
+            // the data `table`; the owning table presumably determines the column families
+            // used by the generated Delete.
+            PRow row = index.newRow(
+                    connection.getKeyValueBuilder(), mutationTimestamp, key, false);
+            row.delete();
+            indexMutations.addAll(row.toRowMutations());
+        }
+        return indexMutations;
+    }
+
private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final TableRef tableRef,
final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp,
boolean includeAllIndexes, final boolean sendAll) {
@@ -628,6 +688,7 @@ public class MutationState implements SQLCloseable {
List<Mutation> indexMutations = null;
try {
+
if (!mutationsPertainingToIndex.isEmpty()) {
if (table.isTransactional()) {
if (indexMutationsMap == null) {
@@ -671,6 +732,19 @@ public class MutationState implements SQLCloseable {
}
}
}
+
+ if (CDCUtil.isCDCIndex(index)) {
+ List<Mutation> cdcMutations = getCDCDeleteMutations(
+ table, index, mutationTimestamp, mutationList);
+ if (cdcMutations.size() > 0) {
+ if (indexMutations == null) {
+ indexMutations = cdcMutations;
+ } else {
+ indexMutations.addAll(cdcMutations);
+ }
+ }
+ }
+
} catch (SQLException | IOException e) {
throw new IllegalDataException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index 302d5e2ffb..de15a8b1be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -139,7 +139,7 @@ public class TupleProjector {
* @param projector projector to serialize
* @return byte array
*/
- private static byte[] serializeProjectorIntoBytes(TupleProjector projector) {
+ public static byte[] serializeProjectorIntoBytes(TupleProjector projector) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
DataOutputStream output = new DataOutputStream(stream);
@@ -172,7 +172,7 @@ public class TupleProjector {
* @param proj byte array to deserialize
* @return projector
*/
- private static TupleProjector deserializeProjectorFromBytes(byte[] proj) {
+ public static TupleProjector deserializeProjectorFromBytes(byte[] proj) {
if (proj == null) {
return null;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
index 2c152978e8..115e4e6d6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/SingleCellColumnExpression.java
@@ -88,23 +88,20 @@ public class SingleCellColumnExpression extends KeyValueColumnExpression {
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
if (!super.evaluate(tuple, ptr)) {
return false;
- } else if (ptr.getLength() == 0) {
- return true;
}
- // the first position is reserved and we offset maxEncodedColumnQualifier by
- // ENCODED_CQ_COUNTER_INITIAL_VALUE (which is the minimum encoded column qualifier)
- int index = decodedColumnQualifier - QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE + 1;
- // Given a ptr to the entire array, set ptr to point to a particular element
- // within that array
- ColumnValueDecoder encoderDecoder = immutableStorageScheme.getDecoder();
- return encoderDecoder.decode(ptr, index);
+ // The superclass pointed ptr at the whole single-cell array; decode this column's
+ // element from it using the decoding step shared with evaluateUnsafe.
+ return evaluate(ptr);
}
@Override
public boolean evaluateUnsafe(Tuple tuple, ImmutableBytesWritable ptr) {
if (!super.evaluateUnsafe(tuple, ptr)) {
return false;
- } else if (ptr.getLength() == 0) {
+ }
+ // Same decoding step as evaluate(Tuple, ImmutableBytesWritable).
+ return evaluate(ptr);
+ }
+
+ public boolean evaluate(ImmutableBytesWritable ptr) {
+ if (ptr.getLength() == 0) {
return true;
}
// the first position is reserved and we offset maxEncodedColumnQualifier by
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
index 9825c77bab..946fa2e861 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.hbase.index.util;
import java.io.DataInput;
import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +104,35 @@ public class ImmutableBytesPtr extends ImmutableBytesWritable {
return copyBytesIfNecessary(this);
}
- public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
- if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
- return ptr.get();
+ /**
+ * Returns the bytes referenced by {@code ptr}, copying them only when the pointer does not
+ * span its entire backing array.
+ */
+ public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
+ byte[] backing = ptr.get();
+ return copyBytesIfNecessary(backing, ptr.getOffset(), ptr.getLength());
+ }
+
+ /**
+ * Returns {@code bytes[offset, offset + length)}. When that range is the whole array the
+ * array itself is returned without copying, so mutating the result then also mutates
+ * {@code bytes}.
+ */
+ public static byte[] copyBytesIfNecessary(byte[] bytes, int offset, int length) {
+ boolean coversWholeArray = offset == 0 && length == bytes.length;
+ return coversWholeArray ? bytes : Arrays.copyOfRange(bytes, offset, offset + length);
+ }
+
+ /** Returns the row bytes of {@code cell}, copying only if they are a slice of a larger array. */
+ public static byte[] cloneCellRowIfNecessary(Cell cell) {
+ byte[] rowArray = cell.getRowArray();
+ return copyBytesIfNecessary(rowArray, cell.getRowOffset(), cell.getRowLength());
+ }
+
+ /** Returns the family bytes of {@code cell}, copying only if they are a slice of a larger array. */
+ public static byte[] cloneCellFamilyIfNecessary(Cell cell) {
+ byte[] familyArray = cell.getFamilyArray();
+ return copyBytesIfNecessary(familyArray, cell.getFamilyOffset(), cell.getFamilyLength());
+ }
+
+ /** Returns the qualifier bytes of {@code cell}, copying only if they are a slice of a larger array. */
+ public static byte[] cloneCellQualifierIfNecessary(Cell cell) {
+ byte[] qualifierArray = cell.getQualifierArray();
+ return copyBytesIfNecessary(qualifierArray, cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ }
+
+ /** Returns the value bytes of {@code cell}, copying only if they are a slice of a larger array. */
+ public static byte[] cloneCellValueIfNecessary(Cell cell) {
+ byte[] valueArray = cell.getValueArray();
+ return copyBytesIfNecessary(valueArray, cell.getValueOffset(), cell.getValueLength());
+ }
- return ptr.copyBytes();
- }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
new file mode 100644
index 0000000000..4d80f3e9d2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/CDCTableInfo.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.index;
+
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.generated.CDCInfoProtos;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+
+/**
+ * Describes a CDC table's data table columns, change include scopes, qualifier encoding and
+ * CDC JSON column, and converts that description to and from its protobuf form
+ * ({@link CDCInfoProtos.CDCTableDef}).
+ */
+public class CDCTableInfo {
+ private final List<CDCColumnInfo> columnInfoList;
+ private final byte[] defaultColumnFamily;
+ private final Set<PTable.CDCChangeScope> includeScopes;
+ private final PTable.QualifierEncodingScheme qualifierEncodingScheme;
+ private final byte[] cdcJsonColQualBytes;
+ private final TupleProjector dataTableProjector;
+
+ /**
+ * @param defaultColumnFamily default column family of the data table
+ * @param columnInfoList data table columns; a sorted copy is retained and the caller's
+ * list is left unmodified
+ * @param includeScopes change scopes to include
+ * @param qualifierEncodingScheme column qualifier encoding scheme of the data table
+ * @param cdcJsonColQualBytes column qualifier of the CDC JSON column
+ * @param dataTableProjector data table projector, or {@code null} when none was serialized
+ */
+ public CDCTableInfo(byte[] defaultColumnFamily, List<CDCColumnInfo> columnInfoList,
+ Set<PTable.CDCChangeScope> includeScopes,
+ PTable.QualifierEncodingScheme qualifierEncodingScheme,
+ byte[] cdcJsonColQualBytes, TupleProjector dataTableProjector) {
+ // Sort a private copy so the caller's list is not reordered as a side effect.
+ List<CDCColumnInfo> sortedColumns = new ArrayList<>(columnInfoList);
+ Collections.sort(sortedColumns);
+ this.columnInfoList = sortedColumns;
+ this.defaultColumnFamily = defaultColumnFamily;
+ this.includeScopes = includeScopes;
+ this.qualifierEncodingScheme = qualifierEncodingScheme;
+ this.cdcJsonColQualBytes = cdcJsonColQualBytes;
+ this.dataTableProjector = dataTableProjector;
+ }
+
+ /** @return the data table columns, sorted via {@link CDCColumnInfo#compareTo} */
+ public List<CDCColumnInfo> getColumnInfoList() {
+ return columnInfoList;
+ }
+
+ public byte[] getDefaultColumnFamily() {
+ return defaultColumnFamily;
+ }
+
+ public PTable.QualifierEncodingScheme getQualifierEncodingScheme() {
+ return qualifierEncodingScheme;
+ }
+
+ public Set<PTable.CDCChangeScope> getIncludeScopes() {
+ return includeScopes;
+ }
+
+ public byte[] getCdcJsonColQualBytes() {
+ return cdcJsonColQualBytes;
+ }
+
+ /** @return the data table projector, or {@code null} if none was serialized */
+ public TupleProjector getDataTableProjector() {
+ return dataTableProjector;
+ }
+
+ /**
+ * Rebuilds a {@link CDCTableInfo} from its protobuf form.
+ * <p>
+ * A missing default family falls back to {@link QueryConstants#DEFAULT_COLUMN_FAMILY_BYTES}
+ * and a missing encoding scheme to {@code NON_ENCODED_QUALIFIERS}.
+ *
+ * @throws RuntimeException wrapping the {@link SQLException} raised when the include scopes
+ * cannot be parsed
+ */
+ public static CDCTableInfo createFromProto(CDCInfoProtos.CDCTableDef table) {
+ byte[] defaultColumnFamily = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+ if (table.hasDefaultFamilyName()) {
+ defaultColumnFamily = table.getDefaultFamilyName().toByteArray();
+ }
+ // For backward compatibility. Clients older than 4.10 will always have
+ // non-encoded qualifiers.
+ PTable.QualifierEncodingScheme qualifierEncodingScheme
+ = PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+ if (table.hasQualifierEncodingScheme()) {
+ qualifierEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue(
+ table.getQualifierEncodingScheme().toByteArray()[0]);
+ }
+ List<CDCColumnInfo> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
+ for (CDCInfoProtos.CDCColumnDef curColumnProto : table.getColumnsList()) {
+ columns.add(CDCColumnInfo.createFromProto(curColumnProto));
+ }
+ String includeScopesStr = table.getCdcIncludeScopes();
+ Set<PTable.CDCChangeScope> changeScopeSet;
+ try {
+ changeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(includeScopesStr);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ TupleProjector dataTableProjector = null;
+ if (table.hasDataTableProjectorBytes()) {
+ dataTableProjector = TupleProjector.deserializeProjectorFromBytes(
+ table.getDataTableProjectorBytes().toByteArray());
+ }
+ return new CDCTableInfo(defaultColumnFamily, columns, changeScopeSet,
+ qualifierEncodingScheme, table.getCdcJsonColQualBytes().toByteArray(),
+ dataTableProjector);
+ }
+
+ /**
+ * Serializes the CDC table and CDC data table referenced by {@code context}.
+ * <p>
+ * Data table columns without a column family are not included. A data table projector
+ * (over all non-PK columns) is included only for immutable data tables that use
+ * {@code SINGLE_CELL_ARRAY_WITH_OFFSETS}.
+ *
+ * @throws SQLException if building the projected data table fails
+ */
+ public static CDCInfoProtos.CDCTableDef toProto(StatementContext context)
+ throws SQLException {
+ PTable cdcTable = context.getCDCTableRef().getTable();
+ TableRef cdcDataTableRef = context.getCDCDataTableRef();
+ PTable dataTable = cdcDataTableRef.getTable();
+ CDCInfoProtos.CDCTableDef.Builder builder = CDCInfoProtos.CDCTableDef.newBuilder();
+ if (dataTable.getDefaultFamilyName() != null) {
+ builder.setDefaultFamilyName(
+ ByteStringer.wrap(dataTable.getDefaultFamilyName().getBytes()));
+ }
+ String cdcIncludeScopes = context.getEncodedCdcIncludeScopes();
+ if (cdcIncludeScopes != null) {
+ builder.setCdcIncludeScopes(cdcIncludeScopes);
+ }
+ if (dataTable.getEncodingScheme() != null) {
+ builder.setQualifierEncodingScheme(ByteStringer.wrap(
+ new byte[] { dataTable.getEncodingScheme().getSerializedMetadataValue() }));
+ }
+ for (PColumn column : dataTable.getColumns()) {
+ if (column.getFamilyName() == null) {
+ continue;
+ }
+ builder.addColumns(CDCColumnInfo.toProto(column));
+ }
+ PColumn cdcJsonCol = cdcTable.getColumnForColumnName(CDC_JSON_COL_NAME);
+ builder.setCdcJsonColQualBytes(ByteStringer.wrap(cdcJsonCol.getColumnQualifierBytes()));
+
+ if (dataTable.isImmutableRows() && dataTable.getImmutableStorageScheme()
+ == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ List<ColumnRef> dataColumns = new ArrayList<>();
+ for (PColumn column : dataTable.getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ dataColumns.add(new ColumnRef(cdcDataTableRef, column.getPosition()));
+ }
+ }
+ PTable projectedDataTable = TupleProjectionCompiler.createProjectedTable(
+ cdcDataTableRef, dataColumns, false);
+ TupleProjector dataTableProjector = new TupleProjector(projectedDataTable);
+ builder.setDataTableProjectorBytes(ByteStringer.wrap(
+ TupleProjector.serializeProjectorIntoBytes(dataTableProjector)));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * A single data table column as seen by CDC: family and qualifier bytes, column name,
+ * data type and family name. Comparison delegates to
+ * {@link CDCUtil#compareCellFamilyAndQualifier} on the family and qualifier bytes.
+ */
+ public static class CDCColumnInfo implements Comparable<CDCColumnInfo> {
+
+ private final byte[] columnFamily;
+ private final byte[] columnQualifier;
+ private final String columnName;
+ private final PDataType columnType;
+ private final String columnFamilyName;
+ // Computed on the first call to getColumnDisplayName and reused afterwards.
+ private String columnDisplayName;
+
+ public CDCColumnInfo(byte[] columnFamily, byte[] columnQualifier,
+ String columnName, PDataType columnType,
+ String columnFamilyName) {
+ this.columnFamily = columnFamily;
+ this.columnQualifier = columnQualifier;
+ this.columnName = columnName;
+ this.columnType = columnType;
+ this.columnFamilyName = columnFamilyName;
+ }
+
+ public byte[] getColumnFamily() {
+ return columnFamily;
+ }
+
+ public byte[] getColumnQualifier() {
+ return columnQualifier;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public PDataType getColumnType() {
+ return columnType;
+ }
+
+ public String getColumnFamilyName() {
+ return columnFamilyName;
+ }
+
+ @Override
+ public int compareTo(CDCColumnInfo columnInfo) {
+ return CDCUtil.compareCellFamilyAndQualifier(this.getColumnFamily(),
+ this.getColumnQualifier(),
+ columnInfo.getColumnFamily(),
+ columnInfo.getColumnQualifier());
+ }
+
+ /** Rebuilds a column description from its protobuf form; the family name is UTF-8 decoded. */
+ public static CDCColumnInfo createFromProto(CDCInfoProtos.CDCColumnDef column) {
+ String columnName = column.getColumnName();
+ byte[] familyNameBytes = column.getFamilyNameBytes().toByteArray();
+ PDataType dataType = PDataType.fromSqlTypeName(column.getDataType());
+ byte[] columnQualifierBytes = column.getColumnQualifierBytes().toByteArray();
+ String columnFamilyName = StandardCharsets.UTF_8
+ .decode(ByteBuffer.wrap(familyNameBytes)).toString();
+ return new CDCColumnInfo(familyNameBytes,
+ columnQualifierBytes, columnName, dataType, columnFamilyName);
+ }
+
+ /** Serializes {@code column}; family, data type and qualifier are set only when non-null. */
+ public static CDCInfoProtos.CDCColumnDef toProto(PColumn column) {
+ CDCInfoProtos.CDCColumnDef.Builder builder = CDCInfoProtos.CDCColumnDef.newBuilder();
+ builder.setColumnName(column.getName().toString());
+ if (column.getFamilyName() != null) {
+ builder.setFamilyNameBytes(ByteStringer.wrap(column.getFamilyName().getBytes()));
+ }
+ if (column.getDataType() != null) {
+ builder.setDataType(column.getDataType().getSqlTypeName());
+ }
+ if (column.getColumnQualifierBytes() != null) {
+ builder.setColumnQualifierBytes(
+ ByteStringer.wrap(column.getColumnQualifierBytes()));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Returns the column name, prefixed with the family name and {@code NAME_SEPARATOR}
+ * unless the column is in {@code tableInfo}'s default column family. The result is
+ * cached on the first call, so later calls ignore {@code tableInfo}.
+ */
+ public String getColumnDisplayName(CDCTableInfo tableInfo) {
+ if (columnDisplayName == null) {
+ // Don't include Column Family if it is a default column Family
+ if (Arrays.equals(getColumnFamily(), tableInfo.getDefaultColumnFamily())) {
+ columnDisplayName = getColumnName();
+ } else {
+ columnDisplayName = getColumnFamilyName()
+ + NAME_SEPARATOR + getColumnName();
+ }
+ }
+ return columnDisplayName;
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 86f49208f4..0ff50c82fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -144,13 +144,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
private static final int EXPRESSION_NOT_PRESENT = -1;
private static final int ESTIMATED_EXPRESSION_SIZE = 8;
-
+
+ /**
+ * Creates the maintainer for {@code index} on {@code dataTable} with no CDC table; same as
+ * {@code create(dataTable, null, index, connection)}.
+ */
public static IndexMaintainer create(PTable dataTable, PTable index,
+ PhoenixConnection connection) throws SQLException {
+ return create(dataTable, null, index, connection);
+ }
+
+ /**
+ * Creates the maintainer for {@code index} on {@code dataTable}.
+ *
+ * @param cdcTable CDC table the index belongs to, or {@code null}; when non-null its parent
+ * name, rather than the index's, is compared with {@code dataTable}'s name in
+ * the constructor's parent-table check
+ * @throws IllegalArgumentException if {@code dataTable} is an index, {@code index} is not an
+ * index, or {@code index} is not among {@code dataTable}'s indexes
+ */
+ public static IndexMaintainer create(PTable dataTable, PTable cdcTable, PTable index,
PhoenixConnection connection) throws SQLException {
if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
throw new IllegalArgumentException();
}
- IndexMaintainer maintainer = new IndexMaintainer(dataTable, index, connection);
+ IndexMaintainer maintainer = new IndexMaintainer(dataTable, cdcTable, index, connection);
return maintainer;
}
@@ -444,8 +449,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.dataRowKeySchema = dataRowKeySchema;
this.isDataTableSalted = isDataTableSalted;
}
-
+
+ // Constructor for indexes without a CDC table; delegates with cdcTable = null.
private IndexMaintainer(final PTable dataTable, final PTable index,
+ PhoenixConnection connection) throws SQLException {
+ this(dataTable, null, index, connection);
+ }
+
+ private IndexMaintainer(final PTable dataTable, final PTable cdcTable, final PTable index,
PhoenixConnection connection) throws SQLException {
this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
@@ -455,7 +465,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
this.isUncovered = index.getIndexType() == IndexType.UNCOVERED_GLOBAL;
this.encodingScheme = index.getEncodingScheme();
-
+ this.isCDCIndex = CDCUtil.isCDCIndex(index);
+
// null check for b/w compatibility
this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
@@ -499,7 +510,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexDataColumnCount = dataPKColumns.size();
PTable parentTable = dataTable;
// We need to get the PK column for the table on which the index is created
- if (!dataTable.getName().equals(index.getParentName())) {
+ if (!dataTable.getName().equals(cdcTable != null
+ ? cdcTable.getParentName() : index.getParentName())) {
try {
String tenantId = (index.getTenantId() != null) ?
index.getTenantId().getString() : null;
@@ -676,7 +688,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexWhere = index.getIndexWhereExpression(connection);
this.indexWhereColumns = index.getIndexWhereColumns(connection);
}
- this.isCDCIndex = CDCUtil.isCDCIndex(index);
initCachedState();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index a536c6d016..189f7196c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
@@ -191,7 +191,7 @@ public abstract class RegionScannerFactory {
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, offset, actualStartKey, extraLimit);
} else {
- if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
+ if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) {
s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, extraLimit);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 48a5734149..e1716fd20c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -19,7 +19,6 @@
package org.apache.phoenix.optimize;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -69,6 +68,7 @@ import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableRef;
@@ -82,6 +82,8 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
+import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
+
public class QueryOptimizer {
private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
@@ -219,16 +221,36 @@ public class QueryOptimizer {
return Collections.<QueryPlan> singletonList(dataPlan);
}
+ ColumnResolver indexResolver = null;
+ boolean forCDC = false;
PTable table = dataPlan.getTableRef().getTable();
if (table.getType() == PTableType.CDC) {
- Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
- String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
- if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
- cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
- cdcHint.length() - 1));
+ NamedTableNode indexTableNode = FACTORY.namedTable(null,
+ FACTORY.table(table.getSchemaName().getString(),
+ CDCUtil.getCDCIndexName(table.getTableName().getString())),
+ select.getTableSamplingRate());
+ indexResolver = FromCompiler.getResolver(indexTableNode,
+ statement.getConnection());
+ TableRef indexTableRef = indexResolver.getTables().get(0);
+ PTable cdcIndex = indexTableRef.getTable();
+ PTableImpl.Builder indexBuilder = PTableImpl.builderFromExisting(cdcIndex);
+ List<PColumn> idxColumns = cdcIndex.getColumns();
+ if (cdcIndex.getBucketNum() != null) {
+ // If salted, it will get added by the builder, so avoid duplication.
+ idxColumns = idxColumns.subList(1, idxColumns.size());
}
- dataPlan.getContext().setCDCIncludeScopes(cdcIncludeScopes);
- return Arrays.asList(dataPlan);
+ indexBuilder.setColumns(idxColumns);
+ indexBuilder.setParentName(table.getName());
+ indexBuilder.setParentTableName(table.getTableName());
+ cdcIndex = indexBuilder.build();
+ indexTableRef.setTable(cdcIndex);
+
+ PTableImpl.Builder cdcBuilder = PTableImpl.builderFromExisting(table);
+ cdcBuilder.setColumns(table.getColumns());
+ cdcBuilder.setIndexes(Collections.singletonList(cdcIndex));
+ table = cdcBuilder.build();
+ dataPlan.getTableRef().setTable(table);
+ forCDC = true;
}
List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
@@ -251,21 +273,29 @@ public class QueryOptimizer {
targetColumns = targetDatums;
}
- List<QueryPlan> plans = Lists.newArrayListWithExpectedSize(1 + indexes.size());
- SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(select, FromCompiler.getResolver(dataPlan.getTableRef()));
- plans.add(dataPlan);
- QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
- if (hintedPlan != null) {
- PTable index = hintedPlan.getTableRef().getTable();
- if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
- || isPartialIndexUsable(select, dataPlan, index))) {
- return Collections.singletonList(hintedPlan);
+ List<QueryPlan> plans = Lists.newArrayListWithExpectedSize((forCDC ? 0 : 1)
+ + indexes.size());
+ SelectStatement translatedIndexSelect = IndexStatementRewriter.translate(
+ select, FromCompiler.getResolver(dataPlan.getTableRef()));
+ QueryPlan hintedPlan = null;
+ // We can't have hints work with CDC queries so skip looking for hinted plans.
+ if (! forCDC) {
+ plans.add(dataPlan);
+ hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes,
+ targetColumns, parallelIteratorFactory, plans);
+ if (hintedPlan != null) {
+ PTable index = hintedPlan.getTableRef().getTable();
+ if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
+ || isPartialIndexUsable(select, dataPlan, index))) {
+ return Collections.singletonList(hintedPlan);
+ }
+ plans.add(0, hintedPlan);
}
- plans.add(0, hintedPlan);
}
for (PTable index : indexes) {
- QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, dataPlan, false);
+ QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns,
+ parallelIteratorFactory, dataPlan, false, indexResolver);
if (plan != null &&
(index.getIndexWhere() == null
|| isPartialIndexUsable(select, dataPlan, index))) {
@@ -330,7 +360,8 @@ public class QueryOptimizer {
// Hinted index is applicable, so return it's index
PTable index = indexes.get(indexPos);
indexes.remove(indexPos);
- QueryPlan plan = addPlan(statement, select, index, targetColumns, parallelIteratorFactory, dataPlan, true);
+ QueryPlan plan = addPlan(statement, select, index, targetColumns,
+ parallelIteratorFactory, dataPlan, true, null);
if (plan != null) {
return plan;
}
@@ -350,17 +381,33 @@ public class QueryOptimizer {
return -1;
}
- private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException {
- int nColumns = dataPlan.getProjector().getColumnCount();
+ /**
+ * Builds a SELECT over {@code index} from {@code select}, keeping the data plan's table
+ * alias if any, and delegates to the overload that takes the prepared SELECT and resolver.
+ *
+ * @param indexResolver resolver to use for the index query, or {@code null} to build one
+ * from the rewritten SELECT
+ */
+ private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index,
+ List<? extends PDatum> targetColumns,
+ ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan,
+ boolean isHinted, ColumnResolver indexResolver)
+ throws SQLException {
String tableAlias = dataPlan.getTableRef().getTableAlias();
- String alias = tableAlias==null ? null : '"' + tableAlias + '"'; // double quote in case it's case sensitive
+ String alias = tableAlias == null ? null
+ : '"' + tableAlias + '"'; // double quote in case it's case sensitive
String schemaName = index.getParentSchemaName().getString();
- schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"';
+ schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"';
String tableName = '"' + index.getTableName().getString() + '"';
- TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName),select.getTableSamplingRate());
+ TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName),
+ select.getTableSamplingRate());
SelectStatement indexSelect = FACTORY.select(select, table);
- ColumnResolver resolver = FromCompiler.getResolverForQuery(indexSelect, statement.getConnection());
+ // NOTE(review): a caller-supplied indexResolver is not derived from indexSelect, so it may
+ // not know the table alias used above; confirm aliased queries still resolve.
+ ColumnResolver resolver = indexResolver != null ? indexResolver
+ : FromCompiler.getResolverForQuery(indexSelect, statement.getConnection());
+ return addPlan(statement, select, index, targetColumns, parallelIteratorFactory, dataPlan,
+ isHinted, indexSelect, resolver);
+ }
+
+ private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index,
+ List<? extends PDatum> targetColumns,
+ ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan,
+ boolean isHinted, SelectStatement indexSelect,
+ ColumnResolver resolver) throws SQLException {
+ int nColumns = dataPlan.getProjector().getColumnCount();
// We will or will not do tuple projection according to the data plan.
boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED;
// Check index state of now potentially updated index table to make sure it's active
@@ -384,7 +431,8 @@ public class QueryOptimizer {
: index.getTableName().getString();
throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, "*");
}
- // translate nodes that match expressions that are indexed to the associated column parse node
+ // translate nodes that match expressions that are indexed to the
+ // associated column parse node
SelectStatement rewrittenIndexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes()));
QueryCompiler compiler = new QueryCompiler(statement, rewrittenIndexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, true, dataPlans);
@@ -396,30 +444,40 @@ public class QueryOptimizer {
plan.getContext().setUncoveredIndex(true);
PhoenixConnection connection = statement.getConnection();
IndexMaintainer maintainer;
- PTable dataTable;
+ PTable newIndexTable;
+ String dataTableName;
if (indexTable.getViewIndexId() != null
&& indexTable.getName().getString().contains(
- QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+ QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
// MetaDataClient modifies the index table name for view indexes if the
// parent view of an index has a child view. We need to recreate a PTable
// object with the correct table name to get the index maintainer
int lastIndexOf = indexTable.getName().getString().lastIndexOf(
QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
- PTable newIndexTable = PhoenixRuntime.getTable(connection, indexName);
- dataTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(
+ newIndexTable = PhoenixRuntime.getTable(connection, indexName);
+ dataTableName = SchemaUtil.getTableName(
newIndexTable.getParentSchemaName().getString(),
- indexTable.getParentTableName().getString()));
- maintainer = newIndexTable.getIndexMaintainer(dataTable,
- statement.getConnection());
+ indexTable.getParentTableName().getString());
} else {
- dataTable = PhoenixRuntime.getTable(connection,
- SchemaUtil.getTableName(indexTable.getParentSchemaName().getString(),
- indexTable.getParentTableName().getString()));
- maintainer = indexTable.getIndexMaintainer(dataTable, connection);
+ newIndexTable = indexTable;
+ dataTableName = SchemaUtil.getTableName(
+ indexTable.getParentSchemaName().getString(),
+ indexTable.getParentTableName().getString());
}
+ PTable dataTableFromDataPlan = dataPlan.getTableRef().getTable();
+ PTable cdcTable = null;
+ if (dataTableFromDataPlan.getType() == PTableType.CDC) {
+ cdcTable = dataTableFromDataPlan;
+ dataTableName = SchemaUtil.getTableName(
+ indexTable.getParentSchemaName().getString(),
+ dataTableFromDataPlan.getParentTableName().getString());
+ }
+ PTable dataTable = PhoenixRuntime.getTable(connection, dataTableName);
+ maintainer = newIndexTable.getIndexMaintainer(dataTable, cdcTable, connection);
Set<org.apache.hadoop.hbase.util.Pair<String, String>> indexedColumns =
maintainer.getIndexedColumnInfo();
+ // TODO: Why is PHOENIX_ROW_TIMESTAMP() not showing up?
for (org.apache.hadoop.hbase.util.Pair<String, String> pair : indexedColumns) {
// The first member of the pair is the column family. For the data table PK columns, the column
// family is set to null. The data PK columns should not be added to the set of data columns
@@ -430,6 +488,11 @@ public class QueryOptimizer {
plan.getContext().getDataColumnPosition(pColumn);
}
}
+ if (dataTableFromDataPlan.getType() == PTableType.CDC) {
+ PColumn cdcJsonCol = dataTableFromDataPlan.getColumnForColumnName(
+ CDC_JSON_COL_NAME);
+ plan.getContext().getDataColumnPosition(cdcJsonCol);
+ }
}
indexTableRef = plan.getTableRef();
indexTable = indexTableRef.getTable();
@@ -727,7 +790,8 @@ public class QueryOptimizer {
return select;
}
- SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select, newFrom), resolver, replacement);
+ SelectStatement indexSelect = IndexStatementRewriter.translate(FACTORY.select(select,
+ newFrom), resolver, replacement);
for (TableRef indexTableRef : replacement.values()) {
// replace expressions with corresponding matching columns for functional indexes
indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), connection, indexSelect.getUdfParseNodes()));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
index 80a08bfc9a..59f0954288 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FamilyWildcardParseNode.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.parse;
import java.sql.SQLException;
+import org.apache.hadoop.hbase.backup.example.HFileArchiveTableMonitor;
import org.apache.phoenix.compile.ColumnResolver;
/**
@@ -79,5 +80,21 @@ public class FamilyWildcardParseNode extends NamedParseNode {
toSQL(buf);
buf.append(".*");
}
-}
+ /**
+ * A column family wildcard (rendered with a trailing {@code .*}) is always a wildcard node.
+ */
+ @Override
+ public boolean isWildcardNode() {
+ return true;
+ }
+
+ /**
+ * @return a new node for the same family name, built with the constructor's second argument
+ * set to {@code true} (presumably flagging it as rewritten; TODO confirm).
+ */
+ @Override
+ public FamilyWildcardParseNode getRewritten() {
+ return new FamilyWildcardParseNode(getName(), true);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index bfb5782d8f..a8a7fd3820 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -18,22 +18,23 @@
package org.apache.phoenix.parse;
import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
*
@@ -74,12 +75,14 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
new ParseNodeRewriter(columnResolver, selectStament.getSelect().size());
return ParseNodeRewriter.rewrite(selectStament, parseNodeRewriter);
}
+
/**
* Rewrite the select statement by switching any constants to the right hand side
* of the expression.
+ *
* @param statement the select statement
* @return new select statement
- * @throws SQLException
+ * @throws SQLException
*/
public static SelectStatement rewrite(SelectStatement statement, ParseNodeRewriter rewriter) throws SQLException {
Map<String,ParseNode> aliasMap = rewriter.getAliasMap();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
index 7c7f4160fd..3ff5972fb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TableWildcardParseNode.java
@@ -82,5 +82,22 @@ public class TableWildcardParseNode extends NamedParseNode {
toSQL(buf);
buf.append(".*");
}
+
+ /**
+ * A table wildcard (rendered with a trailing {@code .*}) is always a wildcard node.
+ */
+ @Override
+ public boolean isWildcardNode() {
+ return true;
+ }
+
+ /**
+ * @return a new node for the same table name, built with the constructor's second argument
+ * set to {@code true} (presumably flagging it as rewritten; TODO confirm).
+ */
+ @Override
+ public TableWildcardParseNode getRewritten() {
+ return new TableWildcardParseNode(tableName, true);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
index 6c2679b18b..78224e68d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
@@ -32,4 +32,20 @@ public abstract class TerminalParseNode extends ParseNode {
public final List<ParseNode> getChildren() {
return Collections.emptyList();
}
+
+ /**
+ * @return whether this node is a wildcard; {@code false} here, overridden to return
+ * {@code true} by the wildcard parse nodes.
+ */
+ public boolean isWildcardNode() {
+ return false;
+ }
+
+ /**
+ * @return the rewritten form of this node, or {@code null} (the default) for nodes that do
+ * not provide one; the wildcard parse nodes override this.
+ */
+ public TerminalParseNode getRewritten() {
+ return null;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
index 9922c3f669..70cdbd2892 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/WildcardParseNode.java
@@ -77,6 +77,21 @@ public class WildcardParseNode extends TerminalParseNode {
buf.append(' ');
buf.append(NAME);
buf.append(' ');
- }
-
+ }
+
+ /**
+ * This node is always a wildcard.
+ */
+ @Override
+ public boolean isWildcardNode() {
+ return true;
+ }
+
+ /**
+ * @return {@code REWRITE_INSTANCE}.
+ */
+ @Override
+ public WildcardParseNode getRewritten() {
+ return REWRITE_INSTANCE;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index bf9352caad..06557803ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -776,7 +776,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW));
return locations;
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
- throw new TableNotFoundException(table.getNameAsString());
+ TableNotFoundException ex = new TableNotFoundException(table.getNameAsString());
+ // Attach the HBase exception to the thrown Phoenix one (not the reverse) so it is not lost;
+ // addSuppressed is used because initCause throws if a constructor already set the cause.
+ ex.addSuppressed(e);
+ throw ex;
} catch (IOException e) {
LOGGER.error("Exception encountered in getAllTableRegions for "
+ "table: {}, retryCount: {}", table.getNameAsString(), retryCount, e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ba9f3904a2..975631aa17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -307,6 +307,12 @@ public interface QueryConstants {
byte VIEW_MODIFIED_PROPERTY_TAG_TYPE = (byte) 70;
String CDC_JSON_COL_NAME = "CDC JSON";
+ String CDC_EVENT_TYPE = "event_type";
+ String CDC_PRE_IMAGE = "pre_image";
+ String CDC_POST_IMAGE = "post_image";
+ String CDC_CHANGE_IMAGE = "change_image";
+ String CDC_UPSERT_EVENT_TYPE = "upsert";
+ String CDC_DELETE_EVENT_TYPE = "delete";
/**
* We mark counter values 0 to 10 as reserved. Value 0 is used by
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index d1ad19fce2..8f0ff72879 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -211,6 +211,15 @@ public class DelegateTable implements PTable {
return delegate.getIndexMaintainer(dataTable, connection);
}
+ /**
+ * Forwards to the wrapped table's {@code getIndexMaintainer(dataTable, cdcTable, connection)}.
+ */
+ @Override
+ public IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
+ PhoenixConnection connection) throws SQLException {
+ return delegate.getIndexMaintainer(dataTable, cdcTable, connection);
+ }
+
@Override
public TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection) {
return delegate.getTransformMaintainer(oldTable, connection);
index 10906916f5..a8d6c7dce1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
@@ -21,11 +21,13 @@ import java.util.List;
import net.jcip.annotations.Immutable;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ByteUtil;
@@ -143,6 +145,33 @@ public class KeyValueSchema extends ValueSchema {
}
}
+ /**
+ * Extracts a value out of a cell encoded with {@link
+ * org.apache.phoenix.schema.PTable.ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS}
+ *
+ * @param cell The cell, expected to have an encoded value.
+ * @param expression The expression evaluated against the cell value loaded into {@code ptr}.
+ * @param ptr The pointer in which the extracted value can be found, if successful.
+ * @return {@code true} if an evaluation succeeded and left a non-empty value in {@code ptr}.
+ */
+ public boolean extractValue(Cell cell, SingleCellColumnExpression expression,
+ ImmutableBytesWritable ptr) {
+ ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ List<Field> fields = getFields();
+ // NOTE(review): besides bounding the number of attempts, i, j and field do not influence
+ // the evaluation, and ptr is not reset between attempts, so a retry evaluates whatever the
+ // previous attempt left in ptr. Confirm whether a single evaluation was intended.
+ for (int i = 0; i < fields.size(); i++) {
+ Field field = fields.get(i);
+ for (int j = 0; j < field.getCount(); j++) {
+ if (expression.evaluate(ptr) && ptr.getLength() > 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
private int getVarLengthBytes(int length) {
return length + WritableUtils.getVIntSize(length);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9717430a11..5b3172a682 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NA
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
+import static org.apache.phoenix.schema.PTableType.CDC;
import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
@@ -1435,11 +1436,10 @@ public class MetaDataClient {
* listed as an index column.
* @param statement
* @param splits
- * @param indexPKExpresionsType If non-{@code null}, all PK expressions should be of this specific type.
* @return MutationState from population of index table from data table
* @throws SQLException
*/
- public MutationState createIndex(CreateIndexStatement statement, byte[][] splits, PDataType indexPKExpresionsType) throws SQLException {
+ public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
IndexKeyConstraint ik = statement.getIndexConstraint();
TableName indexTableName = statement.getIndexTableName();
@@ -1554,9 +1554,6 @@ public class MetaDataClient {
if (expression.isStateless()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
}
- if (indexPKExpresionsType != null && expression.getDataType() != indexPKExpresionsType) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION).build().buildException();
- }
unusedPkColumns.remove(expression);
// Go through parse node to get string as otherwise we
@@ -1741,29 +1738,27 @@ public class MetaDataClient {
statement.getProps().size() + 1);
populatePropertyMaps(statement.getProps(), tableProps, commonFamilyProps, PTableType.CDC);
- NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName(
- statement.getCdcObjName().getName()));
- IndexKeyConstraint indexKeyConstraint =
- FACTORY.indexKey(Arrays.asList(new Pair[]{Pair.newPair(
- FACTORY.function(PhoenixRowTimestampFunction.NAME, Collections.emptyList()),
- SortOrder.getDefault())}));
IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps);
- ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create();
- if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
- indexProps.put(QueryConstants.ALL_FAMILY_PROPERTIES_KEY, new Pair<>(
- TableProperty.SALT_BUCKETS.getPropertyName(),
- TableProperty.SALT_BUCKETS.getValue(tableProps)));
- }
- CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null,
- statement.getDataTable(), (Double) null), indexKeyConstraint, null, null,
- indexProps, statement.isIfNotExists(), indexType, true, 0,
- new HashMap<>(), null);
- MutationState indexMutationState;
+ PhoenixStatement pstmt = new PhoenixStatement(connection);
+ String dataTableFullName = SchemaUtil.getTableName(statement.getDataTable().getSchemaName(),
+ statement.getDataTable().getTableName());
+ String createIndexSql = "CREATE " +
+ (indexType == IndexType.LOCAL ? "LOCAL " : "UNCOVERED ") +
+ "INDEX " + (statement.isIfNotExists() ? "IF NOT EXISTS " : "") +
+ "\"" + CDCUtil.getCDCIndexName(statement.getCdcObjName().getName()) + "\"" +
+ " ON " + dataTableFullName + " (" + PhoenixRowTimestampFunction.NAME + "()) ASYNC";
+ List<String> indexProps = new ArrayList<>();
+ Object saltBucketNum = TableProperty.SALT_BUCKETS.getValue(tableProps);
+ if (saltBucketNum != null) {
+ indexProps.add("SALT_BUCKETS=" + saltBucketNum);
+ }
+ Object columnEncodedBytes = TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
+ if (columnEncodedBytes != null) {
+ indexProps.add("COLUMN_ENCODED_BYTES=" + columnEncodedBytes);
+ }
+ createIndexSql = createIndexSql + " " + String.join(", ", indexProps);
try {
- // TODO: Should we also allow PTimestamp here, in fact PTimestamp is the right type,
- // but we are forced to support PDate because of incorrect type for
- // PHOENIX_ROW_TIMESTAMP (see PHOENIX-6807)?
- indexMutationState = createIndex(indexStatement, null, PDate.INSTANCE);
+ pstmt.execute(createIndexSql);
} catch (SQLException e) {
if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
throw new SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
@@ -1776,12 +1771,9 @@ public class MetaDataClient {
List<PColumn> pkColumns = dataTable.getPKColumns();
List<ColumnDef> columnDefs = new ArrayList<>();
List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
- ColumnName timeIdxCol = FACTORY.columnName(PhoenixRowTimestampFunction.NAME + "()");
- columnDefs.add(FACTORY.columnDef(timeIdxCol, PDate.INSTANCE.getSqlTypeName(), false, null, false,
- PDate.INSTANCE.getMaxLength(null), PDate.INSTANCE.getScale(null), false,
- SortOrder.getDefault(), "", null, false));
- pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, SortOrder.getDefault(), false));
- for (PColumn pcol : pkColumns) {
+ int pkOffset = dataTable.getBucketNum() != null ? 1 : 0;
+ for (int i = pkOffset; i < pkColumns.size(); ++i) {
+ PColumn pcol = pkColumns.get(i);
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
@@ -1791,15 +1783,24 @@ public class MetaDataClient {
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
null, false, SortOrder.getDefault(), "", null, false));
+ tableProps = new HashMap<>();
+ if (dataTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ // CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so override it.
+ tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
+ ONE_CELL_PER_COLUMN.name());
+ }
+ if (dataTable.isMultiTenant()) {
+ tableProps.put(TableProperty.MULTI_TENANT.getPropertyName(), Boolean.TRUE);
+ }
CreateTableStatement tableStatement = FACTORY.createTable(
FACTORY.table(dataTable.getSchemaName().getString(), statement.getCdcObjName().getName()),
- statement.getProps(), columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
+ null, columnDefs, FACTORY.primaryKey(null, pkColumnDefs),
Collections.emptyList(), PTableType.CDC, statement.isIfNotExists(), null, null,
statement.getBindCount(), null);
createTableInternal(tableStatement, null, dataTable, null, null, null,
null, null, false, null,
null, statement.getIncludeScopes(), tableProps, commonFamilyProps);
- return indexMutationState;
+ return new MutationState(0, 0, connection);
}
/**
@@ -2024,7 +2025,7 @@ public class MetaDataClient {
}
return false;
}
-
+
/**
* While adding or dropping columns we write a cell to the SYSTEM.MUTEX table with the rowkey of the
* physical table to prevent conflicting concurrent modifications. For eg two client adding a column
@@ -2306,7 +2307,7 @@ public class MetaDataClient {
}
// Can't set any of these on views or shared indexes on views
- if (tableType != PTableType.VIEW && !allocateIndexId) {
+ if (tableType != PTableType.VIEW && tableType != PTableType.CDC && !allocateIndexId) {
saltBucketNum = (Integer) TableProperty.SALT_BUCKETS.getValue(tableProps);
if (saltBucketNum != null) {
if (saltBucketNum < 0 || saltBucketNum > SaltingUtil.MAX_BUCKET_NUM) {
@@ -2412,8 +2413,8 @@ public class MetaDataClient {
.setSchemaName(schemaName).setTableName(tableName)
.build().buildException();
}
- if (TableProperty.TTL.getValue(commonFamilyProps) != null
- && transactionProvider != null
+ if (TableProperty.TTL.getValue(commonFamilyProps) != null
+ && transactionProvider != null
&& transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
.setMessage(transactionProvider.name())
@@ -2543,7 +2544,7 @@ public class MetaDataClient {
}
pkColumns = newLinkedHashSet(parent.getPKColumns());
- // Add row linking view to its parent
+ // Add row linking view to its parent
try (PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK)) {
linkStatement.setString(1, tenantIdStr);
linkStatement.setString(2, schemaName);
@@ -2569,7 +2570,20 @@ public class MetaDataClient {
columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted
}
-
+
+ if (tableType == PTableType.CDC) {
+ if (parent.getType() == VIEW) {
+ physicalNames = Collections.singletonList(
+ PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(
+ parent.getBaseTableLogicalName(), isNamespaceMapped)));
+ }
+ else {
+ physicalNames = Collections.singletonList(
+ PNameFactory.newName(SchemaUtil.getTableName(schemaName,
+ CDCUtil.getCDCIndexName(tableName))));
+ }
+ }
+
// Don't add link for mapped view, as it just points back to itself and causes the drop to
// fail because it looks like there's always a view associated with it.
if (!physicalNames.isEmpty()) {
@@ -2630,7 +2644,7 @@ public class MetaDataClient {
/*
* We can't control what column qualifiers are used in HTable mapped to Phoenix views. So we are not
* able to encode column names.
- */
+ */
if (viewType != MAPPED) {
/*
* For regular phoenix views, use the storage scheme of the physical table since they all share the
@@ -2648,14 +2662,14 @@ public class MetaDataClient {
// System tables have hard-coded column qualifiers. So we can't use column encoding for them.
else if (!SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))|| SchemaUtil.isLogTable(schemaName, tableName)) {
/*
- * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
- * create tables with encoded column names.
- *
- * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases,
- * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are
+ * Indexes inherit the storage scheme of the parent data tables. Otherwise, we always attempt to
+ * create tables with encoded column names.
+ *
+ * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases,
+ * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are
* partitioned by the virtue of indexId present in the row key. As such, different shared indexes can use
* potentially overlapping column qualifiers.
- *
+ *
*/
if (parent != null) {
Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
@@ -2689,7 +2703,9 @@ public class MetaDataClient {
}
}
- if (parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS && immutableStorageScheme == ONE_CELL_PER_COLUMN) {
+ if (tableType != CDC &&
+ parent.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS &&
+ immutableStorageScheme == ONE_CELL_PER_COLUMN) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.INVALID_IMMUTABLE_STORAGE_SCHEME_CHANGE)
.setSchemaName(schemaName).setTableName(tableName).build()
@@ -2903,7 +2919,7 @@ public class MetaDataClient {
column.getFamilyName());
}
}
-
+
// We need a PK definition for a TABLE or mapped VIEW
if (!wasPKDefined && pkColumnsNames.isEmpty() && tableType != PTableType.VIEW && viewType != ViewType.MAPPED) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
@@ -2995,7 +3011,7 @@ public class MetaDataClient {
.build();
connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
}
-
+
// Update column qualifier counters
if (EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme)) {
// Store the encoded column counter for phoenix entities that have their own hbase
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index c64de2ef16..ec80113b2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -870,11 +870,12 @@ public interface PTable extends PMetaDataEntity {
PName getPhysicalName(boolean returnColValueFromSyscat);
boolean isImmutableRows();
-
boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection)
throws SQLException;
IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection)
throws SQLException;
+ IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
+ PhoenixConnection connection) throws SQLException;
TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection);
PName getDefaultFamilyName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 0b3beb885b..6e6fbc6d00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1763,9 +1763,25 @@ public class PTableImpl implements PTable {
+ /**
+ * Equivalent to {@code getIndexMaintainer(dataTable, null, connection)}.
+ */
@Override
public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable,
+ PhoenixConnection connection)
+ throws SQLException {
+ return getIndexMaintainer(dataTable, null, connection);
+ }
+
+ /**
+ * Lazily creates this index's maintainer on the first call and caches it.
+ * NOTE(review): later calls return the cached instance regardless of the dataTable/cdcTable
+ * passed, so a maintainer first built without a CDC table is reused for CDC callers (and
+ * vice versa). Confirm this is intended.
+ */
+ @Override
+ public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
PhoenixConnection connection) throws SQLException {
if (indexMaintainer == null) {
- indexMaintainer = IndexMaintainer.create(dataTable, this, connection);
+ indexMaintainer = IndexMaintainer.create(dataTable, cdcTable, this, connection);
}
return indexMaintainer;
}
@@ -1795,9 +1802,7 @@ public class PTableImpl implements PTable {
return SchemaUtil.getPhysicalHBaseTableName(schemaName,
physicalTableNameColumnInSyscat, isNamespaceMapped);
}
- return SchemaUtil.getPhysicalHBaseTableName(schemaName, getType() == PTableType.CDC ?
- PNameFactory.newName(CDCUtil.getCDCIndexName(tableName.getString())) :
- tableName, isNamespaceMapped);
+ return SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped);
} else {
return PNameFactory.newName(physicalNames.get(0).getBytes());
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 60633877d4..cfa8fe2bfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -41,9 +41,6 @@ public class SaltingUtil {
public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null,
HConstants.LATEST_TIMESTAMP);
- public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
- .addField(SALTING_COLUMN, false, SortOrder.getDefault())
- .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();
public static List<KeyRange> generateAllSaltingRanges(int bucketNum) {
List<KeyRange> allRanges = Lists.newArrayListWithExpectedSize(bucketNum);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
new file mode 100644
index 0000000000..e947ed7003
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.index.CDCTableInfo;
+import org.apache.phoenix.schema.PTable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
+
+/**
+ * Assembles a single CDC event for a data table row from the cells surrounding one change
+ * timestamp.
+ * <p>
+ * The builder is reset with {@link #initChange(long)}, fed cells through
+ * {@link #registerChange(Cell, int, Object)}, and turned into an event map with
+ * {@link #buildCDCEvent()}. Which images are collected is decided by the
+ * {@link PTable.CDCChangeScope}s included by the CDC table.
+ */
+public class CDCChangeBuilder {
+ private final boolean isChangeImageInScope;
+ private final boolean isPreImageInScope;
+ private final boolean isPostImageInScope;
+ private final CDCTableInfo cdcDataTableInfo;
+ // CDC_UPSERT_EVENT_TYPE or CDC_DELETE_EVENT_TYPE once detected; null while no change is seen.
+ private String changeType;
+ // Cells at or before this timestamp are not "older than the change", so they never reach
+ // the pre image.
+ private long lastDeletedTimestamp;
+ private long changeTimestamp;
+ private Map<String, Object> preImage = null;
+ private Map<String, Object> changeImage = null;
+
+ /**
+ * @param cdcDataTableInfo metadata of the CDC table; its include scopes decide which images
+ * this builder collects.
+ */
+ public CDCChangeBuilder(CDCTableInfo cdcDataTableInfo) {
+ this.cdcDataTableInfo = cdcDataTableInfo;
+ Set<PTable.CDCChangeScope> changeScopes = cdcDataTableInfo.getIncludeScopes();
+ isChangeImageInScope = changeScopes.contains(PTable.CDCChangeScope.CHANGE);
+ isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE);
+ isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST);
+ }
+
+ /**
+ * Resets the builder to collect the change made at {@code ts}.
+ *
+ * @param ts timestamp of the change to build.
+ */
+ public void initChange(long ts) {
+ changeTimestamp = ts;
+ changeType = null;
+ lastDeletedTimestamp = 0L;
+ // The post image is the pre image overlaid with the change image, so both are also
+ // collected when only POST is in scope.
+ if (isPreImageInScope || isPostImageInScope) {
+ preImage = new HashMap<>();
+ }
+ if (isChangeImageInScope || isPostImageInScope) {
+ changeImage = new HashMap<>();
+ }
+ }
+
+ public long getChangeTimestamp() {
+ return changeTimestamp;
+ }
+
+ /** @return {@code true} if the current change has been marked as a deletion. */
+ public boolean isDeletionEvent() {
+ return CDC_DELETE_EVENT_TYPE.equals(changeType);
+ }
+
+ /** @return {@code true} once an upsert or a deletion has been detected for the change. */
+ public boolean isNonEmptyEvent() {
+ return changeType != null;
+ }
+
+ public void markAsDeletionEvent() {
+ changeType = CDC_DELETE_EVENT_TYPE;
+ }
+
+ public long getLastDeletedTimestamp() {
+ return lastDeletedTimestamp;
+ }
+
+ public void setLastDeletedTimestamp(long lastDeletedTimestamp) {
+ this.lastDeletedTimestamp = lastDeletedTimestamp;
+ }
+
+ /**
+ * Tells whether {@code cell} can contribute to the change being built.
+ *
+ * @param cell a data table cell.
+ * @return {@code false} for cells newer than the change and, once the change is known to be
+ * a deletion, for non-{@code DeleteFamily} cells that are not older than the change;
+ * {@code true} otherwise.
+ */
+ public boolean isChangeRelevant(Cell cell) {
+ if (cell.getTimestamp() > changeTimestamp) {
+ return false;
+ }
+ if (cell.getType() != Cell.Type.DeleteFamily && !isOlderThanChange(cell) &&
+ isDeletionEvent()) {
+ // We don't need to build the change image in this case.
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Records the value of one column cell for the current change.
+ * <p>
+ * Cells older than the change (see {@link #isOlderThanChange(Cell)}) feed the pre image,
+ * keeping the first value registered per column; cells written exactly at the change
+ * timestamp mark the event as an upsert and feed the change image. Cells rejected by
+ * {@link #isChangeRelevant(Cell)} are ignored.
+ *
+ * @param cell the cell the value was read from.
+ * @param columnNum index of the column in {@link CDCTableInfo#getColumnInfoList()}.
+ * @param value the value to record for that column.
+ */
+ public void registerChange(Cell cell, int columnNum, Object value) {
+ if (!isChangeRelevant(cell)) {
+ return;
+ }
+ CDCTableInfo.CDCColumnInfo columnInfo =
+ cdcDataTableInfo.getColumnInfoList().get(columnNum);
+ String cdcColumnName = columnInfo.getColumnDisplayName(cdcDataTableInfo);
+ if (isOlderThanChange(cell)) {
+ if ((isPreImageInScope || isPostImageInScope) &&
+ !preImage.containsKey(cdcColumnName)) {
+ preImage.put(cdcColumnName, value);
+ }
+ } else if (cell.getTimestamp() == changeTimestamp) {
+ assert !isDeletionEvent() : "Not expected to find a change for delete event";
+ changeType = CDC_UPSERT_EVENT_TYPE;
+ if (isChangeImageInScope || isPostImageInScope) {
+ changeImage.put(cdcColumnName, value);
+ }
+ }
+ }
+
+ /**
+ * Builds the event map for the current change; only valid when {@link #isNonEmptyEvent()}
+ * is {@code true}.
+ *
+ * @return a map with the event type under {@code CDC_EVENT_TYPE}, the pre image under
+ * {@code CDC_PRE_IMAGE} when PRE is in scope and, for upserts only, the change image under
+ * {@code CDC_CHANGE_IMAGE} and the post image (pre image overlaid with the change image)
+ * under {@code CDC_POST_IMAGE} when those scopes are included.
+ */
+ public Map<String, Object> buildCDCEvent() {
+ assert changeType != null : "Not expected when no event was detected";
+ Map<String, Object> cdcChange = new HashMap<>();
+ if (isPreImageInScope) {
+ cdcChange.put(CDC_PRE_IMAGE, preImage);
+ }
+ if (CDC_UPSERT_EVENT_TYPE.equals(changeType)) {
+ if (isChangeImageInScope) {
+ cdcChange.put(CDC_CHANGE_IMAGE, changeImage);
+ }
+ if (isPostImageInScope) {
+ Map<String, Object> postImage = new HashMap<>(preImage);
+ postImage.putAll(changeImage);
+ cdcChange.put(CDC_POST_IMAGE, postImage);
+ }
+ }
+ cdcChange.put(CDC_EVENT_TYPE, changeType);
+ return cdcChange;
+ }
+
+ /**
+ * @return {@code true} if {@code cell} was written strictly before the change and strictly
+ * after the last recorded delete timestamp.
+ */
+ public boolean isOlderThanChange(Cell cell) {
+ long cellTimestamp = cell.getTimestamp();
+ return cellTimestamp < changeTimestamp && cellTimestamp > lastDeletedTimestamp;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 002da0a9c5..438638c1dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -25,13 +25,16 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.util.StringUtils;
-
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.schema.PTable;
+import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
+
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "__CDC__";
public static final String CDC_INDEX_TYPE_LOCAL = "L";
@@ -80,12 +83,7 @@ public class CDCUtil {
}
public static String getCDCIndexName(String cdcName) {
- return CDC_INDEX_PREFIX + cdcName;
- }
-
- public static String getCDCNameFromIndexName(String indexName) {
- assert(indexName.startsWith(CDC_INDEX_PREFIX));
- return indexName.substring(CDC_INDEX_PREFIX.length());
+ return CDC_INDEX_PREFIX + SchemaUtil.getTableNameFromFullName(cdcName.toUpperCase());
}
public static boolean isCDCIndex(String indexName) {
@@ -102,12 +100,21 @@ public class CDCUtil {
scan.setCacheBlocks(false);
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
if (! familyMap.isEmpty()) {
- familyMap.keySet().stream().forEach(fQual -> {
- if (familyMap.get(fQual) != null) {
- familyMap.get(fQual).clear();
- }
- });
+ familyMap.clear();
}
return scan;
}
+
+ /**
+ * Orders two columns by family first and then by qualifier. Both comparisons use
+ * DescVarLengthFastByteComparisons over the full length of each array.
+ * <p>
+ * NOTE(review): DescVarLengthFastByteComparisons appears to be designed for DESC
+ * variable-length row keys. Confirm that its ordering when one name is a byte prefix of the
+ * other matches the order in which callers walk cells (HBase orders families and qualifiers
+ * by plain lexicographic byte comparison).
+ *
+ * @param columnFamily1 family of the first column
+ * @param columnQual1 qualifier of the first column
+ * @param columnFamily2 family of the second column
+ * @param columnQual2 qualifier of the second column
+ * @return negative, zero or positive as the first column sorts before, equal to, or after
+ * the second
+ */
+ public static int compareCellFamilyAndQualifier(byte[] columnFamily1,
+ byte[] columnQual1,
+ byte[] columnFamily2,
+ byte[] columnQual2) {
+ int byFamily = DescVarLengthFastByteComparisons.compareTo(
+ columnFamily1, 0, columnFamily1.length,
+ columnFamily2, 0, columnFamily2.length);
+ // Qualifiers only break ties between columns of the same family.
+ return byFamily != 0
+ ? byFamily
+ : DescVarLengthFastByteComparisons.compareTo(
+ columnQual1, 0, columnQual1.length,
+ columnQual2, 0, columnQual2.length);
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5a06b6287d..a2e7b72215 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -299,7 +299,7 @@ public class IndexUtil {
}
- private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
+ public static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(table);
byte[] emptyKeyValueQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
return (Bytes.compareTo(emptyKeyValueCF, 0, emptyKeyValueCF.length, ref.getFamilyWritable()
@@ -586,7 +586,7 @@ public class IndexUtil {
tupleProjector.getValueBitSet(), ptr);
Cell firstCell = result.get(0);
Cell keyValue =
- PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(),
+ PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(), // FIXME: This does DEEP_COPY of cell, do we need that?
firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY,
VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
result.add(keyValue);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index f2b6b911fb..a52213aea2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -19,32 +19,21 @@ package org.apache.phoenix.util;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_INCLUDE_SCOPES;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_JSON_COL_QUALIFIER;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_NAME_MAP;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.DATA_COL_QUALIFIER_TO_TYPE_MAP;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
-import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME;
import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -68,11 +57,11 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -89,6 +78,7 @@ import org.apache.phoenix.filter.PagingFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -323,14 +313,16 @@ public class ScanUtil {
Filter filter = scan.getFilter();
if (filter == null) {
scan.setFilter(andWithFilter);
- } else if (filter instanceof FilterList && ((FilterList)filter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+ } else if (filter instanceof FilterList && ((FilterList)filter).getOperator()
+ == FilterList.Operator.MUST_PASS_ALL) {
FilterList filterList = (FilterList)filter;
List<Filter> allFilters = new ArrayList<Filter>(filterList.getFilters().size() + 1);
allFilters.addAll(filterList.getFilters());
allFilters.add(andWithFilter);
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,allFilters));
} else {
- scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter)));
+ scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+ Arrays.asList(filter, andWithFilter)));
}
}
@@ -1136,7 +1128,7 @@ public class ScanUtil {
}
public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table,
- PhoenixConnection phoenixConnection) throws SQLException {
+ PhoenixConnection phoenixConnection, StatementContext context) throws SQLException {
boolean isTransforming = (table.getTransformingNewTable() != null);
PTable indexTable = table;
// Transforming index table can be repaired in regular path via globalindexchecker coproc on it.
@@ -1180,25 +1172,22 @@ public class ScanUtil {
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
} else {
- if (table.getType() != PTableType.CDC && (table.getType() != PTableType.INDEX ||
- !IndexUtil.isGlobalIndex(indexTable))) {
+ if (table.getType() != PTableType.INDEX || !IndexUtil.isGlobalIndex(indexTable)) {
return;
}
if (table.isTransactional() && table.getIndexType() == IndexType.UNCOVERED_GLOBAL) {
return;
}
- PTable dataTable = ScanUtil.getDataTable(indexTable, phoenixConnection);
+ PTable dataTable = context.getCDCDataTableRef() != null ?
+ context.getCDCDataTableRef().getTable() :
+ ScanUtil.getDataTable(indexTable, phoenixConnection);
if (dataTable == null) {
// This index table must be being deleted. No need to set the scan attributes
return;
}
// MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
// view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
- if (table.getType() == PTableType.CDC) {
- indexTable = PhoenixRuntime.getTable(phoenixConnection,
- CDCUtil.getCDCIndexName(table.getName().getString()));
- }
- else if (indexTable.getViewIndexId() != null &&
+ if (indexTable.getViewIndexId() != null &&
indexTable.getName().getString().contains(
QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
@@ -1302,7 +1291,7 @@ public class ScanUtil {
public static void setScanAttributesForClient(Scan scan, PTable table,
StatementContext context) throws SQLException {
PhoenixConnection phoenixConnection = context.getConnection();
- setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
+ setScanAttributesForIndexReadRepair(scan, table, phoenixConnection, context);
setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
byte[] emptyCQ = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME);
@@ -1320,112 +1309,10 @@ public class ScanUtil {
setScanAttributeForPaging(scan, phoenixConnection);
- if (table.getType() == PTableType.CDC) {
- PTable dataTable = PhoenixRuntime.getTable(phoenixConnection,
- SchemaUtil.getTableName(table.getSchemaName().getString(),
- table.getParentTableName().getString()));
- scan.setAttribute(CDC_DATA_TABLE_NAME,
- table.getParentName().getBytes());
-
- PColumn cdcJsonCol = table.getColumnForColumnName(CDC_JSON_COL_NAME);
- scan.setAttribute(CDC_JSON_COL_QUALIFIER, cdcJsonCol.getColumnQualifierBytes());
- scan.setAttribute(CDC_INCLUDE_SCOPES,
- context.getEncodedCdcIncludeScopes().getBytes(StandardCharsets.UTF_8));
+ if (context.getCDCTableRef() != null) {
+ scan.setAttribute(CDC_DATA_TABLE_DEF, CDCTableInfo.toProto(context).toByteArray());
CDCUtil.initForRawScan(scan);
- List<PColumn> columns = dataTable.getColumns();
- Map<byte[], String> dataColQualNameMap = new HashMap<>(columns.size());
- Map<byte[], PDataType> dataColTypeMap = new HashMap<>();
- for (PColumn col: columns) {
- if (col.getColumnQualifierBytes() != null) {
- dataColQualNameMap.put(col.getColumnQualifierBytes(), col.getName().getString());
- dataColTypeMap.put(col.getColumnQualifierBytes(), col.getDataType());
- }
- }
- scan.setAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP,
- serializeColumnQualifierToNameMap(dataColQualNameMap));
- scan.setAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP,
- serializeColumnQualifierToTypeMap(dataColTypeMap));
- }
- }
-
- public static byte[] serializeColumnQualifierToNameMap(Map<byte[], String> colQualNameMap) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutputStream output = new DataOutputStream(stream);
- try {
- output.writeInt(colQualNameMap.size());
- for (Map.Entry<byte[], String> entry: colQualNameMap.entrySet()) {
- output.writeInt(entry.getKey().length);
- output.write(entry.getKey());
- WritableUtils.writeString(output, entry.getValue());
- }
- return stream.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static Map<ImmutableBytesPtr, String> deserializeColumnQualifierToNameMap(
- byte[] mapBytes) {
- ByteArrayInputStream stream = new ByteArrayInputStream(mapBytes);
- DataInputStream input = new DataInputStream(stream);
- try {
- Map<ImmutableBytesPtr, String> colQualNameMap = new HashMap<>();
- int size = input.readInt();
- for (int i = 0; i < size; ++i) {
- int qualLength = input.readInt();
- byte[] qualBytes = new byte[qualLength];
- int bytesRead = input.read(qualBytes);
- if (bytesRead != qualLength) {
- throw new IOException("Expected number of bytes: " + qualLength + " but got " +
- "only: " + bytesRead);
- }
- String colName = WritableUtils.readString(input);
- colQualNameMap.put(new ImmutableBytesPtr(qualBytes), colName);
- }
- return colQualNameMap;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static byte[] serializeColumnQualifierToTypeMap(
- Map<byte[], PDataType> pkColNamesAndTypes) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- DataOutputStream output = new DataOutputStream(stream);
- try {
- output.writeInt(pkColNamesAndTypes.size());
- for (Map.Entry<byte[], PDataType> entry: pkColNamesAndTypes.entrySet()) {
- output.writeInt(entry.getKey().length);
- output.write(entry.getKey());
- WritableUtils.writeString(output, entry.getValue().getSqlTypeName());
- }
- return stream.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static Map<ImmutableBytesPtr, PDataType> deserializeColumnQualifierToTypeMap(
- byte[] pkColInfoBytes) {
- ByteArrayInputStream stream = new ByteArrayInputStream(pkColInfoBytes);
- DataInputStream input = new DataInputStream(stream);
- try {
- Map<ImmutableBytesPtr, PDataType> colQualTypeMap = new HashMap<>();
- int colCnt = input.readInt();
- for (int i = 0; i < colCnt; ++i) {
- int qualLength = input.readInt();
- byte[] qualBytes = new byte[qualLength];
- int bytesRead = input.read(qualBytes);
- if (bytesRead != qualLength) {
- throw new IOException("Expected number of bytes: " + qualLength + " but got " +
- "only: " + bytesRead);
- }
- colQualTypeMap.put(new ImmutableBytesPtr(qualBytes),
- PDataType.fromSqlTypeName(WritableUtils.readString(input)));
- }
- return colQualTypeMap;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ GlobalIndexRegionScanner.adjustScanFilter(scan);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java b/phoenix-core/src/main/protobuf/CDCInfo.proto
similarity index 54%
copy from phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
copy to phoenix-core/src/main/protobuf/CDCInfo.proto
index 6c2679b18b..92f2c29333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TerminalParseNode.java
+++ b/phoenix-core/src/main/protobuf/CDCInfo.proto
@@ -15,21 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.parse;
-import java.util.Collections;
-import java.util.List;
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "CDCInfoProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
-/**
- *
- * Abstract node for expressions that have no children
- *
- *
- * @since 0.1
- */
-public abstract class TerminalParseNode extends ParseNode {
- @Override
- public final List<ParseNode> getChildren() {
- return Collections.emptyList();
- }
+// Metadata for one data-table column, carried in the repeated `columns` field of CDCTableDef.
+// Field numbers are part of the wire format: never renumber or reuse them.
+message CDCColumnDef {
+ // Column name; NOTE(review): presumably the name surfaced in CDC output - confirm in CDCTableInfo.
+ required string columnName = 1;
+ // Raw column family bytes of the column.
+ required bytes familyNameBytes = 2;
+ // Raw column qualifier bytes (encoded or not, per the table's qualifierEncodingScheme).
+ required bytes columnQualifierBytes = 3;
+ // Data type of the column as a string; NOTE(review): presumably the SQL type name that
+ // PDataType.fromSqlTypeName can resolve, as in the serialization this replaces - confirm.
+ required string dataType = 4;
+}
+
+// Serialized CDC data-table definition. The client builds it with CDCTableInfo.toProto(context)
+// and ships it to the server in the CDC_DATA_TABLE_DEF scan attribute. It replaces the former
+// CDC_DATA_TABLE_NAME, CDC_JSON_COL_QUALIFIER, CDC_INCLUDE_SCOPES and column-map attributes.
+// Adding or removing `required` fields breaks compatibility between client and server versions.
+message CDCTableDef {
+ // NOTE(review): presumably the data table's default column family - confirm.
+ optional bytes defaultFamilyName = 1;
+ // NOTE(review): presumably the encoded CDC include scopes string formerly sent as
+ // CDC_INCLUDE_SCOPES - confirm.
+ optional string cdcIncludeScopes = 2;
+ // NOTE(review): presumably a serialized QualifierEncodingScheme of the data table - confirm.
+ required bytes qualifierEncodingScheme = 3;
+ // One entry per data-table column described by CDCColumnDef.
+ repeated CDCColumnDef columns = 4;
+ // NOTE(review): presumably the qualifier of the CDC JSON column, formerly sent as
+ // CDC_JSON_COL_QUALIFIER - confirm.
+ required bytes cdcJsonColQualBytes = 5;
+ // NOTE(review): presumably a serialized tuple projector for the data table - confirm.
+ optional bytes dataTableProjectorBytes = 6;
 }