Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/07/06 14:44:42 UTC

[phoenix] branch master updated: PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)

This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 61180892c2 PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)
61180892c2 is described below

commit 61180892c2f71f5e7eb2956c0e226cd03dda48f1
Author: palash <pa...@gmail.com>
AuthorDate: Thu Jul 6 07:44:36 2023 -0700

    PHOENIX-6141 : Ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK (#1575)
    
    Implemented a new read-repair based mechanism to ensure consistency between SYSTEM.CATALOG and SYSTEM.CHILD_LINK tables.
    ---------
    Co-authored-by: Palash Chauhan <p....@pchauha-ltm8owy.internal.salesforce.com>
---
 .../end2end/ConnectionQueryServicesTestImpl.java   |  19 +-
 .../phoenix/end2end/OrphanChildLinkRowsIT.java     | 270 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/ViewMetadataIT.java |  19 +-
 .../coprocessor/ChildLinkMetaDataEndpoint.java     | 227 ++++++++++++++++-
 .../phoenix/coprocessor/ReadRepairScanner.java     | 180 ++++++++++++++
 .../phoenix/coprocessor/TaskRegionObserver.java    |   1 +
 .../coprocessor/tasks/ChildLinkScanTask.java       |  76 ++++++
 .../phoenix/query/ConnectionQueryServicesImpl.java | 114 +++++++--
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../java/org/apache/phoenix/schema/PTable.java     |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  13 +
 12 files changed, 892 insertions(+), 34 deletions(-)
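
The commit message above describes a three-phase protocol for CREATE VIEW: child links are first written to SYSTEM.CHILD_LINK as UNVERIFIED, the view metadata is then written to SYSTEM.CATALOG, and finally the links are either marked VERIFIED or deleted. The minimal, self-contained sketch below illustrates that flow in isolation; every type and method name in it is a hypothetical stand-in, not a Phoenix API from this patch.

    // Illustrative sketch of the three-phase flow; every type here is a hypothetical stand-in.
    import java.util.List;

    public class ThreePhaseViewCreationSketch {

        enum LinkState { UNVERIFIED, VERIFIED }

        interface ChildLinkStore {                // stands in for SYSTEM.CHILD_LINK
            void write(List<String> rowKeys, LinkState state);
            void delete(List<String> rowKeys);
        }

        interface CatalogStore {                  // stands in for SYSTEM.CATALOG
            boolean createView(String viewName);  // true when the metadata write succeeds
        }

        static void createView(String viewName, List<String> childLinkRowKeys,
                               ChildLinkStore childLink, CatalogStore catalog) {
            // Phase 1: provisionally write parent->child links as UNVERIFIED
            childLink.write(childLinkRowKeys, LinkState.UNVERIFIED);

            // Phase 2: write the view metadata to SYSTEM.CATALOG
            boolean created = catalog.createView(viewName);

            // Phase 3: settle the links. If this phase fails, rows stay UNVERIFIED and
            // are repaired later by the read-repair scanner or the child link scan task.
            try {
                if (created) {
                    childLink.write(childLinkRowKeys, LinkState.VERIFIED);
                } else {
                    childLink.delete(childLinkRowKeys);
                }
            } catch (RuntimeException e) {
                // intentionally tolerated: read repair resolves the remaining UNVERIFIED rows
            }
        }
    }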

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index e342f50354..0496f86e88 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -24,11 +24,13 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
@@ -62,7 +64,22 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
     // Use Provider.values() instead of Provider.available() here because array accesses will be
     // indexed by ordinal.
     private final PhoenixTransactionService[] txServices = new PhoenixTransactionService[TransactionFactory.Provider.values().length];
-    
+
+    private static boolean failPhaseThreeChildLinkWriteForTesting = false;
+    public static void setFailPhaseThreeChildLinkWriteForTesting(boolean fail) {
+        failPhaseThreeChildLinkWriteForTesting = fail;
+    }
+
+    @Override
+    public void sendChildLinkMutations(List<Mutation> mutations, boolean isVerified, boolean isDelete,
+                                       byte[] physicalTableNameBytes, byte[] schemaBytes) throws SQLException {
+
+        if ((isDelete || isVerified) && failPhaseThreeChildLinkWriteForTesting) {
+            throw new SQLException("Simulating phase-3 write failure");
+        }
+        super.sendChildLinkMutations(mutations, isVerified, isDelete, physicalTableNameBytes, schemaBytes);
+    }
+
     public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties props) throws SQLException {
         super(services, info, props);
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java
new file mode 100644
index 0000000000..9211b75939
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrphanChildLinkRowsIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class OrphanChildLinkRowsIT extends BaseTest {
+
+    private final boolean pagingEnabled;
+    private final String CREATE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID VARCHAR NOT NULL, A INTEGER NOT NULL, B INTEGER CONSTRAINT PK PRIMARY KEY (TENANT_ID, A))";
+    private final String CREATE_VIEW_DDL = "CREATE VIEW %s (NEW_COL1 INTEGER, NEW_COL2 INTEGER) AS SELECT * FROM %s WHERE B > 10";
+
+    public OrphanChildLinkRowsIT(boolean pagingEnabled) {
+        this.pagingEnabled = pagingEnabled;
+    }
+
+    @Parameterized.Parameters(name="OrphanChildLinkRowsIT_pagingEnabled={0}")
+    public static synchronized Collection<Boolean> data() {
+        return Arrays.asList(false, true);
+    }
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Disable the child link scan task.
+     * Create tables T1 and T2. Create 3 views on T1.
+     * Create a view (same name as existing view on T1) on T2. This CREATE VIEW will fail.
+     * Verify that there is no orphan child link from T2.
+     *
+     * Instrument CQSI to fail phase three of CREATE VIEW. Create a new view V4 on T2 (passes) and V1 on T2 which will fail.
+     * Both links T2->V4 and T2->V1 will be in UNVERIFIED state, repaired during read.
+     * Check if T2 has only 1 child link.
+     */
+    @Test
+    public void testNoOrphanChildLinkRow() throws Exception {
+
+        ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(false);
+        ChildLinkScanTask.disableChildLinkScanTask(true);
+
+        String tableName1 = "T_" + generateUniqueName();
+        String tableName2 = "T_" + generateUniqueName();
+        String sameViewName = "V_"+generateUniqueName();
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName1));
+            connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName2));
+
+            //create 3 views in the first table
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName1));
+            for (int i=0; i<2; i++) {
+                connection.createStatement().execute(String.format(CREATE_VIEW_DDL, "V_"+generateUniqueName(), tableName1));
+            }
+
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+            Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+        }
+        catch (TableAlreadyExistsException e) {
+            //expected since we are creating a view with the same name as an existing view
+        }
+
+        verifyNoOrphanChildLinkRow(tableName1, 3);
+        verifyNoOrphanChildLinkRow(tableName2, 0);
+
+        // configure CQSI to fail the last write phase of CREATE VIEW
+        // where child link mutations are set to VERIFIED or are deleted
+        ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, "V_"+generateUniqueName(), tableName2));
+
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+            Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+        }
+        catch (TableAlreadyExistsException e) {
+            //expected since we are creating a view with the same name as an existing view
+        }
+        verifyNoOrphanChildLinkRow(tableName1, 3);
+        verifyNoOrphanChildLinkRow(tableName2, 1);
+    }
+
+    /**
+     * Enable the child link scan task and configure CQSI to fail the last write phase of CREATE VIEW.
+     * Create 2 tables X and Y.
+     * Create a view (same name as existing view on table X) on table Y.
+     * Verify that all rows in the HBase table are VERIFIED after the task finishes.
+     */
+    @Test
+    public void testChildLinkScanTaskRepair() throws Exception {
+
+        ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+        ChildLinkScanTask.disableChildLinkScanTask(false);
+
+        String tableName1 = "T_" + generateUniqueName();
+        String tableName2 = "T_" + generateUniqueName();
+        String sameViewName = "V_"+generateUniqueName();
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName1));
+            connection.createStatement().execute(String.format(CREATE_TABLE_DDL, tableName2));
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName1));
+
+            connection.createStatement().execute(String.format(CREATE_VIEW_DDL, sameViewName, tableName2));
+            Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+        }
+        catch (TableAlreadyExistsException e) {
+            //expected since we are creating a view with the same name as an existing view
+        }
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            // wait for TASKs to complete
+            waitForChildLinkScanTasks();
+
+            // scan the physical table and check there are no UNVERIFIED rows
+            PTable childLinkPTable = PhoenixRuntime.getTable(connection, SYSTEM_CHILD_LINK_NAME);
+            byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(childLinkPTable);
+            byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(childLinkPTable).getFirst();
+            Scan scan = new Scan();
+            HTable table = (HTable) connection.unwrap(PhoenixConnection.class).getQueryServices().getTable(TableName.valueOf(SYSTEM_CHILD_LINK_NAME).getName());
+            ResultScanner results = table.getScanner(scan);
+            Result result = results.next();
+            while (result != null) {
+                Assert.assertTrue("Found Child Link row with UNVERIFIED status", Arrays.equals(result.getValue(emptyCF, emptyCQ), VERIFIED_BYTES));
+                result = results.next();
+            }
+        }
+    }
+
+    /**
+     * Repeat multiple times: create 2 tables and a view with the same name on both tables.
+     * Check that a LIMIT query on SYSTEM.CHILD_LINK returns the right number of rows.
+     * Check that only one child link is returned for every table.
+     */
+    @Test
+    public void testChildLinkQueryWithLimit() throws Exception {
+
+        ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+        ChildLinkScanTask.disableChildLinkScanTask(true);
+
+        Properties props = new Properties();
+        if (pagingEnabled) {
+            props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
+        }
+        Map<String, String> expectedChildLinks = new HashMap<>();
+        try (Connection connection = DriverManager.getConnection(getUrl(), props)) {
+            for (int i=0; i<7; i++) {
+                String table1 = "T_" + generateUniqueName();
+                String table2 = "T_" + generateUniqueName();
+                String view = "V_" + generateUniqueName();
+                connection.createStatement().execute(String.format(CREATE_TABLE_DDL, table1));
+                connection.createStatement().execute(String.format(CREATE_TABLE_DDL, table2));
+                connection.createStatement().execute(String.format(CREATE_VIEW_DDL, view, table1));
+                expectedChildLinks.put(table1, view);
+                try {
+                    connection.createStatement().execute(String.format(CREATE_VIEW_DDL, view, table2));
+                    Assert.fail("Exception should have been thrown when creating a view with same name on a different table.");
+                }
+                catch (TableAlreadyExistsException e) {
+
+                }
+            }
+
+            String childLinkQuery = "SELECT * FROM SYSTEM.CHILD_LINK LIMIT 5";
+            ResultSet rs = connection.createStatement().executeQuery(childLinkQuery);
+            int count = 0;
+            while (rs.next()) {
+                count++;
+            }
+            Assert.assertEquals("Incorrect number of child link rows returned", 5, count);
+        }
+        for (String table : expectedChildLinks.keySet()) {
+            verifyNoOrphanChildLinkRow(table, 1);
+        }
+    }
+
+    private void verifyNoOrphanChildLinkRow(String table, int numExpectedChildLinks) throws Exception {
+        String childLinkQuery = "SELECT * FROM SYSTEM.CHILD_LINK";
+        if (table != null) {
+            childLinkQuery += (" WHERE TABLE_NAME=" + "'" + table + "'");
+        }
+
+        int count = 0;
+        Properties props = new Properties();
+        if (pagingEnabled) {
+            props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
+        }
+        try (Connection connection = DriverManager.getConnection(getUrl(), props)) {
+            ResultSet rs = connection.createStatement().executeQuery(childLinkQuery);
+            while (rs.next()) {
+                count++;
+            }
+        }
+        Assert.assertTrue("Found Orphan Linking Row", count <= numExpectedChildLinks);
+        Assert.assertTrue("All expected Child Links not returned by query", count >= numExpectedChildLinks);
+    }
+
+    /*
+    Wait for all child link scan tasks to finish.
+     */
+    public static void waitForChildLinkScanTasks() throws Exception {
+        int maxTries = 10, nTries=0;
+        int sleepIntervalMs = 2000;
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            do {
+                Thread.sleep(sleepIntervalMs);
+                ResultSet rs = connection.createStatement().executeQuery("SELECT COUNT(*) FROM SYSTEM.TASK WHERE " +
+                        PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.CHILD_LINK_SCAN.getSerializedValue() +
+                        " AND " + PhoenixDatabaseMetaData.TASK_STATUS + " != 'COMPLETED'");
+                rs.next();
+                int numPendingTasks = rs.getInt(1);
+                if (numPendingTasks == 0) break;
+            } while(++nTries < maxTries);
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index 7e90118363..219eb8aa83 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -691,11 +691,13 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
             String parent2, String viewSchema, String viewName)
             throws SQLException {
 
-        final String querySysChildLink =
-                "SELECT * FROM SYSTEM.CHILD_LINK WHERE TABLE_SCHEM='%s' AND "
-                        + "TABLE_NAME='%s' AND COLUMN_FAMILY='%s' AND "
-                        + LINK_TYPE + " = " +
-                        PTable.LinkType.CHILD_TABLE.getSerializedValue();
+        /*
+        After PHOENIX-6141, view creation happens in three phases to avoid orphan rows in
+        SYSTEM.CHILD_LINK. Here we instrument the test CQSI to fail the third phase, which
+        leaves the UNVERIFIED orphan rows needed by some tests in this class.
+         */
+        ConnectionQueryServicesTestImpl.setFailPhaseThreeChildLinkWriteForTesting(true);
+
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Statement stmt = conn.createStatement()) {
             stmt.execute(String.format(CREATE_BASE_TABLE_DDL, parentSchema,
@@ -711,13 +713,6 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
             } catch (TableAlreadyExistsException ignored) {
                 // expected
             }
-
-            // Confirm that the orphan parent->child link exists after the
-            // second view creation
-            ResultSet rs = stmt.executeQuery(String.format(querySysChildLink,
-                    parentSchema,  parent2, SchemaUtil.getTableName(
-                            viewSchema, viewName)));
-            assertTrue(rs.next());
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
index 90f516c170..46127eb5e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ChildLinkMetaDataEndpoint.java
@@ -20,12 +20,28 @@ package org.apache.phoenix.coprocessor;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
@@ -34,24 +50,40 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
 import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.mutateRowsWithLocks;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
+
 
 /**
  * Endpoint co-processor through which Phoenix metadata mutations for SYSTEM.CHILD_LINK flow.
  * The {@code parent->child } links ({@link org.apache.phoenix.schema.PTable.LinkType#CHILD_TABLE})
  * are stored in the SYSTEM.CHILD_LINK table.
+ *
+ * After PHOENIX-6141, this also serves as an observer coprocessor
+ * that verifies scanned rows of SYSTEM.CHILD_LINK table.
  */
-public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService implements RegionCoprocessor {
+public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService
+        implements RegionCoprocessor, RegionObserver {
 
 	private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkMetaDataEndpoint.class);
     private RegionCoprocessorEnvironment env;
@@ -111,4 +143,197 @@ public class ChildLinkMetaDataEndpoint extends ChildLinkMetaDataService implemen
 		return phoenixAccessCoprocessorHost;
 	}
 
+    /**
+     * Class that verifies a given row of a SYSTEM.CHILD_LINK table.
+     * An instance of this class is created for each scanner on the table
+     * and used to verify individual rows.
+     */
+    public class ChildLinkMetaDataScanner extends ReadRepairScanner {
+
+        private Table sysCatHTable;
+        private Scan childLinkScan;
+        private Scan sysCatViewHeaderRowScan;
+        private Scan sysCatChildParentLinkScan;
+        private final String NULL_DELIMITER = "\0";
+
+        public ChildLinkMetaDataScanner(RegionCoprocessorEnvironment env,
+                                        Scan scan,
+                                        RegionScanner scanner) throws IOException {
+            super(env, scan, scanner);
+            sysCatChildParentLinkScan = new Scan();
+            setSysCatViewHeaderRowScan();
+            ageThreshold = env.getConfiguration().getLong(
+                    QueryServices.CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+            sysCatHTable = ServerUtil.ConnectionFactory.
+                    getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env).
+                    getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+        }
+
+        /*
+        If the row is VERIFIED, remove the empty column from the row
+         */
+        @Override
+        public boolean verifyRow(List<Cell> cellList) {
+            long cellListSize = cellList.size();
+            Cell cell = null;
+            if (cellListSize == 0) {
+                return true;
+            }
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    if (Bytes.compareTo(
+                            cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0
+                        ) {
+                        return false;
+                    }
+                    // Empty column is not supposed to be returned to the client except
+                    // when it is the only column included in the scan
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return true;
+                }
+            }
+            // no empty column found
+            return false;
+        }
+
+        /*
+        Find parent link in syscat for given child link.
+        If found, mark child link row VERIFIED and start a new scan from it.
+        Otherwise, delete if row is old enough.
+         */
+        @Override
+        public void repairRow(List<Cell> row) throws IOException {
+            Cell cell = row.get(0);
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            long ts = row.get(0).getTimestamp();
+
+            childLinkScan = new Scan(scan);
+
+            boolean isChildParentLinkPresent = isRowPresentInSysCat(sysCatChildParentLinkScan,
+                                                            getChildParentLinkSysCatRowKey(rowKey));
+
+            boolean isViewHeaderRowPresent = false;
+            if (isChildParentLinkPresent) {
+                isViewHeaderRowPresent = isRowPresentInSysCat(sysCatViewHeaderRowScan,
+                                                                getViewHeaderSysCatRowKey(rowKey));
+            }
+            // if both rows were found in SYSTEM.CATALOG, mark the link VERIFIED and rescan from this row
+            if (isChildParentLinkPresent && isViewHeaderRowPresent) {
+                markChildLinkVerified(rowKey, ts, region);
+                scanner.close();
+                childLinkScan.withStartRow(rowKey, true);
+                scanner = region.getScanner(childLinkScan);
+                hasMore = true;
+            }
+            // if not, delete if old enough, otherwise ignore
+            else {
+                deleteIfAgedEnough(rowKey, ts, region);
+            }
+            row.clear();
+        }
+
+        private boolean isRowPresentInSysCat(Scan scan, byte[] rowKey) throws IOException {
+            scan.withStartRow(rowKey, true);
+            scan.withStopRow(rowKey, true);
+            scan.setTimeRange(0, maxTimestamp);
+
+            Result result = null;
+            try (ResultScanner resultScanner = sysCatHTable.getScanner(scan)) {
+                result = resultScanner.next();
+            }
+            catch (Throwable t) {
+                ServerUtil.throwIOException(sysCatHTable.getName().toString(), t);
+            }
+
+            return (result != null) && (!result.isEmpty());
+        }
+
+        /*
+        Construct row key for SYSTEM.CATALOG view header row from a given SYSTEM.CHILD_LINK row key
+         */
+        private byte[] getViewHeaderSysCatRowKey(byte[] childLinkRowKey) {
+            String[] childLinkRowKeyCols = new String(childLinkRowKey, StandardCharsets.UTF_8)
+                                                .split(NULL_DELIMITER);
+            checkArgument(childLinkRowKeyCols.length == 5);
+            String childTenantId = childLinkRowKeyCols[3];
+            String childFullName = childLinkRowKeyCols[4];
+
+            String childSchema = SchemaUtil.getSchemaNameFromFullName(childFullName);
+            String childTable = SchemaUtil.getTableNameFromFullName(childFullName);
+
+            String[] sysCatRowKeyCols = new String[] {childTenantId, childSchema, childTable};
+            return String.join(NULL_DELIMITER, sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+        }
+
+        /*
+        Construct row key for SYSTEM.CATALOG parent->child link from a given SYSTEM.CHILD_LINK row key
+        SYSTEM.CATALOG -> (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
+        SYSTEM.CHILD_LINK -> (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
+         */
+        private byte[] getChildParentLinkSysCatRowKey(byte[] childLinkRowKey) {
+            String[] childLinkRowKeyCols = new String(childLinkRowKey, StandardCharsets.UTF_8)
+                                                .split(NULL_DELIMITER);
+            checkArgument(childLinkRowKeyCols.length == 5);
+            String parentTenantId = childLinkRowKeyCols[0];
+            String parentSchema = childLinkRowKeyCols[1];
+            String parentTable = childLinkRowKeyCols[2];
+            String childTenantId = childLinkRowKeyCols[3];
+            String childFullName = childLinkRowKeyCols[4];
+
+            String parentFullName = SchemaUtil.getTableName(parentSchema, parentTable);
+            String childSchema = SchemaUtil.getSchemaNameFromFullName(childFullName);
+            String childTable = SchemaUtil.getTableNameFromFullName(childFullName);
+
+            String[] sysCatRowKeyCols = new String[] {childTenantId, childSchema, childTable,
+                                                        parentTenantId, parentFullName};
+            return String.join(NULL_DELIMITER, sysCatRowKeyCols).getBytes(StandardCharsets.UTF_8);
+        }
+
+
+        private void deleteIfAgedEnough(byte[] rowKey, long ts, Region region) throws IOException {
+            if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+                Delete del = new Delete(rowKey);
+                Mutation[] mutations = new Mutation[]{del};
+                region.batchMutate(mutations);
+            }
+        }
+
+
+        private void markChildLinkVerified(byte[] rowKey, long ts, Region region) throws IOException {
+            Put put = new Put(rowKey, ts);
+            put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+            Mutation[] mutations = new Mutation[]{put};
+            region.batchMutate(mutations);
+        }
+
+        private void setSysCatViewHeaderRowScan() {
+            sysCatViewHeaderRowScan = new Scan();
+            sysCatViewHeaderRowScan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
+            SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(
+                    TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES, CompareOperator.EQUAL,
+                    PTableType.VIEW.getSerializedValue().getBytes());
+            sysCatViewHeaderRowScan.setFilter(tableTypeFilter);
+        }
+
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+        return Optional.of(this);
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         Scan scan, RegionScanner s) throws IOException {
+        if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
+            return s;
+        }
+        return new ChildLinkMetaDataScanner(c.getEnvironment(), scan, new PagingRegionScanner(c.getEnvironment().getRegion(), s, scan));
+    }
 }
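
The row-key translation performed by getViewHeaderSysCatRowKey and getChildParentLinkSysCatRowKey above can be traced on a concrete key. The standalone snippet below mirrors that mapping with plain string handling; the tenant, schema, and view names are made-up sample values, and the real code uses SchemaUtil helpers instead of the inline name splitting shown here.

    // Standalone illustration of the row-key mapping used by the repair scanner.
    public class ChildLinkKeyMappingSketch {

        private static final String NUL = "\0";

        public static void main(String[] args) {
            // SYSTEM.CHILD_LINK row key:
            // (PARENT_TENANT_ID, PARENT_SCHEMA, PARENT_TABLE, CHILD_TENANT_ID, CHILD_FULL_NAME)
            String childLinkKey = String.join(NUL, "t1", "S", "BASE_TABLE", "t1", "VS.MY_VIEW");

            String[] cols = childLinkKey.split(NUL);
            String parentTenantId = cols[0], parentSchema = cols[1], parentTable = cols[2];
            String childTenantId = cols[3], childFullName = cols[4];

            int dot = childFullName.indexOf('.');
            String childSchema = dot < 0 ? "" : childFullName.substring(0, dot);
            String childTable = dot < 0 ? childFullName : childFullName.substring(dot + 1);
            String parentFullName = parentSchema.isEmpty() ? parentTable
                                                           : parentSchema + "." + parentTable;

            // SYSTEM.CATALOG view header row key: (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE)
            String viewHeaderKey = String.join(NUL, childTenantId, childSchema, childTable);

            // SYSTEM.CATALOG child->parent link row key:
            // (CHILD_TENANT_ID, CHILD_SCHEMA, CHILD_TABLE, PARENT_TENANT_ID, PARENT_FULL_NAME)
            String childParentLinkKey = String.join(NUL, childTenantId, childSchema, childTable,
                                                    parentTenantId, parentFullName);

            System.out.println(viewHeaderKey.replace(NUL, "|"));      // t1|VS|MY_VIEW
            System.out.println(childParentLinkKey.replace(NUL, "|")); // t1|VS|MY_VIEW|t1|S.BASE_TABLE
        }
    }
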
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java
new file mode 100644
index 0000000000..ff22ae271a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ReadRepairScanner.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+public abstract class ReadRepairScanner extends BaseRegionScanner {
+
+    public Logger LOGGER;
+    public RegionScanner scanner;
+    public Scan scan;
+    public RegionCoprocessorEnvironment env;
+    public byte[] emptyCF;
+    public byte[] emptyCQ;
+    public Region region;
+    public boolean hasMore;
+    public long pageSizeMs;
+    public long pageSize = Long.MAX_VALUE;
+    public long rowCount = 0;
+    public long maxTimestamp;
+    public long ageThreshold;
+    public boolean initialized = false;
+
+    public ReadRepairScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner) {
+        super(scanner);
+        LOGGER = LoggerFactory.getLogger(this.getClass());
+        this.env = env;
+        this.scan = scan;
+        this.scanner = scanner;
+        region = env.getRegion();
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        pageSizeMs = getPageSizeMsForRegionScanner(scan);
+        maxTimestamp = scan.getTimeRange().getMax();
+    }
+
+    private void init() throws IOException {
+        if (!initialized) {
+            PageFilter pageFilter = ScanUtil.removePageFilter(scan);
+            if (pageFilter != null) {
+                pageSize = pageFilter.getPageSize();
+                scanner.close();
+                scanner = region.getScanner(scan);
+            }
+            initialized = true;
+        }
+    }
+
+    /*
+    Method which checks whether a row is VERIFIED (i.e. does not need repair).
+     */
+    abstract boolean verifyRow(List<Cell> row);
+
+    /*
+    Method which repairs the given row
+     */
+    abstract void repairRow(List<Cell> row) throws IOException;
+
+    public boolean next(List<Cell> result, boolean raw) throws IOException {
+        try {
+            init();
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            do {
+                if (raw) {
+                    hasMore = scanner.nextRaw(result);
+                } else {
+                    hasMore = scanner.next(result);
+                }
+                if (result.isEmpty()) {
+                    return hasMore;
+                }
+                if (isDummy(result)) {
+                    return true;
+                }
+                Cell cell = result.get(0);
+                if (verifyRowAndRepairIfNecessary(result)) {
+                    break;
+                }
+                if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+                    byte[] rowKey = CellUtil.cloneRow(cell);
+                    result.clear();
+                    getDummyResult(rowKey, result);
+                    return true;
+                }
+                // skip this row as it is invalid
+                // if there is no more row, then result will be an empty list
+            } while (hasMore);
+            rowCount++;
+            if (rowCount == pageSize) {
+                return false;
+            }
+            return hasMore;
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+            return false; // impossible
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        return next(result, false);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true);
+    }
+
+    /**
+     * Helper method that verifies and repairs the row if necessary.
+     * @param cellList
+     * @return true if the row is already VERIFIED, false if the row needed repair
+     * @throws IOException
+     */
+    private boolean verifyRowAndRepairIfNecessary(List<Cell> cellList) throws IOException {
+        // check if row is VERIFIED
+        if (verifyRow(cellList)) {
+            return true;
+        }
+        else {
+            try {
+                repairRow(cellList);
+            } catch (IOException e) {
+                LOGGER.warn("Row Repair failure on region {}.",
+                        env.getRegionInfo().getRegionNameAsString());
+                throw e;
+            }
+
+            if (cellList.isEmpty()) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    public boolean isEmptyColumn(Cell cell) {
+        return Bytes.compareTo(
+                cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                emptyCF, 0, emptyCF.length) == 0 &&
+                Bytes.compareTo(
+                    cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    emptyCQ, 0, emptyCQ.length) == 0;
+    }
+}
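
ReadRepairScanner above factors the generic verify-then-repair loop out of the SYSTEM.CHILD_LINK specifics. A compressed, dependency-free sketch of the per-row decision it delegates to its subclasses might look like the following; Row and CatalogLookup are hypothetical stand-ins, and deletion is only represented by skipping the row.

    // Dependency-free sketch of the verify/repair decision applied to each scanned row.
    public class ReadRepairDecisionSketch {

        static class Row { boolean verified; long timestamp; }

        interface CatalogLookup { boolean parentLinkAndViewHeaderExist(Row row); }

        // Returns true if the row should be returned to the client, false if it is skipped.
        static boolean verifyAndRepair(Row row, CatalogLookup catalog,
                                       long nowMs, long ageThresholdMs) {
            if (row.verified) {
                return true;                        // fast path: row is already VERIFIED
            }
            if (catalog.parentLinkAndViewHeaderExist(row)) {
                row.verified = true;                // repair: mark the child link VERIFIED
                return true;
            }
            if (nowMs - row.timestamp > ageThresholdMs) {
                return false;                       // orphan old enough to delete
            }
            return false;                           // too young to delete; skip for this read
        }
    }
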
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index f5ae114faf..7431d12418 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -73,6 +73,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
             .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
             .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
             .put(TaskType.TRANSFORM_MONITOR, "org.apache.phoenix.coprocessor.tasks.TransformMonitorTask")
+            .put(TaskType.CHILD_LINK_SCAN, "org.apache.phoenix.coprocessor.tasks.ChildLinkScanTask")
             .build();
 
     public enum TaskResultCode {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java
new file mode 100644
index 0000000000..f41200ad31
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/ChildLinkScanTask.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+
+/*
+Task to run a full-table query on the SYSTEM.CHILD_LINK table
+to trigger read repair and verify any UNVERIFIED rows.
+ */
+public class ChildLinkScanTask extends BaseTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChildLinkScanTask.class);
+    private static final String CHILD_LINK_QUERY = "SELECT COUNT(*) FROM SYSTEM.CHILD_LINK";
+    private static boolean isDisabled = false;
+
+    @VisibleForTesting
+    public static void disableChildLinkScanTask(boolean disable) {
+        isDisabled = disable;
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+
+        if (isDisabled) {
+            return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+                    "ChildLinkScan task is disabled.");
+        }
+
+        int count = 0;
+        try {
+            PhoenixConnection pconn = QueryUtil.getConnectionOnServer(
+                    env.getConfiguration()).unwrap(PhoenixConnection.class);
+            ResultSet rs = pconn.createStatement().executeQuery(CHILD_LINK_QUERY);
+            if (rs.next()) {
+                count = rs.getInt(1);
+            }
+        }
+        catch (Exception e) {
+            LOGGER.error("Exception in Child Link Scan Task: " + e);
+            return new TaskRegionObserver.TaskResult(
+                    TaskRegionObserver.TaskResultCode.FAIL, e.getMessage());
+        }
+        return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+                                            "Number of rows in SYSTEM.CHILD_LINK: " + count);
+    }
+
+    @Override
+    public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord)
+            throws Exception {
+        return null;
+    }
+}
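
ChildLinkScanTask works because a Phoenix query over SYSTEM.CHILD_LINK is routed through the repair scanner (the client marks the scan in the ScanUtil change further down in this patch), so a plain aggregate query repairs every UNVERIFIED row it touches. A minimal JDBC example of issuing such a query is shown below; the connection URL is a placeholder for a real cluster.

    // Minimal JDBC sketch: a full-table query against SYSTEM.CHILD_LINK drives the
    // server-side read repair for every row it scans. The URL is a placeholder.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class TriggerChildLinkRepair {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:phoenix:localhost:2181";   // hypothetical ZooKeeper quorum
            try (Connection conn = DriverManager.getConnection(url);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM SYSTEM.CHILD_LINK")) {
                if (rs.next()) {
                    System.out.println("SYSTEM.CHILD_LINK rows: " + rs.getLong(1));
                }
            }
        }
    }
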
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c5f5b8fd52..752c46a76e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -49,6 +49,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_HBASE_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
@@ -74,6 +75,8 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAIL
 import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
 import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -95,6 +98,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -274,6 +278,8 @@ import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
@@ -291,7 +297,9 @@ import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -304,7 +312,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TimeKeeper;
 import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -394,6 +401,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
     private boolean localIndexUpgradeRequired;
 
+
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
     }
@@ -2158,29 +2166,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
 
         // Avoid the client-server RPC if this is not a view creation
+        // For view creation, write UNVERIFIED child_link rows
         if (!childLinkMutations.isEmpty()) {
-            // Send mutations for parent-child links to SYSTEM.CHILD_LINK
-            // We invoke this using rowKey available in the first element
-            // of childLinkMutations.
-            final byte[] rowKey = childLinkMutations.get(0).getRow();
-            final RpcController controller = getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
-            final MetaDataMutationResult result =
-                childLinkMetaDataCoprocessorExec(rowKey,
-                    new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
-
-            switch (result.getMutationCode()) {
-                case UNABLE_TO_CREATE_CHILD_LINK:
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_CREATE_CHILD_LINK)
-                            .setSchemaName(Bytes.toString(schemaBytes))
-                            .setTableName(Bytes.toString(physicalTableNameBytes)).build().buildException();
-                default:
-                    break;
-            }
+            sendChildLinkMutations(childLinkMutations, false, false, physicalTableNameBytes, schemaBytes);
         }
 
         // Send the remaining metadata mutations to SYSTEM.CATALOG
         byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes, tableBytes);
-        return metaDataCoprocessorExec(SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes,
+        MetaDataMutationResult result = metaDataCoprocessorExec(SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes,
                 SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, this.props)).toString(),
                 tableKey,
                 new Batch.Call<MetaDataService, MetaDataResponse>() {
@@ -2207,6 +2200,89 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 return rpcCallback.get();
             }
         });
+
+        // For view creation: if the SYSCAT RPC succeeds, mark child_link rows VERIFIED, otherwise delete them
+        if (!childLinkMutations.isEmpty()) {
+            try {
+                if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
+                    sendChildLinkMutations(childLinkMutations, true, false, physicalTableNameBytes, schemaBytes);
+                } else {
+                    sendChildLinkMutations(childLinkMutations, true, true, physicalTableNameBytes, schemaBytes);
+                }
+            }
+            catch (SQLException e) {
+                //unverified rows will be repaired during read
+                LOGGER.debug("Exception in phase-3 of view creation: " + e.getMessage());
+                addChildLinkScanTask();
+            }
+        }
+        return result;
+    }
+
+    /*
+    Helper method to send mutations to SYSTEM.CHILD_LINK using its endpoint coprocessor
+     */
+    public void sendChildLinkMutations(List<Mutation> mutations, boolean isVerified, boolean isDelete,
+                                        byte[] physicalTableNameBytes, byte[] schemaBytes)
+            throws SQLException {
+
+        // get empty column information
+        PTable childLinkLogicalTable = getTable(null, PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME, HConstants.LATEST_TIMESTAMP);
+        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(childLinkLogicalTable);
+        byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(childLinkLogicalTable).getFirst();
+
+        // add empty column value to mutations or create delete mutations for phase-3
+        List<Mutation> childLinkMutations = new ArrayList<>();
+        for (Mutation m : mutations) {
+            if (isDelete) {
+                Delete delete = new Delete(m.getRow());
+                childLinkMutations.add(delete);
+            }
+            else {
+                Put put = isVerified ? new Put(m.getRow()) : (Put)m;
+                byte[] emptyColumnValue = isVerified ? VERIFIED_BYTES : UNVERIFIED_BYTES;
+                put.addColumn(emptyCF, emptyCQ, emptyColumnValue);
+                childLinkMutations.add(put);
+            }
+        }
+
+        // Send mutations for parent-child links to SYSTEM.CHILD_LINK
+        // We invoke this using rowKey available in the first element
+        // of childLinkMutations.
+        final byte[] rowKey = childLinkMutations.get(0).getRow();
+        final RpcController controller = getController(PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME);
+        final MetaDataMutationResult result =
+                childLinkMetaDataCoprocessorExec(rowKey,
+                        new ChildLinkMetaDataServiceCallBack(controller, childLinkMutations));
+
+        switch (result.getMutationCode()) {
+            case UNABLE_TO_CREATE_CHILD_LINK:
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_CREATE_CHILD_LINK)
+                        .setSchemaName(Bytes.toString(schemaBytes))
+                        .setTableName(Bytes.toString(physicalTableNameBytes)).build().buildException();
+            default:
+                break;
+        }
+    }
+
+    /*
+    Add a task to the SYSTEM.TASK table which runs a full-table query on SYSTEM.CHILD_LINK.
+    This triggers the read repair step and verifies any UNVERIFIED rows.
+     */
+    private void addChildLinkScanTask() {
+
+        try {
+            PhoenixConnection conn = QueryUtil.getConnection(config).unwrap(PhoenixConnection.class);
+            Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+                    .setConn(conn)
+                    .setTaskType(PTable.TaskType.CHILD_LINK_SCAN)
+                    .setTaskStatus(PTable.TaskStatus.CREATED.toString())
+                    .setStartTs(new Timestamp(EnvironmentEdgeManager.currentTimeMillis()))
+                    .setTableName(SYSTEM_CHILD_LINK_TABLE)
+                    .build());
+        } catch (Throwable t) {
+            LOGGER.error("Adding a task to scan SYSTEM.CHILD_LINK failed!", t);
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e4885a7910..53252941a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -350,6 +350,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
     // The time limit on the amount of work to be done in one RPC call
     public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms";
+    // The minimum age of an unverified child link row to be eligible for deletion
+    public static final String CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.child.link.row.age.threshold.to.delete.ms";
     // Phoenix TTL implemented by CompactionScanner and TTLRegionScanner is enabled
     public static final String PHOENIX_TABLE_TTL_ENABLED = "phoenix.table.ttl.enabled";
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index adb3b55e2f..78305e50cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -403,6 +403,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_MAX_REGION_LOCATIONS_SIZE_EXPLAIN_PLAN = 5;
 
 
+    public static final long DEFAULT_CHILD_LINK_ROW_AGE_THRESHOLD_TO_DELETE_MS = 1 * 60 * 60 * 1000; // 1 hour
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
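
The two configuration changes above add phoenix.child.link.row.age.threshold.to.delete.ms, the minimum age an UNVERIFIED child link row must reach before the repair path deletes it, defaulting to one hour. A hedged example of overriding it programmatically on a Hadoop Configuration (hbase-site.xml would be the usual place) follows; the ten-minute value is arbitrary.

    // Sketch: lowering the orphan-row age threshold; the chosen value is arbitrary.
    import org.apache.hadoop.conf.Configuration;

    public class ChildLinkThresholdConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // UNVERIFIED SYSTEM.CHILD_LINK rows younger than this are left for a later
            // repair attempt; older ones are deleted during repair. Default is one hour.
            conf.setLong("phoenix.child.link.row.age.threshold.to.delete.ms", 10 * 60 * 1000L);
            System.out.println(conf.get("phoenix.child.link.row.age.threshold.to.delete.ms"));
        }
    }
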
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 1798d444b6..6e44586363 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -204,7 +204,8 @@ public interface PTable extends PMetaDataEntity {
     public enum TaskType {
         DROP_CHILD_VIEWS((byte)1),
         INDEX_REBUILD((byte)2),
-        TRANSFORM_MONITOR((byte)3);
+        TRANSFORM_MONITOR((byte)3),
+        CHILD_LINK_SCAN((byte)4);
 
         private final byte[] byteValue;
         private final byte serializedValue;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 7f2690ed77..3928142f95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
@@ -1302,10 +1303,22 @@ public class ScanUtil {
         }
     }
 
+    private static void setScanAttributesForChildLinkRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) {
+        if (table.getName().getString().equals(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME)) {
+            scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+            byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+            addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+        }
+    }
+
     public static void setScanAttributesForClient(Scan scan, PTable table,
                                                   PhoenixConnection phoenixConnection) throws SQLException {
         setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
         setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
+        setScanAttributesForChildLinkRepair(scan, table, phoenixConnection);
         byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
         byte[] emptyCQ = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME);
         if (emptyCF != null && emptyCQ != null) {