You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/12/04 07:56:46 UTC
(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7074 DROP CDC Implementation (#1713)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new 5d3fd4033d PHOENIX-7074 DROP CDC Implementation (#1713)
5d3fd4033d is described below
commit 5d3fd4033d763bb1fb2b8dc3d20a6a1d5bd461a4
Author: TheNamesRai <sa...@outlook.com>
AuthorDate: Mon Dec 4 13:26:40 2023 +0530
PHOENIX-7074 DROP CDC Implementation (#1713)
---
.../java/org/apache/phoenix/end2end/CDCMiscIT.java | 61 ++++++++++++++++++++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 2 +-
.../coprocessor/PhoenixAccessController.java | 5 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 4 +-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 23 +++++++-
.../org/apache/phoenix/schema/MetaDataClient.java | 21 ++++++++
.../java/org/apache/phoenix/util/CDCUtilTest.java | 18 +++++++
7 files changed, 129 insertions(+), 5 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index 5995b5d2e4..7e081419e2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -208,4 +208,65 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}
+
+ @Test
+ public void testDropCDC () throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ + " v2 DATE)");
+ String cdcName = generateUniqueName();
+
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+ conn.createStatement().execute(cdc_sql);
+ assertCDCState(conn, cdcName, null, 3);
+
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(drop_cdc_sql);
+
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+ "system.catalog WHERE table_name = '" + cdcName +
+ "' AND column_name IS NULL and column_family IS NULL")) {
+ assertEquals(false, rs.next());
+ }
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+ "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+ "' AND column_name IS NULL and column_family IS NULL")) {
+ assertEquals(false, rs.next());
+ }
+
+ try {
+ conn.createStatement().execute(drop_cdc_sql);
+ fail("Expected to fail as cdc table doesn't exist");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+ assertTrue(e.getMessage().endsWith(cdcName));
+ }
+ }
+
+ @Test
+ public void testDropCDCIndex () throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ + " v2 DATE)");
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName
+ + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+ conn.createStatement().execute(cdc_sql);
+ assertCDCState(conn, cdcName, null, 3);
+ String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
+ try {
+ conn.createStatement().execute(drop_cdc_index_sql);
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
+ assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
+ }
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7c80c85c60..f4a0dcb7d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2999,7 +2999,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Add to list of HTables to delete, unless it's a view or its a shared index
if (tableType == INDEX && table.getViewIndexId() != null) {
sharedTablesToDelete.add(new SharedTableState(table));
- } else if (tableType != PTableType.VIEW) {
+ } else if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
tableNamesToDelete.add(table.getPhysicalName().getBytes());
}
invalidateList.add(cacheKey);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
index 1039d88947..b3b648a677 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
@@ -366,7 +366,7 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
if (!accessCheckEnabled) { return; }
for (MasterObserver observer : getAccessControllers()) {
- if (tableType != PTableType.VIEW) {
+ if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
observer.preDeleteTable(getMasterObsevrverContext(), physicalTableName);
}
if (indexes != null) {
@@ -377,7 +377,8 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
}
}
//checking similar permission checked during the create of the view.
- if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
+ if (tableType == PTableType.VIEW || tableType == PTableType.INDEX
+ || tableType == PTableType.CDC) {
if (execPermissionsCheckEnabled) {
requireAccess("Drop "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
} else {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5139eb5764..7842e8f4c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -441,7 +441,9 @@ public enum SQLExceptionCode {
CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY(1133, "XCL33", "IMMUTABLE_ROWS property can be changed only if the table storage scheme is ONE_CELL_PER_KEYVALUE_COLUMN"),
CANNOT_ALTER_TABLE_PROPERTY_ON_VIEW(1134, "XCL34", "Altering this table property on a view is not allowed"),
-
+
+ CANNOT_DROP_CDC_INDEX(1153, "XCL53",
+ "Cannot drop the index associated with CDC"),
IMMUTABLE_TABLE_PROPERTY_INVALID(1135, "XCL35", "IMMUTABLE table property cannot be used with CREATE IMMUTABLE TABLE statement "),
MAX_COLUMNS_EXCEEDED(1136, "XCL36", "The number of columns exceed the maximum supported by the table's qualifier encoding scheme"),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4fd18f0f8b..026095a4b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.jdbc;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_DROP_CDC_INDEX;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
@@ -215,6 +216,7 @@ import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.CursorUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -1591,6 +1593,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
@Override
public MutationState execute() throws SQLException {
+ String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
+ if (CDCUtil.isACDCIndex(indexName)) {
+ throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
+ .setTableName(indexName)
+ .build().buildException();
+ }
MetaDataClient client = new MetaDataClient(getContext().getConnection());
return client.dropIndex(ExecutableDropIndexStatement.this);
}
@@ -1607,7 +1615,20 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
@SuppressWarnings("unchecked")
@Override
public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
- return null;
+ final StatementContext context = new StatementContext(stmt);
+ return new BaseMutationPlan(context, this.getOperation()) {
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DROP CDC"));
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ MetaDataClient client = new MetaDataClient(getContext().getConnection());
+ return client.dropCDC(ExecutableDropCDCStatement.this);
+ }
+ };
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ac433090d4..8755b0ed96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -162,6 +162,7 @@ import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.parse.CreateCDCStatement;
+import org.apache.phoenix.parse.DropCDCStatement;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.coprocessor.TableInfo;
import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
@@ -3631,6 +3632,26 @@ public class MetaDataClient {
return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false, false);
}
+ public MutationState dropCDC(DropCDCStatement statement) throws SQLException {
+ String schemaName = statement.getTableName().getSchemaName();
+ String cdcTableName = statement.getCdcObjName().getName();
+ String parentTableName = statement.getTableName().getTableName();
+ // Dropping the virtual CDC Table
+ dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
+ false, false);
+
+ String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
+ // Dropping the uncovered index associated with the CDC Table
+ try {
+ return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
+ statement.ifExists(), false, false);
+ } catch (SQLException e) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.fromErrorCode(e.getErrorCode()))
+ .setTableName(statement.getCdcObjName().getName()).setRootCause(e.getCause())
+ .build().buildException();
+ }
+ }
+
private MutationState dropFunction(String functionName,
boolean ifExists) throws SQLException {
connection.rollback();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
index 3282ba62c9..e14fed4810 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.phoenix.util;
import org.apache.phoenix.schema.PTable;