Posted to issues@phoenix.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/05/04 23:48:00 UTC

[jira] [Commented] (PHOENIX-6141) Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK

    [ https://issues.apache.org/jira/browse/PHOENIX-6141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719567#comment-17719567 ] 

ASF GitHub Bot commented on PHOENIX-6141:
-----------------------------------------

jpisaac commented on code in PR #1575:
URL: https://github.com/apache/phoenix/pull/1575#discussion_r1185605456


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
+    public Logger LOGGER;
+    public RegionScanner scanner;
+    public Scan scan;
+    public RegionCoprocessorEnvironment env;
+    public byte[] emptyCF;
+    public byte[] emptyCQ;
+    public Region region;
+    public boolean hasMore;
+    public long pageSizeMs;
+    public long pageSize = Long.MAX_VALUE;
+    public long rowCount = 0;
+    public long maxTimestamp;
+    public long ageThreshold;
+    public boolean restartScanDueToPageFilterRemoval = false;
+
+    /*
+    Scanner used for checking ground truth to help with read repair.
+     */
+    private Scan externalScan = null;
+    public Scan getExternalScan() { return externalScan; }

Review Comment:
   Can you add a comment explaining how the framework is expected to use this method? I do not see a setter for this attribute. Is it supposed to be overridden by the implementing classes?



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+
+/*
+Task to run a simple select * query on SYSTEM.CHILD_LINK table to trigger read repair and verify any unverified rows.
+ */
+public class ChildLinkScanTask extends BaseTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkScanTask.class);
+    private static final String CHILD_LINK_QUERY = "SELECT * FROM SYSTEM.CHILD_LINK";
+    private static boolean isDisabled = false;
+
+    @VisibleForTesting
+    public static void disableChildLinkScanTask(boolean disable) {
+        isDisabled = disable;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+
+        if (isDisabled) {
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "ChildLinkScan task is disabled.");
+        }
+
+        int count = 0;
+        try {
+            PhoenixConnection pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);

Review Comment:
   See this code snippet for an example: https://github.com/apache/phoenix/blob/9a50b02ca6ce5fcca7f1b7975bd22d605ad84dc0/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java#L1871



##########
phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1333,10 +1334,22 @@ public static Long getPageSizeInMs(ReadOnlyProps props) {
         return null;
     }
 
+    private static void setScanAtrributesForChildLinkRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) {

Review Comment:
   You can move this to ChildLinkScanTask.java. That way the verification logic runs only when needed instead of every time there is a query on the CHILD_LINK table.
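
A rough illustration of the idea, as a toy model rather than the real HBase or Phoenix classes: only the scan issued by the background task carries a marker attribute, and the server side wraps the scanner with verification logic only when that marker is present. `ScanRequest`, `REPAIR_ATTR`, `newTaskScan` and `shouldVerify` are hypothetical names made up for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model; none of these names are HBase or Phoenix APIs.
class ScanAttributeGateSketch {
    // Hypothetical attribute name used to mark a scan as a repair scan.
    static final String REPAIR_ATTR = "_ChildLinkRepair";

    // Minimal stand-in for a scan that can carry named attributes.
    static final class ScanRequest {
        private final Map<String, byte[]> attributes = new HashMap<>();

        void setAttribute(String name, byte[] value) {
            attributes.put(name, value);
        }

        byte[] getAttribute(String name) {
            return attributes.get(name);
        }
    }

    // What the background task would do before issuing its query.
    static ScanRequest newTaskScan() {
        ScanRequest scan = new ScanRequest();
        scan.setAttribute(REPAIR_ATTR, new byte[] {1});
        return scan;
    }

    // Server-side check: verify rows only for scans that carry the marker.
    static boolean shouldVerify(ScanRequest scan) {
        return scan.getAttribute(REPAIR_ATTR) != null;
    }
}
```

With this shape, an ordinary query on the table builds a `ScanRequest` without the marker and skips verification, while the task's scan opts in explicitly.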



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java:
##########
@@ -111,4 +145,162 @@ private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
 		return phoenixAccessCoprocessorHost;
 	}
 
+    /**
+     * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+     * An instance of this class is created for each scanner on the table
+     * and used to verify individual rows.
+     */
+    public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+        private Table sysCatHTable;
+        private Scan childLinkScan;
+
+        public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+                                        Scan scan,
+                                        RegionScanner scanner) throws IOException {
+            super(env, scan, scanner);
+            ageThreshold = env.getConfiguration().getLong(
+                    QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+            sysCatHTable = ServerUtil.ConnectionFactory.
+                    getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+                    getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+        }
+
+        /*
+        If the row is VERIFIED, remove the empty column from the row
+         */
+        @Override
+        public boolean verifyRow(List<Cell> cellList) {
+            long cellListSize = cellList.size();
+            Cell cell = null;
+            if (cellListSize == 0) {
+                return true;
+            }
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+                        return false;
+                    }
+                    // Empty column is not supposed to be returned to the client except
+                    // when it is the only column included in the scan
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return true;
+                }
+            }
+            // no empty column found
+            return false;
+        }
+
+        /*
+        Find parent link in syscat for given child link.
+        If found, mark child link row VERIFIED and start a new scan from it.
+        Otherwise, delete if row is old enough.
+         */
+        @Override
+        public void repairRow(List<Cell> row) throws IOException {
+            Cell cell = row.get(0);
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            long ts = row.get(0).getTimestamp();
+
+            Scan sysCatScan = getExternalScanner();
+            childLinkScan = new Scan(scan);
+
+
+            // build syscat rowKey using given rowKey
+            byte[] sysCatRowKey = getSysCatRowKey(rowKey);
+
+            // scan syscat to find row
+            sysCatScan.withStartRow(sysCatRowKey, true);
+            sysCatScan.withStopRow(sysCatRowKey, true);
+            sysCatScan.setTimeRange(0, maxTimestamp);
+            Result result = null;
+            try (ResultScanner resultScanner = sysCatHTable.getScanner(sysCatScan)){
+                result = resultScanner.next();
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(sysCatHTable.getName().toString(), t);
+            }
+            // if row found, repair and verifyRowAndRemoveEmptyColumn
+            if (result != null && !result.isEmpty()) {
+                markChildLinkVerified(rowKey, ts, region);
+                scanner.close();
+                childLinkScan.withStartRow(rowKey, true);
+                scanner = region.getScanner(childLinkScan);
+                hasMore = true;
+            }
+            // if not, delete if old enough, otherwise ignore
+            else {
+                deleteIfAgedEnough(rowKey, ts, region);
+                if (restartScanDueToPageFilterRemoval) {
+                    scanner.close();
+                    childLinkScan.withStartRow(rowKey, true);
+                    scanner = region.getScanner(childLinkScan);
+                    hasMore = true;
+                    restartScanDueToPageFilterRemoval = false;
+                }
+            }
+            row.clear();
+        }
+
+        /*
+        Construct row key for SYSTEM.CATALOG from a given SYSTEM.CHILD_LINK row key
+        SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
+        SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
+         */
+        private byte[] getSysCatRowKey(byte[] childLinkRowKey) {
+            String NULL_DELIMITER = "\0";
+            String[] childLinkRowKeyCols = new String(childLinkRowKey, StandardCharsets.UTF_8).split(NULL_DELIMITER);
+            checkArgument(childLinkRowKeyCols.length == 5);
+            String parentTenantId = childLinkRowKeyCols[0];
+            String parentSchema = childLinkRowKeyCols[1];
+            String parentTable = childLinkRowKeyCols[2];
+            String childTenantId = childLinkRowKeyCols[3];
+            String childFullName = childLinkRowKeyCols[4];
+
+            String parentFullName = SchemaUtil.getTableName(parentSchema, parentTable);
+            String childSchema = SchemaUtil.getSchemaNameFromFullName(childFullName);
+            String childTable = SchemaUtil.getTableNameFromFullName(childFullName);
+
+            String[] sysCatRowKeyCols = new String[] {childTenantId, childSchema, childTable, parentTenantId, parentFullName};
+            return String.join(NULL_DELIMITER, sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+        }
+
+
+        private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region) throws IOException {
+            if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+                Delete del = new Delete(rowKey);
+                Mutation[] mutations = new Mutation[]{del};
+                region.batchMutate(mutations);
+            }
+        }
+
+
+        private void markChildLinkVerified(byte[] rowKey, long ts, Region region) throws IOException {
+            Put put = new Put(rowKey);

Review Comment:
   I think you may need to use the Put(byte[] row, long ts) constructor. 
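
One plausible reason, sketched here with a toy versioned store and not the HBase API: the repair path reads with a time range whose upper bound is exclusive, so a VERIFIED marker written at the server's current time can fall outside that range, while a marker written at the row's own timestamp stays visible. `VersionedRowSketch` and the column name used in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of one row with versioned cells; not the HBase API.
class VersionedRowSketch {
    // column name -> (timestamp -> value)
    private final Map<String, TreeMap<Long, String>> cells = new HashMap<>();

    // Writes a value at an explicit timestamp; a later write at the same
    // timestamp replaces the earlier one.
    void put(String column, long ts, String value) {
        cells.computeIfAbsent(column, c -> new TreeMap<>()).put(ts, value);
    }

    // Returns the latest value whose timestamp is strictly below maxTs,
    // or null if no such version exists.
    String read(String column, long maxTs) {
        TreeMap<Long, String> versions = cells.get(column);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.lowerEntry(maxTs);
        return entry == null ? null : entry.getValue();
    }
}
```

In this model, if a row is written as UNVERIFIED at timestamp 100 and the marker is then written at a later "now" of 500, a read bounded at 200 still returns UNVERIFIED. Writing the marker at timestamp 100 instead makes the same bounded read return VERIFIED.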



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.phoenix.end2end.IndexRebuildTaskIT.waitForTaskState;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class OrphanChildLinkRowsIT extends BaseTest {
+
+    private static Map<String, String> expectedChildLinks = new HashMap<>();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+        // Create 2 tables - T1 and T2. Create a view V1 on T1.
+        String t1 = "CREATE TABLE IF NOT EXISTS S1.T1 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+        String t2 = "CREATE TABLE IF NOT EXISTS S2.T2 (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+        String v1 = "CREATE VIEW IF NOT EXISTS VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S1.T1 WHERE B > 10";
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(t1);
+            connection.createStatement().execute(t2);
+            connection.createStatement().execute(v1);
+        }
+
+        expectedChildLinks.put("S1.T1", "VS1.V1");
+    }
+
+    /**
+     * 1. Disable the child link scan task.
+     * 2. Create a view (same name as existing view on T1) on T2. This CREATE VIEW will fail, verify if there was no orphan child link because of that.
+     *
+     * 3. Instrument CQSI to fail phase three of CREATE VIEW. Create a new view V2 on T2 (passes) and V1 on T2 which will fail.
+     *    Both links T2->V2 and T2->V1 will be in UNVERIFIED state, repaired during read.
+     *    Check if only 2 child links are returned: T2->V2 and T1->V1.
+     */
+    @Test
+    public void testNoOrphanChildLinkRow() throws Exception {
+
+        ConnectionQueryServicesImpl.setFailPhaseThreeChildLinkWriteForTesting(false);
+        ChildLinkScanTask.disableChildLinkScanTask(true);
+
+        String v2 = "CREATE VIEW VS1.V1 (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM S2.T2 WHERE B > 10";
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(v2);
+        }
+        catch (TableAlreadyExistsException e) {

Review Comment:
   I see, so we want to swallow the exception.





> Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK
> ---------------------------------------------------------------
>
>                 Key: PHOENIX-6141
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6141
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.15.0
>            Reporter: Chinmay Kulkarni
>            Assignee: Palash Chauhan
>            Priority: Blocker
>             Fix For: 5.2.0, 5.1.4
>
>
> Before 4.15, "CREATE/DROP VIEW" was an atomic operation since we were issuing batch mutations on a single SYSTEM.CATALOG region. In 4.15 we introduced SYSTEM.CHILD_LINK to store the parent->child links, so a CREATE VIEW is no longer atomic: it consists of 2 separate RPCs (1 to SYSTEM.CHILD_LINK to add the linking row and another to SYSTEM.CATALOG to write metadata for the new view).
> If the second RPC i.e. the RPC to write metadata to SYSTEM.CATALOG fails after the 1st RPC has already gone through, there will be an inconsistency between both metadata tables. We will see orphan parent->child linking rows in SYSTEM.CHILD_LINK in this case. This can cause the following issues:
> # ALTER TABLE calls on the base table will fail
> # DROP TABLE without CASCADE will fail
> # The upgrade path has calls like UpgradeUtil.upgradeTable() which will fail
> # Any metadata consistency checks can be thrown off
> # Unnecessary extra storage of orphan links
> The first 3 issues happen because we wrongly deduce that a base table has child views due to the orphan linking rows.
> This Jira aims to find a way to make mutations across SYSTEM.CATALOG and SYSTEM.CHILD_LINK atomic. We can use a 2-phase commit approach like in global indexing, or potentially explore using a transaction manager.
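
The failure mode above, and the read-repair approach visible in the PR under review, can be modeled in a few lines. This is a toy in-memory sketch under stated assumptions: two collections stand in for SYSTEM.CHILD_LINK and SYSTEM.CATALOG, a link is written UNVERIFIED first and marked VERIFIED last, and a reader repairs or ages out unverified links. `ChildLinkRepairSketch` and all of its members are hypothetical names, not Phoenix APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory model. All names here are hypothetical, not Phoenix APIs.
class ChildLinkRepairSketch {
    enum State { UNVERIFIED, VERIFIED }

    static final class Link {
        final long writeTs;
        State state = State.UNVERIFIED;

        Link(long writeTs) {
            this.writeTs = writeTs;
        }
    }

    // Stand-in for SYSTEM.CHILD_LINK: "parent->child" key to link row.
    final Map<String, Link> childLink = new HashMap<>();
    // Stand-in for the parent links stored in SYSTEM.CATALOG.
    final Set<String> catalog = new HashSet<>();
    final long ageThresholdMs;

    ChildLinkRepairSketch(long ageThresholdMs) {
        this.ageThresholdMs = ageThresholdMs;
    }

    static String key(String parent, String child) {
        return parent + "->" + child;
    }

    // failAfterPhase simulates a failure right after the given phase (0 = no failure).
    void createView(String parent, String child, long now, int failAfterPhase) {
        String k = key(parent, child);
        childLink.put(k, new Link(now));            // phase 1: link written UNVERIFIED
        if (failAfterPhase == 1) return;
        catalog.add(k);                             // phase 2: catalog metadata written
        if (failAfterPhase == 2) return;
        childLink.get(k).state = State.VERIFIED;    // phase 3: link marked VERIFIED
    }

    // Returns true if the link should be visible to the reader, repairing as a side effect.
    boolean readLink(String parent, String child, long now) {
        String k = key(parent, child);
        Link link = childLink.get(k);
        if (link == null) return false;
        if (link.state == State.VERIFIED) return true;
        if (catalog.contains(k)) {                  // catalog confirms the link: repair it
            link.state = State.VERIFIED;
            return true;
        }
        if (now - link.writeTs > ageThresholdMs) {  // orphan that is old enough: delete it
            childLink.remove(k);
        }
        return false;                               // unconfirmed links are never returned
    }
}
```

In this model a failure after phase 1 leaves an orphan link that readers never see and that is removed once it passes the age threshold, while a failure after phase 2 leaves a link that the next read confirms against the catalog and marks VERIFIED.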



--
This message was sent by Atlassian Jira
(v8.20.10#820010)