You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/12/04 07:56:46 UTC

(phoenix) branch PHOENIX-7001-feature updated: PHOENIX-7074 DROP CDC Implementation (#1713)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
     new 5d3fd4033d PHOENIX-7074 DROP CDC Implementation (#1713)
5d3fd4033d is described below

commit 5d3fd4033d763bb1fb2b8dc3d20a6a1d5bd461a4
Author: TheNamesRai <sa...@outlook.com>
AuthorDate: Mon Dec 4 13:26:40 2023 +0530

    PHOENIX-7074 DROP CDC Implementation (#1713)
---
 .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 61 ++++++++++++++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  2 +-
 .../coprocessor/PhoenixAccessController.java       |  5 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |  4 +-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  | 23 +++++++-
 .../org/apache/phoenix/schema/MetaDataClient.java  | 21 ++++++++
 .../java/org/apache/phoenix/util/CDCUtilTest.java  | 18 +++++++
 7 files changed, 129 insertions(+), 5 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
index 5995b5d2e4..7e081419e2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -208,4 +208,65 @@ public class CDCMiscIT extends ParallelStatsDisabledIT {
         assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
         assertEquals("K", cdcPkColumns.get(2).getName().getString());
     }
+
+    @Test
+    public void testDropCDC () throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+
+        String cdc_sql = "CREATE CDC " + cdcName
+                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        conn.createStatement().execute(cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+
+        String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(drop_cdc_sql);
+
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
+                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(false, rs.next());
+        }
+
+        try {
+            conn.createStatement().execute(drop_cdc_sql);
+            fail("Expected to fail as cdc table doesn't exist");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+    }
+
+    @Test
+    public void testDropCDCIndex () throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+        String cdc_sql = "CREATE CDC " + cdcName
+                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        conn.createStatement().execute(cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+        String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
+        try {
+            conn.createStatement().execute(drop_cdc_index_sql);
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
+        }
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7c80c85c60..f4a0dcb7d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2999,7 +2999,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             // Add to list of HTables to delete, unless it's a view or its a shared index
             if (tableType == INDEX && table.getViewIndexId() != null) {
                 sharedTablesToDelete.add(new SharedTableState(table));
-            } else if (tableType != PTableType.VIEW) {
+            } else if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
                 tableNamesToDelete.add(table.getPhysicalName().getBytes());
             }
             invalidateList.add(cacheKey);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
index 1039d88947..b3b648a677 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
@@ -366,7 +366,7 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
         if (!accessCheckEnabled) { return; }
 
         for (MasterObserver observer : getAccessControllers()) {
-            if (tableType != PTableType.VIEW) {
+            if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
                 observer.preDeleteTable(getMasterObsevrverContext(), physicalTableName);
             }
             if (indexes != null) {
@@ -377,7 +377,8 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
             }
         }
         //checking similar permission checked during the create of the view.
-        if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
+        if (tableType == PTableType.VIEW || tableType == PTableType.INDEX
+                || tableType == PTableType.CDC) {
             if (execPermissionsCheckEnabled) {
                 requireAccess("Drop "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
             } else {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 5139eb5764..7842e8f4c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -441,7 +441,9 @@ public enum SQLExceptionCode {
     
     CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY(1133, "XCL33", "IMMUTABLE_ROWS property can be changed only if the table storage scheme is ONE_CELL_PER_KEYVALUE_COLUMN"),
     CANNOT_ALTER_TABLE_PROPERTY_ON_VIEW(1134, "XCL34", "Altering this table property on a view is not allowed"),
-    
+
+    CANNOT_DROP_CDC_INDEX(1153, "XCL53",
+            "Cannot drop the index associated with CDC"),
     IMMUTABLE_TABLE_PROPERTY_INVALID(1135, "XCL35", "IMMUTABLE table property cannot be used with CREATE IMMUTABLE TABLE statement "),
     
     MAX_COLUMNS_EXCEEDED(1136, "XCL36", "The number of columns exceed the maximum supported by the table's qualifier encoding scheme"),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4fd18f0f8b..026095a4b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.jdbc;
 
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_DROP_CDC_INDEX;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
@@ -215,6 +216,7 @@ import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -1591,6 +1593,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
 
                 @Override
                 public MutationState execute() throws SQLException {
+                    String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
+                    if (CDCUtil.isACDCIndex(indexName)) {
+                        throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
+                                .setTableName(indexName)
+                                .build().buildException();
+                    }
                     MetaDataClient client = new MetaDataClient(getContext().getConnection());
                     return client.dropIndex(ExecutableDropIndexStatement.this);
                 }
@@ -1607,7 +1615,20 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
         @SuppressWarnings("unchecked")
         @Override
         public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
-            return null;
+            final StatementContext context = new StatementContext(stmt);
+            return new BaseMutationPlan(context, this.getOperation()) {
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("DROP CDC"));
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getContext().getConnection());
+                    return client.dropCDC(ExecutableDropCDCStatement.this);
+                }
+            };
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ac433090d4..8755b0ed96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -162,6 +162,7 @@ import java.util.HashSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.parse.CreateCDCStatement;
+import org.apache.phoenix.parse.DropCDCStatement;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.phoenix.coprocessor.TableInfo;
 import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
@@ -3631,6 +3632,26 @@ public class MetaDataClient {
 		return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false, false);
     }
 
+    public MutationState dropCDC(DropCDCStatement statement) throws SQLException {
+        String schemaName = statement.getTableName().getSchemaName();
+        String cdcTableName = statement.getCdcObjName().getName();
+        String parentTableName = statement.getTableName().getTableName();
+        // Dropping the virtual CDC Table
+        dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
+                false, false);
+
+        String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
+        // Dropping the uncovered index associated with the CDC Table
+        try {
+            return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
+                    statement.ifExists(), false, false);
+        } catch (SQLException e) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.fromErrorCode(e.getErrorCode()))
+                    .setTableName(statement.getCdcObjName().getName()).setRootCause(e.getCause())
+                    .build().buildException();
+        }
+    }
+
     private MutationState dropFunction(String functionName,
             boolean ifExists) throws SQLException {
         connection.rollback();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
index 3282ba62c9..e14fed4810 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.phoenix.util;
 
 import org.apache.phoenix.schema.PTable;