You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/10/21 16:51:23 UTC

[phoenix] branch 4.x updated: PHOENIX-6142: Make DDL operations resilient to orphan parent->child linking rows in SYSTEM.CHILD_LINK

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new afda3c1  PHOENIX-6142: Make DDL operations resilient to orphan parent->child linking rows in SYSTEM.CHILD_LINK
afda3c1 is described below

commit afda3c1f37b342aab0dafa1c5376d344c9e23113
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Tue Sep 29 18:27:50 2020 -0700

    PHOENIX-6142: Make DDL operations resilient to orphan parent->child linking rows in SYSTEM.CHILD_LINK
---
 .../org/apache/phoenix/end2end/ViewMetadataIT.java | 316 ++++++++++++-
 .../org/apache/phoenix/end2end/ViewUtilIT.java     | 158 +++++--
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 494 +++++++++++----------
 .../org/apache/phoenix/schema/MetaDataClient.java  |  20 +-
 .../java/org/apache/phoenix/util/UpgradeUtil.java  | 126 ++++--
 .../java/org/apache/phoenix/util/ViewUtil.java     | 339 +++++++++++---
 6 files changed, 1058 insertions(+), 395 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index a6beba9..51fc812 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.end2end;
 import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
 import static org.apache.phoenix.coprocessor.TaskRegionObserver.TASK_DETAILS;
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MODIFY_VIEW_PK;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MUTATE_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
@@ -28,6 +31,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.schema.PTable.TaskType.DROP_CHILD_VIEWS;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -41,6 +46,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -52,25 +59,33 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost;
+import org.apache.phoenix.coprocessor.TableInfo;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.end2end.ViewIT.TestMetaDataRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.ViewUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -81,6 +96,18 @@ import com.google.common.collect.Maps;
 public class ViewMetadataIT extends SplitSystemCatalogIT {
 
     private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+    final static String BASE_TABLE_SCHEMA = "S";
+    final static String CHILD_VIEW_LEVEL_1_SCHEMA = "S1";
+    private final static String CHILD_VIEW_LEVEL_2_SCHEMA = "S2";
+    private final static String CHILD_VIEW_LEVEL_3_SCHEMA = "S3";
+    final static String CREATE_BASE_TABLE_DDL =
+            "CREATE TABLE %s.%s (A INTEGER NOT NULL PRIMARY KEY, B INTEGER, C INTEGER)";
+    final static String CREATE_CHILD_VIEW_LEVEL_1_DDL =
+            "CREATE VIEW %s.%s (NEW_COL1 INTEGER) AS SELECT * FROM %s.%s WHERE B > 10";
+    final static String CREATE_CHILD_VIEW_LEVEL_2_DDL =
+            "CREATE VIEW %s.%s (NEW_COL2 INTEGER) AS SELECT * FROM %s.%s WHERE NEW_COL1=5";
+    final static String CREATE_CHILD_VIEW_LEVEL_3_DDL =
+            "CREATE VIEW %s.%s (NEW_COL3 INTEGER) AS SELECT * FROM %s.%s WHERE NEW_COL2=10";
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -294,7 +321,286 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
         }
     }
 
-    private void runDropChildViewsTask() {
+    @Test
+    public void testAlterTableIsResilientToOrphanLinks() throws SQLException {
+        final String parent1TableName = generateUniqueName();
+        final String parent2TableName = generateUniqueName();
+        final String viewName = "V_" + generateUniqueName();
+        // Note that this column name is the same as the new column on the child view
+        final String alterTableDDL = "ALTER TABLE %s ADD NEW_COL1 VARCHAR";
+        createOrphanLink(SCHEMA1, parent1TableName, parent2TableName, SCHEMA2, viewName);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            // Should not fail since this table is unrelated to the view in spite of
+            // the orphan parent->child link
+            stmt.execute(String.format(alterTableDDL,
+                    SchemaUtil.getTableName(SCHEMA1, parent2TableName)));
+            try {
+                stmt.execute(String.format(alterTableDDL,
+                        SchemaUtil.getTableName(SCHEMA1, parent1TableName)));
+                fail("Adding column should be disallowed since there is a conflicting column type "
+                        + "on the child view");
+            } catch (SQLException sqlEx) {
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), sqlEx.getErrorCode());
+            }
+        }
+    }
+
+    @Test
+    public void testDropTableIsResilientToOrphanLinks() throws SQLException {
+        final String parent1TableName = generateUniqueName();
+        final String parent2TableName = generateUniqueName();
+        final String viewName = "V_" + generateUniqueName();
+        final String dropTableNoCascadeDDL = "DROP TABLE %s ";
+        createOrphanLink(SCHEMA1, parent1TableName, parent2TableName, SCHEMA2, viewName);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            // Should not fail since this table is unrelated to the view in spite of
+            // the orphan parent->child link
+            stmt.execute(String.format(dropTableNoCascadeDDL,
+                    SchemaUtil.getTableName(SCHEMA1, parent2TableName)));
+            try {
+                stmt.execute(String.format(dropTableNoCascadeDDL,
+                        SchemaUtil.getTableName(SCHEMA1, parent1TableName)));
+                fail("Drop table without cascade should fail since there is a child view");
+            } catch (SQLException sqlEx) {
+                assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), sqlEx.getErrorCode());
+            }
+        }
+    }
+
+    /**
+     * Create a view hierarchy:
+     *
+     *              _____ parent1 ____                                 _____ parent2 ____
+     *             /         |        \                               /         |        \
+     *    level1view1   level1view3  level1view4            level1view2   level1view5  level1view6
+     *         |                                                 |
+     *  t001.level2view1                                 t001.level2view2
+     *                                                           |
+     *                                                   t001.level3view1
+     *
+     * We induce orphan links by recreating the same view names on top of different parents
+     */
+    @Test
+    public void testViewHierarchyWithOrphanLinks() throws Exception {
+        final List<TableInfo> expectedLegitChildViewsListForParent1 = new ArrayList<>();
+        final List<TableInfo> expectedLegitChildViewsListForParent2 = new ArrayList<>();
+        final String tenantId = "t001";
+        final String parent1TableName = "P1_" + generateUniqueName();
+        final String parent2TableName = "P2_" + generateUniqueName();
+
+        final String level1ViewName1 = "L1_V_1_" + generateUniqueName();
+        final String level1ViewName2 = "L1_V_2_" + generateUniqueName();
+        final String level1ViewName3 = "L1_V_3_" + generateUniqueName();
+        final String level1ViewName4 = "L1_V_4_" + generateUniqueName();
+        final String level1ViewName5 = "L1_V_5_" + generateUniqueName();
+        final String level1ViewName6 = "L1_V_6_" + generateUniqueName();
+
+        final String level2ViewName1 = "L2_V_1_" + generateUniqueName();
+        final String level2ViewName2 = "L2_V_2_" + generateUniqueName();
+
+        final String level3ViewName1 = "L3_V_1_" + generateUniqueName();
+        createOrphanLink(BASE_TABLE_SCHEMA, parent1TableName, parent2TableName,
+                CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName1);
+
+        // Create other legit views on top of parent1 and parent2
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName3,
+                    BASE_TABLE_SCHEMA, parent1TableName));
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName4,
+                    BASE_TABLE_SCHEMA, parent1TableName));
+
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName2,
+                    BASE_TABLE_SCHEMA, parent2TableName));
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName5,
+                    BASE_TABLE_SCHEMA, parent2TableName));
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName6,
+                    BASE_TABLE_SCHEMA, parent2TableName));
+        }
+        Properties props = new Properties();
+        props.put(TENANT_ID_ATTRIB, tenantId);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_2_DDL,
+                    CHILD_VIEW_LEVEL_2_SCHEMA, level2ViewName1,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName1));
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_2_DDL,
+                    CHILD_VIEW_LEVEL_2_SCHEMA, level2ViewName2,
+                    CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName2));
+
+            // Try to recreate the same view on a different global view to create an orphan link
+            try {
+                conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_2_DDL,
+                        CHILD_VIEW_LEVEL_2_SCHEMA, level2ViewName2,
+                        CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName1));
+                fail("Creating the same view again should have failed");
+            } catch (TableAlreadyExistsException ignored) {
+                // expected
+            }
+            // Create a third level view
+            conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_3_DDL,
+                    CHILD_VIEW_LEVEL_3_SCHEMA, level3ViewName1,
+                    CHILD_VIEW_LEVEL_2_SCHEMA, level2ViewName2));
+        }
+        // Populate all expected legitimate views in depth-first order
+        expectedLegitChildViewsListForParent1.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName1.getBytes()));
+        expectedLegitChildViewsListForParent1.add(new TableInfo(tenantId.getBytes(),
+                CHILD_VIEW_LEVEL_2_SCHEMA.getBytes(), level2ViewName1.getBytes()));
+        expectedLegitChildViewsListForParent1.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName3.getBytes()));
+        expectedLegitChildViewsListForParent1.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName4.getBytes()));
+
+        expectedLegitChildViewsListForParent2.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName2.getBytes()));
+        expectedLegitChildViewsListForParent2.add(new TableInfo(tenantId.getBytes(),
+                CHILD_VIEW_LEVEL_2_SCHEMA.getBytes(), level2ViewName2.getBytes()));
+        expectedLegitChildViewsListForParent2.add(new TableInfo(tenantId.getBytes(),
+                CHILD_VIEW_LEVEL_3_SCHEMA.getBytes(), level3ViewName1.getBytes()));
+        expectedLegitChildViewsListForParent2.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName5.getBytes()));
+        expectedLegitChildViewsListForParent2.add(new TableInfo(null,
+                CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName6.getBytes()));
+
+        /*
+            After this setup, SYSTEM.CHILD_LINK parent->child linking rows will look like this:
+            parent1->level1view1
+            parent1->level1view3
+            parent1->level1view4
+
+            parent2->level1view1 (orphan)
+            parent2->level1view2
+            parent2->level1view5
+            parent2->level1view6
+
+            level1view1->t001.level2view1
+            level1view1->t001.level2view2 (orphan)
+            level1view2->t001.level2view2
+
+            t001.level2view2->t001.level3view1
+         */
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            try (Table childLinkTable = cqs.getTable(SchemaUtil.getPhysicalName(
+                    SYSTEM_LINK_HBASE_TABLE_NAME.toBytes(), cqs.getProps()).getName())) {
+                Pair<List<PTable>, List<TableInfo>> allDescendants =
+                        ViewUtil.findAllDescendantViews(childLinkTable, cqs.getConfiguration(),
+                                EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                                parent1TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                List<PTable> legitChildViews = allDescendants.getFirst();
+                List<TableInfo> orphanViews = allDescendants.getSecond();
+                // All of the orphan links are legit views of the other parent so they don't count
+                // as orphan views for this parent
+                assertTrue(orphanViews.isEmpty());
+                assertLegitChildViews(expectedLegitChildViewsListForParent1, legitChildViews);
+
+                allDescendants = ViewUtil.findAllDescendantViews(childLinkTable,
+                        cqs.getConfiguration(), EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                        parent2TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                legitChildViews = allDescendants.getFirst();
+                orphanViews = allDescendants.getSecond();
+                // All of the orphan links are legit views of the other parent so they don't count
+                // as orphan views for this parent
+                assertTrue(orphanViews.isEmpty());
+                assertLegitChildViews(expectedLegitChildViewsListForParent2, legitChildViews);
+
+                // Drop one of the legitimate level 1 views that was on top of parent1
+                conn.createStatement().execute(String.format("DROP VIEW %s.%s CASCADE",
+                        CHILD_VIEW_LEVEL_1_SCHEMA, level1ViewName1));
+                // The view hierarchy rooted at this view is 2 levels deep so we must run the
+                // DropChildViewsTask twice to clear out views level by level
+                runDropChildViewsTask();
+                runDropChildViewsTask();
+
+                expectedLegitChildViewsListForParent1.clear();
+                expectedLegitChildViewsListForParent1.add(new TableInfo(
+                        null, CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName3.getBytes()));
+                expectedLegitChildViewsListForParent1.add(new TableInfo(
+                        null, CHILD_VIEW_LEVEL_1_SCHEMA.getBytes(), level1ViewName4.getBytes()));
+
+                allDescendants = ViewUtil.findAllDescendantViews(childLinkTable,
+                        cqs.getConfiguration(), EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                        parent1TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                legitChildViews = allDescendants.getFirst();
+                orphanViews = allDescendants.getSecond();
+                assertLegitChildViews(expectedLegitChildViewsListForParent1, legitChildViews);
+                assertTrue(orphanViews.isEmpty());
+
+                allDescendants = ViewUtil.findAllDescendantViews(childLinkTable,
+                        cqs.getConfiguration(), EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                        parent2TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                legitChildViews = allDescendants.getFirst();
+                orphanViews = allDescendants.getSecond();
+
+                // We prune orphan branches and so we will not explore any more orphan links that
+                // stem from the first found orphan
+                assertEquals(1, orphanViews.size());
+                assertEquals(0, orphanViews.get(0).getTenantId().length);
+                assertEquals(CHILD_VIEW_LEVEL_1_SCHEMA,
+                        Bytes.toString(orphanViews.get(0).getSchemaName()));
+                assertEquals(level1ViewName1, Bytes.toString(orphanViews.get(0).getTableName()));
+                assertLegitChildViews(expectedLegitChildViewsListForParent2, legitChildViews);
+            }
+        }
+    }
+
+    private void assertLegitChildViews(List<TableInfo> expectedList, List<PTable> actualList) {
+        assertEquals(expectedList.size(), actualList.size());
+        for (int i=0; i<expectedList.size(); i++) {
+            TableInfo expectedChild = expectedList.get(i);
+            byte[] expectedTenantId = expectedChild.getTenantId();
+            PName actualTenantId = actualList.get(i).getTenantId();
+            assertTrue((expectedTenantId == null && actualTenantId == null) ||
+                    ((actualTenantId != null && expectedTenantId != null) &&
+                            Arrays.equals(actualTenantId.getBytes(), expectedTenantId)));
+            assertEquals(Bytes.toString(expectedChild.getSchemaName()),
+                    actualList.get(i).getSchemaName().getString());
+            assertEquals(Bytes.toString(expectedChild.getTableName()),
+                    actualList.get(i).getTableName().getString());
+        }
+    }
+
+    // Create 2 base tables and attempt to create the same view on top of both. The second view
+    // creation will fail, however an orphan parent->child link will be created inside
+    // SYSTEM.CHILD_LINK between parent2 and the view
+    static void createOrphanLink(String parentSchema, String parent1, String parent2,
+            String viewSchema, String viewName) throws SQLException {
+
+        final String querySysChildLink =
+                "SELECT * FROM SYSTEM.CHILD_LINK WHERE TABLE_SCHEM='%s' AND "
+                        + "TABLE_NAME='%s' AND COLUMN_FAMILY='%s' AND " + LINK_TYPE + " = " +
+                        PTable.LinkType.CHILD_TABLE.getSerializedValue();
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(String.format(CREATE_BASE_TABLE_DDL, parentSchema, parent1));
+            stmt.execute(String.format(CREATE_BASE_TABLE_DDL, parentSchema, parent2));
+            stmt.execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL, viewSchema, viewName,
+                    parentSchema, parent1));
+            try {
+                stmt.execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL, viewSchema, viewName,
+                        parentSchema, parent2));
+                fail("Creating the same view again should have failed");
+            } catch (TableAlreadyExistsException ignored) {
+                // expected
+            }
+
+            // Confirm that the orphan parent->child link exists after the second view creation
+            ResultSet rs = stmt.executeQuery(String.format(querySysChildLink, parentSchema,
+                    parent2, SchemaUtil.getTableName(viewSchema, viewName)));
+            assertTrue(rs.next());
+        }
+    }
+
+    void runDropChildViewsTask() {
         // Run DropChildViewsTask to complete the tasks for dropping child views
         TaskRegionObserver.SelfHealingTask task = new TaskRegionObserver.SelfHealingTask(
                 TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
@@ -429,8 +735,8 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
         for (int i = 0; i < numTasks; i++) {
             Timestamp maxTs = new Timestamp(HConstants.LATEST_TIMESTAMP);
             assertNotEquals("Should have got a valid timestamp", maxTs, rs.getTimestamp(2));
-            assertTrue("Task should be completed",
-                    PTable.TaskStatus.COMPLETED.toString().equals(rs.getString(6)));
+            assertEquals("Task should be completed", PTable.TaskStatus.COMPLETED.toString(),
+                    rs.getString(6));
             assertNotNull("Task end time should not be null", rs.getTimestamp(7));
             String taskData = rs.getString(9);
             assertTrue("Task data should contain final status", taskData != null &&
@@ -523,7 +829,7 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
             fail();
         }
         catch (SQLException e) {
-            assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
         }
 
         // drop table cascade should succeed
@@ -710,7 +1016,7 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
             conn.createStatement().execute("ALTER TABLE " + fullTableName + " DROP COLUMN v1");
             fail();
         } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+            assertEquals(CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewUtilIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewUtilIT.java
index 634ff72..1332a3d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewUtilIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewUtilIT.java
@@ -17,13 +17,17 @@
  */
 package org.apache.phoenix.end2end;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.TableInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TableViewFinderResult;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ViewUtil;
 import org.junit.Test;
@@ -31,16 +35,24 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SPLITTABLE_SYSTEM_CATALOG;
+import static org.apache.phoenix.end2end.ViewMetadataIT.BASE_TABLE_SCHEMA;
+import static org.apache.phoenix.end2end.ViewMetadataIT.CHILD_VIEW_LEVEL_1_SCHEMA;
+import static org.apache.phoenix.end2end.ViewMetadataIT.CREATE_BASE_TABLE_DDL;
+import static org.apache.phoenix.end2end.ViewMetadataIT.CREATE_CHILD_VIEW_LEVEL_1_DDL;
+import static org.apache.phoenix.end2end.ViewMetadataIT.createOrphanLink;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_TABLE_NAME;
+import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
 public class ViewUtilIT extends ParallelStatsDisabledIT {
 
     @Test
@@ -66,7 +78,8 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
         String leafViewName1 = schema + "." + generateUniqueName();
         String leafViewName2 = schema + "." + generateUniqueName();
 
-        String tableDDLQuery = "CREATE TABLE " + fullTableName + " (A BIGINT PRIMARY KEY, B BIGINT)";
+        String tableDDLQuery = "CREATE TABLE " + fullTableName
+                + " (A BIGINT PRIMARY KEY, B BIGINT)";
         String viewDDLQuery = "CREATE VIEW %s AS SELECT * FROM %s";
 
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -81,10 +94,10 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
                     String.format(viewDDLQuery, leafViewName2, thirdLevelViewName));
 
             try (PhoenixConnection phoenixConnection =
-                    DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) {
-                Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
-                        SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
-                                phoenixConnection.getQueryServices().getProps()).getName());
+                    DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+                    Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
+                            SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
+                                    phoenixConnection.getQueryServices().getProps()).getName())) {
 
                 assertTrue(ViewUtil.hasChildViews(catalogOrChildTable,
                         tenantIdInBytes, schemaInBytes,
@@ -120,7 +133,7 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
         String tenantId = generateUniqueName();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        tenantProps.setProperty(TENANT_ID_ATTRIB, tenantId);
         TableName catalogOrChildTableName = ViewUtil.getSystemTableForChildLinks(0, config);
         String schema = generateUniqueName();
         byte[] schemaInBytes = schema.getBytes(StandardCharsets.UTF_8);
@@ -151,17 +164,19 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
                 tenantConn.createStatement().execute(
                         String.format(viewDDL, tenantViewOnGlobalView, globalViewName));
                 tenantConn.createStatement().execute(
-                        String.format(viewDDL, tenantViewOnMultiTenantTable1, multiTenantTableName));
+                        String.format(viewDDL, tenantViewOnMultiTenantTable1,
+                                multiTenantTableName));
                 tenantConn.createStatement().execute(
-                        String.format(viewDDL, tenantViewOnMultiTenantTable2, multiTenantTableName));
+                        String.format(viewDDL, tenantViewOnMultiTenantTable2,
+                                multiTenantTableName));
                 tenantConn.createStatement().execute(viewIndexDDL);
             }
 
             try (PhoenixConnection phoenixConnection = DriverManager.getConnection(getUrl(),
-                    props).unwrap(PhoenixConnection.class)) {
-                Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
-                        SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
-                                phoenixConnection.getQueryServices().getProps()).getName());
+                    props).unwrap(PhoenixConnection.class);
+                    Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
+                            SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
+                                    phoenixConnection.getQueryServices().getProps()).getName())) {
 
                 assertTrue(ViewUtil.hasChildViews(catalogOrChildTable,
                         emptyTenantIdInBytes, schemaInBytes,
@@ -205,7 +220,8 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
         String leafViewName2 = schema + "." + generateUniqueName();
         int NUMBER_OF_VIEWS = 3;
 
-        String tableDDLQuery = "CREATE TABLE " + fullTableName + " (A BIGINT PRIMARY KEY, B BIGINT)";
+        String tableDDLQuery = "CREATE TABLE " + fullTableName
+                + " (A BIGINT PRIMARY KEY, B BIGINT)";
         String viewDDLQuery = "CREATE VIEW %s AS SELECT * FROM %s";
 
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -218,10 +234,10 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
                     String.format(viewDDLQuery, leafViewName2, middleLevelViewName));
 
             try (PhoenixConnection phoenixConnection = DriverManager.getConnection(getUrl(),
-                    props).unwrap(PhoenixConnection.class)) {
-                Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
-                        SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
-                                phoenixConnection.getQueryServices().getProps()).getName());
+                    props).unwrap(PhoenixConnection.class);
+                    Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
+                            SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
+                                    phoenixConnection.getQueryServices().getProps()).getName())) {
 
                 TableViewFinderResult result = new TableViewFinderResult();
                 ViewUtil.findAllRelatives(catalogOrChildTable, tenantIdInBytes, schemaInBytes,
@@ -259,10 +275,10 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         String tenantId1 = generateUniqueName();
         Properties tenantProps1 = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        tenantProps1.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId1);
+        tenantProps1.setProperty(TENANT_ID_ATTRIB, tenantId1);
         String tenantId2 = generateUniqueName();
         Properties tenantProps2 = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        tenantProps2.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId2);
+        tenantProps2.setProperty(TENANT_ID_ATTRIB, tenantId2);
         TableName catalogOrChildTableName = ViewUtil.getSystemTableForChildLinks(0, config);
         String schema = generateUniqueName();
         byte[] schemaInBytes = schema.getBytes(StandardCharsets.UTF_8);
@@ -297,10 +313,10 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
             }
 
             try (PhoenixConnection phoenixConnection = DriverManager.getConnection(getUrl(),
-                    props).unwrap(PhoenixConnection.class)) {
-                Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
-                        SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
-                                phoenixConnection.getQueryServices().getProps()).getName());
+                    props).unwrap(PhoenixConnection.class);
+                    Table catalogOrChildTable = phoenixConnection.getQueryServices().getTable(
+                            SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
+                                    phoenixConnection.getQueryServices().getProps()).getName())) {
 
                 TableViewFinderResult result = new TableViewFinderResult();
                 ViewUtil.findAllRelatives(catalogOrChildTable, emptyTenantIdInBytes, schemaInBytes,
@@ -333,4 +349,96 @@ public class ViewUtilIT extends ParallelStatsDisabledIT {
             }
         }
     }
+
+    @Test
+    public void testFindLegitChildViews() throws Exception {
+        final String parentTable = generateUniqueName();
+        List<String> childViewNames = new ArrayList<>(3);
+        childViewNames.add("A_" + generateUniqueName());
+        childViewNames.add("B_" + generateUniqueName());
+        childViewNames.add("C_" + generateUniqueName());
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(String.format(CREATE_BASE_TABLE_DDL, BASE_TABLE_SCHEMA,
+                    parentTable));
+            for (String childViewName : childViewNames) {
+                conn.createStatement().execute(String.format(CREATE_CHILD_VIEW_LEVEL_1_DDL,
+                        CHILD_VIEW_LEVEL_1_SCHEMA, childViewName, BASE_TABLE_SCHEMA, parentTable));
+            }
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            try (Table childLinkTable = cqs.getTable(SchemaUtil.getPhysicalName(
+                    SYSTEM_LINK_HBASE_TABLE_NAME.toBytes(), cqs.getProps()).getName())) {
+                Pair<List<PTable>, List<TableInfo>> allDescendants =
+                        ViewUtil.findAllDescendantViews(childLinkTable, cqs.getConfiguration(),
+                                EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                                parentTable.getBytes(), HConstants.LATEST_TIMESTAMP, true);
+                assertTrue("No orphan views expected", allDescendants.getSecond().isEmpty());
+                List<PTable> childViews = allDescendants.getFirst();
+                assertEquals("Just 1 legit child view expected", 1, childViews.size());
+                PTable childView = childViews.get(0);
+
+                // Should have got the first child view as per alphabetical ordering of the
+                // linking row scan result
+                assertEquals(CHILD_VIEW_LEVEL_1_SCHEMA, childView.getSchemaName().getString());
+                assertEquals(childViewNames.get(0), childView.getTableName().getString());
+                assertEquals(BASE_TABLE_SCHEMA, childView.getParentSchemaName().getString());
+                assertEquals(parentTable, childView.getParentTableName().getString());
+
+                allDescendants = ViewUtil.findAllDescendantViews(childLinkTable,
+                        cqs.getConfiguration(), EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                        parentTable.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                assertTrue("No orphan views expected", allDescendants.getSecond().isEmpty());
+                childViews = allDescendants.getFirst();
+                assertEquals("All child views expected", childViewNames.size(), childViews.size());
+                for (int i=0; i<childViewNames.size(); i++) {
+                    childView = childViews.get(i);
+                    assertEquals(CHILD_VIEW_LEVEL_1_SCHEMA, childView.getSchemaName().getString());
+                    assertEquals(childViewNames.get(i), childView.getTableName().getString());
+                    assertEquals(BASE_TABLE_SCHEMA, childView.getParentSchemaName().getString());
+                    assertEquals(parentTable, childView.getParentTableName().getString());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testOrphanViewDetection() throws Exception {
+        final String parent1TableName = generateUniqueName();
+        final String parent2TableName = generateUniqueName();
+        final String viewName = "V_" + generateUniqueName();
+        createOrphanLink(BASE_TABLE_SCHEMA, parent1TableName, parent2TableName,
+                CHILD_VIEW_LEVEL_1_SCHEMA, viewName);
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            try (Table childLinkTable = cqs.getTable(SchemaUtil.getPhysicalName(
+                    SYSTEM_LINK_HBASE_TABLE_NAME.toBytes(), cqs.getProps()).getName())) {
+                // The view is a legitimate child of parent1, so it should be counted as
+                // neither a legitimate view of parent2 nor an orphan view of parent2
+                Pair<List<PTable>, List<TableInfo>> allDescendants =
+                        ViewUtil.findAllDescendantViews(childLinkTable, cqs.getConfiguration(),
+                                EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                                parent2TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                assertTrue("No orphan views expected", allDescendants.getSecond().isEmpty());
+                assertTrue("No legitimate views expected", allDescendants.getFirst().isEmpty());
+
+                // Drop that view
+                conn.createStatement().execute(String.format("DROP VIEW %s.%s",
+                        CHILD_VIEW_LEVEL_1_SCHEMA,
+                        viewName));
+
+                // Now this view is no longer a legitimate child view of parent1 either, so the
+                // orphan parent2->view link should show up as an orphan view of parent2
+                allDescendants = ViewUtil.findAllDescendantViews(childLinkTable,
+                        cqs.getConfiguration(), EMPTY_BYTE_ARRAY, BASE_TABLE_SCHEMA.getBytes(),
+                        parent2TableName.getBytes(), HConstants.LATEST_TIMESTAMP, false);
+                assertTrue("No legitimate views expected", allDescendants.getFirst().isEmpty());
+                List<TableInfo> orphanViews = allDescendants.getSecond();
+                assertEquals("1 orphan view expected", 1, orphanViews.size());
+                assertEquals(CHILD_VIEW_LEVEL_1_SCHEMA,
+                        Bytes.toString(orphanViews.get(0).getSchemaName()));
+                assertEquals(viewName, Bytes.toString(orphanViews.get(0).getTableName()));
+            }
+        }
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b3fa588..0b9fd7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -78,8 +78,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
 import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
+import static org.apache.phoenix.util.ViewUtil.findAllDescendantViews;
 import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
 
 import java.io.IOException;
@@ -230,7 +232,6 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TableViewFinderResult;
 import org.apache.phoenix.util.UpgradeUtil;
 import org.apache.phoenix.util.ViewUtil;
 import org.slf4j.Logger;
@@ -1762,7 +1763,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             // this is a no-op (See PHOENIX-5544)
             if (!Bytes.toString(schemaName).equals(QueryConstants.SYSTEM_SCHEMA_NAME)) {
                 ViewUtil.dropChildViews(env, tenantIdBytes, schemaName, tableName,
-                        getSystemTableForChildLinks(clientVersion, env.getConfiguration()).getName());
+                        getSystemTableForChildLinks(clientVersion, env.getConfiguration())
+                                .getName());
             }
 
             byte[] parentTableKey = null;
@@ -2140,41 +2142,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableType.INDEX == tableType && parentTable.getIndexes().size() >= maxIndexesPerTable;
     }
 
-    private List<PTable> findAllChildViews(long clientTimeStamp, int clientVersion, byte[] tenantId,
-            byte[] schemaName, byte[] tableName) throws IOException, SQLException {
-        TableViewFinderResult result = new TableViewFinderResult();
-        try (Table hTable = env.getTable(
-                getSystemTableForChildLinks(clientVersion, env.getConfiguration()))) {
-            ViewUtil.findAllRelatives(hTable, tenantId, schemaName, tableName,
-                    LinkType.CHILD_TABLE, result);
-        }
-        List<PTable> childViews = Lists.newArrayListWithExpectedSize(result.getLinks().size());
-        for (TableInfo viewInfo : result.getLinks()) {
-            byte[] viewTenantId = viewInfo.getTenantId();
-            byte[] viewSchemaName = viewInfo.getSchemaName();
-            byte[] viewName = viewInfo.getTableName();
-            PTable view;
-            Properties props = new Properties();
-            if (viewTenantId != null) {
-                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
-            }
-            if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
-                props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
-            }
-            try (PhoenixConnection connection =
-                         QueryUtil.getConnectionOnServer(props, env.getConfiguration())
-                                 .unwrap(PhoenixConnection.class)) {
-                view = PhoenixRuntime.getTableNoCache(connection, SchemaUtil.getTableName(viewSchemaName, viewName));
-            }
-            if (view == null) {
-                ServerUtil.throwIOException("View not found", new TableNotFoundException(Bytes.toString(viewSchemaName),
-                        Bytes.toString(viewName)));
-            }
-            childViews.add(view);
-        }
-        return childViews;
-    }
-
     private void separateLocalAndRemoteMutations(Region region, List<Mutation> mutations,
                                                  List<Mutation> localMutations, List<Mutation> remoteMutations) {
         HRegionInfo regionInfo = region.getRegionInfo();
@@ -2195,15 +2162,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[][] rowKeyMetaData = new byte[3][];
         String tableType = request.getTableType();
         byte[] schemaName = null;
-        byte[] tableName = null;
+        byte[] tableOrViewName = null;
         boolean dropTableStats = false;
+        final int clientVersion = request.getClientVersion();
         try {
             List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
             List<Mutation> childLinkMutations = Lists.newArrayList();
             MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
             byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
             schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            tableOrViewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
             PTableType pTableType = PTableType.fromSerializedValue(tableType);
             // Disallow deletion of a system table
             if (pTableType == PTableType.SYSTEM) {
@@ -2216,7 +2184,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             List<byte[]> tableNamesToDelete = Lists.newArrayList();
             List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
 
-            byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+            byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableOrViewName);
             Region region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
             if (result != null) {
@@ -2237,7 +2205,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
 
             long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-            PTable loadedTable = doGetTable(tenantIdBytes, schemaName, tableName,
+            PTable loadedTable = doGetTable(tenantIdBytes, schemaName, tableOrViewName,
                     clientTimeStamp, null, request.getClientVersion());
             if (loadedTable == null) {
                 builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
@@ -2246,9 +2214,81 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 return;
             }
             getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes),
-                    SchemaUtil.getTableName(schemaName, tableName),
+                    SchemaUtil.getTableName(schemaName, tableOrViewName),
                     TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
                     getParentPhysicalTableName(loadedTable), pTableType, loadedTable.getIndexes());
+
+            if (pTableType == PTableType.TABLE || pTableType == PTableType.VIEW) {
+                // check to see if the table has any child views
+                try (Table hTable = env.getTable(
+                        getSystemTableForChildLinks(clientVersion, env.getConfiguration()))) {
+
+                    // This call needs to be done before acquiring the row lock on the header row
+                    // for the table/view being dropped. Otherwise, resolving its child views via
+                    // PhoenixRuntime.getTableNoCache() will deadlock, since resolving a child
+                    // view itself needs to get the parent table, which in turn needs to acquire
+                    // a write lock on the same header row
+                    Pair<List<PTable>, List<TableInfo>> descendantViews =
+                            findAllDescendantViews(hTable, env.getConfiguration(),
+                                    tenantIdBytes, schemaName, tableOrViewName, clientTimeStamp,
+                                    true);
+                    List<PTable> legitimateChildViews = descendantViews.getFirst();
+                    List<TableInfo> orphanChildViews = descendantViews.getSecond();
+                    if (!legitimateChildViews.isEmpty()) {
+                        if (!isCascade) {
+                            LOGGER.error("DROP without CASCADE on tables or views with child views "
+                                    + "is not permitted");
+                            // DROP without CASCADE on tables/views with child views is
+                            // not permitted
+                            builder.setReturnCode(
+                                    MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
+                        if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG &&
+                                !SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
+                                        env.getConfiguration()).equals(hTable.getName())) {
+                            // (See PHOENIX-5544) For an old client connecting to a non-upgraded
+                            // server, we disallow dropping a base table/view that has child views.
+                            LOGGER.error("Dropping a table or view that has child views is "
+                                    + "not permitted for old clients connecting to a new server "
+                                    + "with old metadata (even if CASCADE is provided). "
+                                    + "Please upgrade the client at least to  "
+                                    + MIN_SPLITTABLE_SYSTEM_CATALOG_VERSION);
+                            builder.setReturnCode(
+                                    MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
+                    }
+
+                    // If the CASCADE option is provided and we have at least one legitimate/orphan
+                    // view stemming from this parent and the client is 4.15+ (or older but
+                    // connecting to an upgraded server), we use the SYSTEM.TASK table to
+                    // asynchronously drop child views
+                    if (isCascade && !(legitimateChildViews.isEmpty() && orphanChildViews.isEmpty())
+                            && (clientVersion >= MIN_SPLITTABLE_SYSTEM_CATALOG ||
+                            SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
+                                    env.getConfiguration()).equals(hTable.getName()))) {
+                        try {
+                            PhoenixConnection conn =
+                                    QueryUtil.getConnectionOnServer(env.getConfiguration())
+                                            .unwrap(PhoenixConnection.class);
+                            Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
+                                    Bytes.toString(tenantIdBytes), Bytes.toString(schemaName),
+                                    Bytes.toString(tableOrViewName),
+                                    PTable.TaskStatus.CREATED.toString(),
+                                    null, null, null, null,
+                                    this.accessCheckEnabled);
+                        } catch (Throwable t) {
+                            LOGGER.error("Adding a task to drop child views failed!", t);
+                        }
+                    }
+                }
+            }
+
             List<RowLock> locks = Lists.newArrayList();
             try {
                 acquireLock(region, lockKey, locks);
@@ -2257,10 +2297,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
 
                 List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
-                result =
-                        doDropTable(lockKey, tenantIdBytes, schemaName, tableName, parentTableName,
-                                PTableType.fromSerializedValue(tableType), tableMetadata, childLinkMutations,
-                                invalidateList, tableNamesToDelete, sharedTablesToDelete, isCascade, request.getClientVersion());
+                result = doDropTable(lockKey, tenantIdBytes, schemaName, tableOrViewName,
+                        parentTableName, PTableType.fromSerializedValue(tableType), tableMetadata,
+                        childLinkMutations, invalidateList, tableNamesToDelete,
+                        sharedTablesToDelete, request.getClientVersion());
                 if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
@@ -2271,10 +2311,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 List<Mutation> localMutations =
                         Lists.newArrayListWithExpectedSize(tableMetadata.size());
                 List<Mutation> remoteMutations = Lists.newArrayList();
-                separateLocalAndRemoteMutations(region, tableMetadata, localMutations, remoteMutations);
+                separateLocalAndRemoteMutations(region, tableMetadata, localMutations,
+                        remoteMutations);
                 if (!remoteMutations.isEmpty()) {
                     // while dropping a table all the mutations should be local
-                    String msg = "Found unexpected mutations while dropping table " + SchemaUtil.getTableName(schemaName, tableName);
+                    String msg = "Found unexpected mutations while dropping table "
+                            + SchemaUtil.getTableName(schemaName, tableOrViewName);
                     LOGGER.error(msg);
                     for (Mutation m : remoteMutations) {
                         LOGGER.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
@@ -2284,8 +2326,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
 
                 // drop rows from catalog on this region
-                mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 for (ImmutableBytesPtr ckey : invalidateList) {
@@ -2320,8 +2362,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
         } catch (Throwable t) {
             LOGGER.error("dropTable failed", t);
-            ProtobufUtil.setControllerException(controller,
-                    ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+            ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(
+                    SchemaUtil.getTableName(schemaName, tableOrViewName), t));
         }
     }
 
@@ -2404,10 +2446,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return null;
     }
 
-    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName,
-                                               byte[] parentTableName, PTableType tableType, List<Mutation> catalogMutations,
-                                               List<Mutation> childLinkMutations, List<ImmutableBytesPtr> invalidateList, List<byte[]> tableNamesToDelete,
-                                               List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion)
+    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
+            byte[] tableName, byte[] parentTableName, PTableType tableType,
+            List<Mutation> catalogMutations, List<Mutation> childLinkMutations,
+            List<ImmutableBytesPtr> invalidateList, List<byte[]> tableNamesToDelete,
+            List<SharedTableState> sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
 
         Region region = env.getRegion();
@@ -2417,11 +2460,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         PTable table = getTableFromCache(cacheKey, clientTimeStamp, clientVersion);
 
         // We always cache the latest version - fault in if not in cache
-        if (table != null
-                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) != null) {
+        if (table != null || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
+                clientVersion)) != null) {
             if (table.getTimeStamp() < clientTimeStamp) {
                 if (isTableDeleted(table) || tableType != table.getType()) {
-                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
             } else {
                 return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
@@ -2432,13 +2476,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         // there was a table, but it's been deleted. In either case we want to return.
         if (table == null) {
             if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
-                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                        EnvironmentEdgeManager.currentTimeMillis(), null);
             }
-            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                    EnvironmentEdgeManager.currentTimeMillis(), null);
         }
         // Make sure we're not deleting the "wrong" child
-        if (parentTableName != null && table.getParentTableName() != null && !Arrays.equals(parentTableName, table.getParentTableName().getBytes())) {
-            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+        if (parentTableName != null && table.getParentTableName() != null &&
+                !Arrays.equals(parentTableName, table.getParentTableName().getBytes())) {
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                    EnvironmentEdgeManager.currentTimeMillis(), null);
         }
         // Since we don't allow back in time DDL, we know if we have a table it's the one
         // we want to delete. FIXME: we shouldn't need a scan here, but should be able to
@@ -2453,55 +2501,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         EnvironmentEdgeManager.currentTimeMillis(), null);
             }
 
-            if (tableType == PTableType.TABLE || tableType == PTableType.VIEW || tableType == PTableType.SYSTEM) {
-                // check to see if the table has any child views
-                try (Table hTable = env.getTable(getSystemTableForChildLinks(clientVersion,
-                        env.getConfiguration()))) {
-                    boolean hasChildViews =
-                            ViewUtil.hasChildViews(hTable, tenantId, schemaName, tableName,
-                                    clientTimeStamp);
-                    if (hasChildViews) {
-                        if (!isCascade) {
-                            LOGGER.error("DROP without CASCADE on tables with child views "
-                                    + "is not permitted");
-                            // DROP without CASCADE on tables with child views is not permitted
-                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
-                                    EnvironmentEdgeManager.currentTimeMillis(), null);
-                        }
-                        // For 4.15+ clients and older clients connecting to an upgraded server,
-                        // add a task to drop child views of the base table.
-                        if (clientVersion >= MIN_SPLITTABLE_SYSTEM_CATALOG ||
-                                SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
-                                env.getConfiguration()).equals(hTable.getName())) {
-                            try {
-                                PhoenixConnection conn =
-                                        QueryUtil.getConnectionOnServer(env.getConfiguration())
-                                                .unwrap(PhoenixConnection.class);
-                                Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
-                                        Bytes.toString(tenantId), Bytes.toString(schemaName),
-                                        Bytes.toString(tableName),
-                                        PTable.TaskStatus.CREATED.toString(),
-                                        null, null, null, null,
-                                        this.accessCheckEnabled);
-                            } catch (Throwable t) {
-                                LOGGER.error("Adding a task to drop child views failed!", t);
-                            }
-                        } else {
-                            // (See PHOENIX-5544) For an old client connecting to a non-upgraded
-                            // server, we disallow dropping a base table that has child views.
-                            LOGGER.error("Dropping a table that has child views is not permitted "
-                                    + "for old clients connecting to a new server with old metadata."
-                                    + " Please upgrade the client to " +
-                                    MIN_SPLITTABLE_SYSTEM_CATALOG_VERSION);
-                            return new MetaDataMutationResult(
-                                    MutationCode.UNALLOWED_TABLE_MUTATION,
-                                    EnvironmentEdgeManager.currentTimeMillis(), null);
-                        }
-                    }
-                }
-            }
-
-            // Add to list of HTables to delete, unless it's a view or its a shared index 
+            // Add to list of HTables to delete, unless it's a view or it's a shared index
             if (tableType == INDEX && table.getViewIndexId() != null) {
                 sharedTablesToDelete.add(new SharedTableState(table));
             } else if (tableType != PTableType.VIEW) {
@@ -2511,25 +2511,37 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[][] rowKeyMetaData = new byte[5][];
             do {
                 Cell kv = results.get(LINK_TYPE_INDEX);
-                int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
+                int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+                        0, rowKeyMetaData);
                 if (nColumns == 5
                         && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0
-                        && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                        && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(),
                         LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0) {
-                    LinkType linkType = LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]);
-                    if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && linkType == LinkType.INDEX_TABLE) {
+                    LinkType linkType = LinkType.fromSerializedValue(
+                            kv.getValueArray()[kv.getValueOffset()]);
+                    if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 &&
+                            linkType == LinkType.INDEX_TABLE) {
                         indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                    } else if (tableType == PTableType.VIEW && (linkType == LinkType.PARENT_TABLE || linkType == LinkType.PHYSICAL_TABLE)) {
-                        // Populate the delete mutations for parent->child link for the child view in question,
-                        // which we issue to SYSTEM.CHILD_LINK later
-                        Cell parentTenantIdCell = MetaDataUtil.getCell(results, PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES);
-                        PName parentTenantId = parentTenantIdCell != null ? PNameFactory.newName(parentTenantIdCell.getValueArray(), parentTenantIdCell.getValueOffset(), parentTenantIdCell.getValueLength()) : null;
-                        byte[] linkKey = MetaDataUtil.getChildLinkKey(parentTenantId, table.getParentSchemaName(), table.getParentTableName(), table.getTenantId(), table.getName());
+                    } else if (tableType == PTableType.VIEW && (linkType == LinkType.PARENT_TABLE ||
+                            linkType == LinkType.PHYSICAL_TABLE)) {
+                        // Populate the delete mutations for parent->child link for the child view
+                        // in question, which we issue to SYSTEM.CHILD_LINK later
+                        Cell parentTenantIdCell = MetaDataUtil.getCell(results,
+                                PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES);
+                        PName parentTenantId = parentTenantIdCell != null ?
+                                PNameFactory.newName(parentTenantIdCell.getValueArray(),
+                                        parentTenantIdCell.getValueOffset(),
+                                        parentTenantIdCell.getValueLength()) : null;
+                        byte[] linkKey = MetaDataUtil.getChildLinkKey(parentTenantId,
+                                table.getParentSchemaName(), table.getParentTableName(),
+                                table.getTenantId(), table.getName());
                         Delete linkDelete = new Delete(linkKey, clientTimeStamp);
                         childLinkMutations.add(linkDelete);
                     }
                 }
-                Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
+                Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+                        clientTimeStamp);
                 catalogMutations.add(delete);
                 results.clear();
                 scanner.next(results);
@@ -2545,87 +2557,85 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             // of the client.
             Delete delete = new Delete(indexKey, clientTimeStamp);
             catalogMutations.add(delete);
-            MetaDataMutationResult result =
-                    doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
-                            catalogMutations, childLinkMutations, invalidateList, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
+            MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName,
+                    tableName, PTableType.INDEX, catalogMutations, childLinkMutations,
+                    invalidateList, tableNamesToDelete, sharedTablesToDelete, clientVersion);
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                 return result;
             }
         }
 
         if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && tableType == PTableType.VIEW) {
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
-                PTable pTable = PhoenixRuntime.getTableNoCache(connection, table.getParentName().getString());
+            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
+                    env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                PTable pTable = PhoenixRuntime.getTableNoCache(connection,
+                        table.getParentName().getString());
                 table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table, pTable);
             }
         }
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
-                EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete, sharedTablesToDelete);
+                EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete,
+                sharedTablesToDelete);
     }
 
     /**
-     * Validate if mutation is allowed on parent table/view
-     * based on their child views.
-     * If this method returns MetaDataMutationResult, mutation is not allowed,
-     * and returned object will contain returnCode
-     * (MutationCode) to indicate the underlying
-     * problem (validation failure code).
+     * Validate if mutation is allowed on parent table/view based on their child views.
+     * If this method returns MetaDataMutationResult, mutation is not allowed, and returned object
+     * will contain returnCode (MutationCode) to indicate the underlying problem
+     * (validation failure code).
      *
      * @param expectedType expected type of PTable
      * @param clientTimeStamp client timestamp, e.g check
      *     {@link MetaDataUtil#getClientTimeStamp(List)}
      * @param tenantId tenant Id
      * @param schemaName schema name
-     * @param tableName table name
-     * @param childViews child views of table or parent view.
-     *     Usually this is an empty list
-     *     passed to this method, and this method will add
-     *     child views retrieved using
-     *     {@link #findAllChildViews(long, int, byte[], byte[], byte[])}
-     * @param clientVersion client version, used to determine if
-     *     mutation is allowed.
-     * @return Optional.empty() if mutation is allowed on parent
-     *     table/view. If not allowed, returned Optional object
-     *     will contain metaDataMutationResult with MutationCode.
+     * @param tableOrViewName table or view name
+     * @param childViews child views of table or parent view. Usually this is an empty list
+     *     passed to this method, and this method will add child views retrieved using
+     *     {@link ViewUtil#findAllDescendantViews(Table, Configuration, byte[], byte[], byte[],
+     *     long, boolean)}
+     * @param clientVersion client version, used to determine if mutation is allowed.
+     * @return Optional.empty() if mutation is allowed on parent table/view. If not allowed,
+     *     returned Optional object will contain metaDataMutationResult with MutationCode.
      * @throws IOException if something goes wrong while retrieving
      *     child views using
-     *     {@link #findAllChildViews(long, int, byte[], byte[], byte[])}
-     * @throws SQLException if something goes wrong while retrieving
-     *     child views using
-     *     {@link #findAllChildViews(long, int, byte[], byte[], byte[])}
+     *     {@link ViewUtil#findAllDescendantViews(Table, Configuration, byte[], byte[], byte[],
+     *     long, boolean)}
+     * @throws SQLException if something goes wrong while retrieving child views using
+     *     {@link ViewUtil#findAllDescendantViews(Table, Configuration, byte[], byte[], byte[],
+     *     long, boolean)}
      */
     private Optional<MetaDataMutationResult> validateIfMutationAllowedOnParent(
             final PTableType expectedType, final long clientTimeStamp,
             final byte[] tenantId, final byte[] schemaName,
-            final byte[] tableName, final List<PTable> childViews,
+            final byte[] tableOrViewName, final List<PTable> childViews,
             final int clientVersion) throws IOException, SQLException {
         boolean isMutationAllowed = true;
-        if (expectedType == PTableType.TABLE ||
-                expectedType == PTableType.VIEW) {
-            childViews.addAll(findAllChildViews(clientTimeStamp, clientVersion,
-                tenantId, schemaName, tableName));
+        if (expectedType == PTableType.TABLE || expectedType == PTableType.VIEW) {
+            try (Table hTable = env.getTable(getSystemTableForChildLinks(clientVersion,
+                    env.getConfiguration()))) {
+                childViews.addAll(findAllDescendantViews(hTable, env.getConfiguration(),
+                        tenantId, schemaName, tableOrViewName, clientTimeStamp, false)
+                        .getFirst());
+            }
+
             if (!childViews.isEmpty()) {
-                // From 4.15 onwards we allow SYSTEM.CATALOG to split and no
-                // longer propagate parent
-                // metadata changes to child views.
-                // If the client is on a version older than 4.15 we have to
-                // block adding a column to a parent table/view as we no
-                // longer lock the parent table on the server side
-                // while creating a child view to prevent conflicting changes.
+                // From 4.15 onwards we allow SYSTEM.CATALOG to split and no longer
+                // propagate parent metadata changes to child views.
+                // If the client is on a version older than 4.15 we have to block adding a
+                // column to a parent as we no longer lock the parent on the
+                // server side while creating a child view to prevent conflicting changes.
                 // This is handled on the client side from 4.15 onwards.
-                // Also if
-                // QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is
-                // true, we block adding a column to a parent table/view so
-                // that we can rollback the upgrade if required.
+                // Also if QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK is true,
+                // we block adding a column to a parent so that we can rollback the
+                // upgrade if required.
                 if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG) {
                     isMutationAllowed = false;
-                    LOGGER.error("Unable to add or drop a column as the " +
-                        "client is older than {}",
+                    LOGGER.error("Unable to add or drop a column as the client is older than {}",
                         MIN_SPLITTABLE_SYSTEM_CATALOG_VERSION);
                 } else if (allowSplittableSystemCatalogRollback) {
                     isMutationAllowed = false;
-                    LOGGER.error("Unable to add or drop a column as the {} " +
-                        "config is set to true",
+                    LOGGER.error("Unable to add or drop a column as the {} config is set to true",
                         QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
                 }
             }
@@ -2648,15 +2658,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
         byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        byte[] tableOrViewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableOrViewName);
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName);
         // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
-        PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
-                new ImmutableBytesWritable());
+        PTableType expectedType = MetaDataUtil.getTableType(tableMetadata,
+                GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
         List<byte[]> tableNamesToDelete = Lists.newArrayList();
         List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
-        List<PTable> childViews = Lists.newArrayList();
         long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
         try {
             Region region = env.getRegion();
@@ -2667,12 +2676,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
             List<RowLock> locks = Lists.newArrayList();
             try {
-                Optional<MetaDataMutationResult> mutationResult =
-                    validateIfMutationAllowedOnParent(expectedType,
-                        clientTimeStamp, tenantId, schemaName, tableName,
+                List<PTable> childViews = Lists.newArrayList();
+                Optional<MetaDataMutationResult> mutationResult = validateIfMutationAllowedOnParent(
+                        expectedType, clientTimeStamp, tenantId, schemaName, tableOrViewName,
                         childViews, clientVersion);
-                // only if mutation is allowed, we should get Optional.empty()
-                // here
+                // only if mutation is allowed, we should get Optional.empty() here
                 if (mutationResult.isPresent()) {
                     return mutationResult.get();
                 }
@@ -2693,13 +2701,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     }
                 }
                 // Get client timeStamp from mutations
-                if (table == null
-                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
+                if (table == null && (table = buildTable(key, cacheKey, region,
+                        HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
                     // if not found then call newerTableExists and add delete marker for timestamp
                     // found
                     table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
                     if (table != null) {
-                        LOGGER.info("Found newer table deleted as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
+                        LOGGER.info("Found newer table deleted as of " + table.getTimeStamp()
+                                + " versus client timestamp of " + clientTimeStamp);
                         return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                                 EnvironmentEdgeManager.currentTimeMillis(), null);
                     }
@@ -2712,7 +2721,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (parentTable != null) {
                     Properties props = new Properties();
                     if (tenantId != null) {
-                        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(tenantId));
+                        props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(tenantId));
                     }
                     if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
                         props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
@@ -2720,21 +2729,22 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     try (PhoenixConnection connection =
                                  QueryUtil.getConnectionOnServer(props, env.getConfiguration())
                                          .unwrap(PhoenixConnection.class)) {
-                        table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table, parentTable);
+                        table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table,
+                                parentTable);
                     }
                 }
 
                 if (table.getTimeStamp() >= clientTimeStamp) {
-                    LOGGER.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of "
-                            + clientTimeStamp);
+                    LOGGER.info("Found newer table as of " + table.getTimeStamp()
+                            + " versus client timestamp of " + clientTimeStamp);
                     return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                             EnvironmentEdgeManager.currentTimeMillis(), table);
                 } else if (isTableDeleted(table)) {
                     return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
-                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in
-                // tableMetaData
+                // lookup TABLE_SEQ_NUM in tableMetaData
+                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1;
 
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
@@ -2766,19 +2776,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
                 if (!childViews.isEmpty()) {
                     // validate the add or drop column mutations
-                    result = mutator.validateWithChildViews(table, childViews, tableMetadata, schemaName, tableName);
+                    result = mutator.validateWithChildViews(table, childViews, tableMetadata,
+                            schemaName, tableOrViewName);
                     if (result != null) {
                         return result;
                     }
                 }
 
                 getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
-                        SchemaUtil.getTableName(schemaName, tableName),
+                        SchemaUtil.getTableName(schemaName, tableOrViewName),
                         TableName.valueOf(table.getPhysicalName().getBytes()),
                         getParentPhysicalTableName(table), table.getType());
 
-                result = mutator.validateAndAddMetadata(table, rowKeyMetaData, tableMetadata, region,
-                        invalidateList, locks, clientTimeStamp, clientVersion);
+                result = mutator.validateAndAddMetadata(table, rowKeyMetaData, tableMetadata,
+                        region, invalidateList, locks, clientTimeStamp, clientVersion);
                 // if the update mutation caused tables to be deleted, the mutation code returned
                 // will be MutationCode.TABLE_ALREADY_EXISTS
                 if (result != null
@@ -2786,18 +2797,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     return result;
                 }
 
-                // drop any indexes on the base table that need the column that is going to be dropped
-                List<Pair<PTable, PColumn>> tableAndDroppedColumnPairs = mutator.getTableAndDroppedColumnPairs();
+                // drop any indexes on the base table that need the column that is going to be
+                // dropped
+                List<Pair<PTable, PColumn>> tableAndDroppedColumnPairs =
+                        mutator.getTableAndDroppedColumnPairs();
                 Iterator<Pair<PTable, PColumn>> iterator = tableAndDroppedColumnPairs.iterator();
                 while (iterator.hasNext()) {
                     Pair<PTable, PColumn> pair = iterator.next();
-                    // remove the current table and column being dropped from the list
-                    // and drop any indexes that require the column being dropped while holding the row lock
+                    // remove the current table and column being dropped from the list and drop any
+                    // indexes that require the column being dropped while holding the row lock
                     if (table.equals(pair.getFirst())) {
                         iterator.remove();
-                        result = dropIndexes(env, pair.getFirst(), invalidateList, locks, clientTimeStamp,
-                                tableMetadata, pair.getSecond(), tableNamesToDelete, sharedTablesToDelete,
-                                clientVersion);
+                        result = dropIndexes(env, pair.getFirst(), invalidateList, locks,
+                                clientTimeStamp, tableMetadata, pair.getSecond(),
+                                tableNamesToDelete, sharedTablesToDelete, clientVersion);
                         if (result != null
                                 && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                             return result;
@@ -2820,11 +2833,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 separateLocalAndRemoteMutations(region, tableMetadata, localMutations,
                         remoteMutations);
                 if (!remoteMutations.isEmpty()) {
-                    // there should only be remote mutations if we are adding a column to a view that uses encoded column qualifiers
-                    // (the remote mutations are to update the encoded column qualifier counter on the parent table)
+                    // there should only be remote mutations if we are adding a column to a view
+                    // that uses encoded column qualifiers (the remote mutations are to update the
+                    // encoded column qualifier counter on the parent table)
                     if (mutator.getMutateColumnType() == ColumnMutator.MutateColumnType.ADD_COLUMN
                             && type == PTableType.VIEW
-                            && table.getEncodingScheme() != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+                            && table.getEncodingScheme() !=
+                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
                         processRemoteRegionMutations(
                                 PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
                                 MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
@@ -2834,7 +2849,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         : ByteUtil.EMPTY_BYTE_ARRAY,
                                 table.getParentTableName().getBytes());
                     } else {
-                        String msg = "Found unexpected mutations while adding or dropping column to " + fullTableName;
+                        String msg = "Found unexpected mutations while adding or dropping column "
+                                + "to " + fullTableName;
                         LOGGER.error(msg);
                         for (Mutation m : remoteMutations) {
                             LOGGER.debug("Mutation rowkey : " + Bytes.toStringBinary(m.getRow()));
@@ -2843,8 +2859,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         throw new IllegalStateException(msg);
                     }
                 }
-                mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
@@ -2852,28 +2868,33 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // Get client timeStamp from mutations, since it may get updated by the
                 // mutateRowsWithLocks call
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                // if the update mutation caused tables to be deleted just return the result which will contain the table to be deleted
+                // if the update mutation caused tables to be deleted just return the result which
+                // will contain the table to be deleted
                 if (result != null
                         && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     return result;
                 } else {
-                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
+                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
+                            clientVersion);
                     if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
-                        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
-                            PTable pTable = PhoenixRuntime.getTableNoCache(connection, table.getParentName().getString());
-                            table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table, pTable);
+                        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
+                                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                            PTable pTable = PhoenixRuntime.getTableNoCache(connection,
+                                    table.getParentName().getString());
+                            table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection,
+                                    table, pTable);
                         }
                     }
-                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table,
-                            tableNamesToDelete, sharedTablesToDelete);
+                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                            currentTime, table, tableNamesToDelete, sharedTablesToDelete);
                 }
             } finally {
                 releaseRowLocks(region, locks);
-                // drop indexes on views that require the column being dropped
-                // these could be on a different region server so don't hold row locks while dropping them
+                // drop indexes on views that require the column being dropped. These could be on
+                // a different region server, so don't hold row locks while dropping them
                 for (Pair<PTable, PColumn> pair : mutator.getTableAndDroppedColumnPairs()) {
-                    result = dropRemoteIndexes(env, pair.getFirst(), clientTimeStamp, pair.getSecond(),
-                            tableNamesToDelete, sharedTablesToDelete);
+                    result = dropRemoteIndexes(env, pair.getFirst(), clientTimeStamp,
+                            pair.getSecond(), tableNamesToDelete, sharedTablesToDelete);
                     if (result != null
                             && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                         return result;
@@ -3097,10 +3118,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     private MetaDataMutationResult dropIndexes(RegionCoprocessorEnvironment env, PTable table,
-                                               List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
-                                               long clientTimeStamp, List<Mutation> tableMetaData,
-                                               PColumn columnToDelete, List<byte[]> tableNamesToDelete,
-                                               List<SharedTableState> sharedTablesToDelete, int clientVersion)
+            List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp,
+            List<Mutation> tableMetaData, PColumn columnToDelete, List<byte[]> tableNamesToDelete,
+            List<SharedTableState> sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
         // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
         // index and then invalidate it
@@ -3111,19 +3131,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         env.getConfiguration()).unwrap(PhoenixConnection.class);
         for (PTable index : table.getIndexes()) {
             // ignore any indexes derived from ancestors
-            if (index.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+            if (index.getName().getString().contains(
+                    QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
                 continue;
             }
-            byte[] tenantId = index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
+            byte[] tenantId = index.getTenantId() == null ?
+                    ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
             IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
             byte[] indexKey =
                     SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
                             .getTableName().getBytes());
-            Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(),
-                    columnToDelete.getName().getString());
-            ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(),
+            Pair<String, String> columnToDeleteInfo =
+                    new Pair<>(columnToDelete.getFamilyName().getString(),
+                            columnToDelete.getName().getString());
+            ColumnReference colDropRef = new ColumnReference(
+                    columnToDelete.getFamilyName().getBytes(),
                     columnToDelete.getColumnQualifierBytes());
-            boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
+            boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(
+                    columnToDeleteInfo);
             boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
             // If index requires this column for its pk, then drop it
             if (isColumnIndexed) {
@@ -3144,11 +3169,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // occur while we're dropping it.
                 acquireLock(region, indexKey, locks);
                 List<Mutation> childLinksMutations = Lists.newArrayList();
-                MetaDataMutationResult result = doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(),
-                        index.getTableName().getBytes(), table.getName().getBytes(), index.getType(),
-                        tableMetaData, childLinksMutations, invalidateList, tableNamesToDelete, sharedTablesToDelete,
-                        false, clientVersion);
-                if (result != null && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                MetaDataMutationResult result = doDropTable(indexKey, tenantId,
+                        index.getSchemaName().getBytes(), index.getTableName().getBytes(),
+                        table.getName().getBytes(), index.getType(), tableMetaData,
+                        childLinksMutations, invalidateList, tableNamesToDelete,
+                        sharedTablesToDelete, clientVersion);
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     return result;
                 }
                 // there should be no child links to delete since we are just dropping an index
@@ -3208,7 +3234,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // if the index is not present on the current region make an rpc to drop it
                 Properties props = new Properties();
                 if (tenantId != null) {
-                    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(tenantId));
+                    props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(tenantId));
                 }
                 if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
                     props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 302feab..4e2b96d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -20,9 +20,6 @@ package org.apache.phoenix.schema;
 import static com.google.common.collect.Sets.newLinkedHashSet;
 import static com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
-import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
-import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
-import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
 import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
 import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
@@ -172,7 +169,6 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.util.ViewUtil;
 import org.apache.phoenix.util.JacksonUtil;
@@ -687,7 +683,7 @@ public class MetaDataClient {
                     // Otherwise, a tenant would be required to create a VIEW first
                     // which is not really necessary unless you want to filter or add
                     // columns
-                    PTable temp = addTableToCache(result);
+                    addTableToCache(result);
                     return result;
                 } else {
                     // if (result.getMutationCode() == MutationCode.NEWER_TABLE_FOUND) {
@@ -1685,7 +1681,7 @@ public class MetaDataClient {
                 requiredCols.addAll(includedColumns);
                 for (ColumnName colName : requiredCols) {
                     // acquire the mutex using the global physical table name to
-                    // prevent this column from being dropped while the index is being created
+                    // prevent this column from being dropped while the view is being created
                     boolean acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
                             colName.toString());
                     if (!acquiredMutex) {
@@ -2860,7 +2856,7 @@ public class MetaDataClient {
                         // if the base table column is referenced in the view
                         if (isViewColumnReferenced.get(columnPosition)) {
                             // acquire the mutex using the global physical table name to
-                            // prevent this column from being dropped while the index is being created
+                            // prevent this column from being dropped while the view is being created
                             boolean acquiredMutex = writeCell(null, parentPhysicalSchemaName, parentPhysicalTableName,
                                     column.getName().getString());
                             if (!acquiredMutex) {
@@ -4729,22 +4725,20 @@ public class MetaDataClient {
         }
     }
 
-    private PTable addTableToCache(MetaDataMutationResult result) throws SQLException {
-        return addTableToCache(result, TransactionUtil.getResolvedTime(connection, result));
+    private void addTableToCache(MetaDataMutationResult result) throws SQLException {
+        addTableToCache(result, TransactionUtil.getResolvedTime(connection, result));
     }
 
-    private PTable addTableToCache(MetaDataMutationResult result, long timestamp) throws SQLException {
+    private void addTableToCache(MetaDataMutationResult result, long timestamp) throws SQLException {
         addColumnsAndIndexesFromAncestors(result, null, false);
         PTable table = result.getTable();
         connection.addTable(table, timestamp);
-        return table;
     }
 
-    private List<PFunction> addFunctionToCache(MetaDataMutationResult result) throws SQLException {
+    private void addFunctionToCache(MetaDataMutationResult result) throws SQLException {
         for(PFunction function: result.getFunctions()) {
             connection.addFunction(function);
         }
-        return result.getFunctions();
     }
 
     private void addSchemaToCache(MetaDataMutationResult result) throws SQLException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 6a0e2ef..dbe93e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -40,6 +40,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES;
@@ -655,7 +656,7 @@ public class UpgradeUtil {
         LOGGER.info("Upgrading SYSTEM.SEQUENCE table");
 
         byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE);
-        Table sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+        Table sysTable = conn.getQueryServices().getTable(SYSTEM_CATALOG_NAME_BYTES);
         try {
             LOGGER.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM);
             KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey, 
@@ -1363,15 +1364,15 @@ public class UpgradeUtil {
                             LinkType.CHILD_TABLE, childViewsResult);
 
                     // Iterate over the chain of child views
-                    for (TableInfo tableInfo: childViewsResult.getLinks()) {
+                    for (TableInfo viewInfo: childViewsResult.getLinks()) {
                         PTable view;
-                        String viewName = SchemaUtil.getTableName(tableInfo.getSchemaName(),
-                                tableInfo.getTableName());
+                        String viewName = SchemaUtil.getTableName(viewInfo.getSchemaName(),
+                                viewInfo.getTableName());
                         try {
                             view = PhoenixRuntime.getTable(newConn, viewName);
                         } catch (TableNotFoundException e) {
                             // Ignore
-                            LOGGER.warn("Error getting PTable for view: " + viewName);
+                            LOGGER.error("Error getting PTable for view: " + viewInfo, e);
                             continue;
                         }
                         syncUpdateCacheFreqForIndexesOfTable(view, stmt);
@@ -2080,15 +2081,15 @@ public class UpgradeUtil {
      * use map table utility in psql.py
      */
     public static void mapTableToNamespace(HBaseAdmin admin, Table metatable, String tableName,
-            ReadOnlyProps props, Long ts, PTableType pTableType, PName tenantId) throws SnapshotCreationException,
-                    IllegalArgumentException, IOException, InterruptedException, SQLException {
+            ReadOnlyProps props, Long ts, PTableType pTableType, PName tenantId)
+            throws IllegalArgumentException, IOException, InterruptedException, SQLException {
         String destTablename = SchemaUtil
                 .normalizeIdentifier(SchemaUtil.getPhysicalTableName(tableName, props).getNameAsString());
         mapTableToNamespace(admin, metatable, tableName, destTablename, props, ts, tableName, pTableType, tenantId);
     }
 
     public static void upgradeTable(PhoenixConnection conn, String srcTable) throws SQLException,
-            SnapshotCreationException, IllegalArgumentException, IOException, InterruptedException {
+            IllegalArgumentException, IOException, InterruptedException {
         ReadOnlyProps readOnlyProps = conn.getQueryServices().getProps();
         if (conn.getSchema() != null) { throw new IllegalArgumentException(
                 "Schema should not be set for connection!!"); }
@@ -2096,10 +2097,8 @@ public class UpgradeUtil {
                 readOnlyProps)) { throw new IllegalArgumentException(
                         QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " is not enabled!!"); }
         try (HBaseAdmin admin = conn.getQueryServices().getAdmin();
-                Table metatable = conn.getQueryServices()
-                        .getTable(SchemaUtil
-                                .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, readOnlyProps)
-                                .getName());) {
+                Table metatable = conn.getQueryServices().getTable(SchemaUtil.getPhysicalName(
+                        SYSTEM_CATALOG_NAME_BYTES, readOnlyProps).getName())) {
             String fullTableName = SchemaUtil.normalizeIdentifier(srcTable);
             String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
             String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
@@ -2108,18 +2107,23 @@ public class UpgradeUtil {
             
             // Upgrade is not required if schemaName is not present.
             if (schemaName.equals("") && !PTableType.VIEW
-                    .equals(table.getType())) { throw new IllegalArgumentException("Table doesn't have schema name"); }
-
-            if (table.isNamespaceMapped()) { throw new IllegalArgumentException("Table is already upgraded"); }
+                    .equals(table.getType())) {
+                throw new IllegalArgumentException("Table doesn't have schema name");
+            }
+            if (table.isNamespaceMapped()) {
+                throw new IllegalArgumentException("Table is already upgraded");
+            }
             if (!schemaName.equals("")) {
                 LOGGER.info(String.format("Creating schema %s..", schemaName));
                 conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
             }
             String oldPhysicalName = table.getPhysicalName().getString();
             String newPhysicalTablename = SchemaUtil.normalizeIdentifier(
-                    SchemaUtil.getPhysicalTableName(oldPhysicalName, readOnlyProps).getNameAsString());
+                    SchemaUtil.getPhysicalTableName(oldPhysicalName, readOnlyProps)
+                            .getNameAsString());
             LOGGER.info(String.format("Upgrading %s %s..", table.getType(), fullTableName));
-            LOGGER.info(String.format("oldPhysicalName %s newPhysicalTablename %s..", oldPhysicalName, newPhysicalTablename));
+            LOGGER.info(String.format("oldPhysicalName %s newPhysicalTablename %s..",
+                    oldPhysicalName, newPhysicalTablename));
             LOGGER.info(String.format("teanantId %s..", conn.getTenantId()));
 
             TableViewFinderResult childViewsResult = new TableViewFinderResult();
@@ -2129,8 +2133,10 @@ public class UpgradeUtil {
                         .getTable(SchemaUtil.getPhysicalName(
                                 i==0 ? SYSTEM_CHILD_LINK_NAME_BYTES : SYSTEM_CATALOG_TABLE_BYTES,
                                 readOnlyProps).getName())) {
-                    byte[] tenantId = conn.getTenantId() != null ? conn.getTenantId().getBytes() : null;
-                    ViewUtil.findAllRelatives(sysCatOrSysChildLinkTable, tenantId, schemaName.getBytes(),
+                    byte[] tenantId = conn.getTenantId() != null ?
+                            conn.getTenantId().getBytes() : null;
+                    ViewUtil.findAllRelatives(sysCatOrSysChildLinkTable, tenantId,
+                            schemaName.getBytes(),
                             tableName.getBytes(), LinkType.CHILD_TABLE, childViewsResult);
                     break;
                 } catch (TableNotFoundException ex) {
@@ -2143,13 +2149,15 @@ public class UpgradeUtil {
             }
 
             // Upgrade the data or main table
-            mapTableToNamespace(admin, metatable, fullTableName, newPhysicalTablename, readOnlyProps,
-                    PhoenixRuntime.getCurrentScn(readOnlyProps), fullTableName, table.getType(),conn.getTenantId());
+            mapTableToNamespace(admin, metatable, fullTableName, newPhysicalTablename,
+                    readOnlyProps, PhoenixRuntime.getCurrentScn(readOnlyProps), fullTableName,
+                    table.getType(),conn.getTenantId());
             // clear the cache and get new table
             conn.removeTable(conn.getTenantId(), fullTableName,
                 table.getParentName() != null ? table.getParentName().getString() : null,
                 table.getTimeStamp());
-            byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : conn.getTenantId().getBytes();
+            byte[] tenantIdBytes = conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+                    conn.getTenantId().getBytes();
             conn.getQueryServices().clearTableFromCache(
                     tenantIdBytes,
                     table.getSchemaName().getBytes(), table.getTableName().getBytes(),
@@ -2157,8 +2165,9 @@ public class UpgradeUtil {
             MetaDataMutationResult result =
                     new MetaDataClient(conn).updateCache(conn.getTenantId(), schemaName, tableName,
                         true);
-            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { throw new TableNotFoundException(
-              schemaName, fullTableName); }
+            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                throw new TableNotFoundException(schemaName, fullTableName);
+            }
             table = result.getTable();
             
             // check whether table is properly upgraded before upgrading indexes
@@ -2180,35 +2189,41 @@ public class UpgradeUtil {
                                 "local index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
                         destTableName = Bytes
-                                .toString(MetaDataUtil.getLocalIndexPhysicalName(newPhysicalTablename.getBytes()));
+                                .toString(MetaDataUtil.getLocalIndexPhysicalName(
+                                        newPhysicalTablename.getBytes()));
                         // update parent_table property in local index table descriptor
                         conn.createStatement()
-                                .execute(String.format("ALTER TABLE %s set " + MetaDataUtil.PARENT_TABLE_KEY + "='%s'",
+                                .execute(String.format("ALTER TABLE %s set " +
+                                                MetaDataUtil.PARENT_TABLE_KEY + "='%s'",
                                         phoenixTableName, table.getPhysicalName()));
                     } else if (MetaDataUtil.isViewIndex(srcTableName)) {
                         LOGGER.info(String.format(
                                 "View index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
                         destTableName = Bytes
-                                .toString(MetaDataUtil.getViewIndexPhysicalName(newPhysicalTablename.getBytes()));
+                                .toString(MetaDataUtil.getViewIndexPhysicalName(
+                                        newPhysicalTablename.getBytes()));
                     } else {
                         LOGGER.info(String.format(
                                 "Global index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
                         destTableName = SchemaUtil
-                                .getPhysicalTableName(index.getPhysicalName().getString(), readOnlyProps)
-                                .getNameAsString();
+                                .getPhysicalTableName(index.getPhysicalName().getString(),
+                                        readOnlyProps).getNameAsString();
                     }
                     LOGGER.info(String.format("Upgrading index %s..", index.getName()));
-                    if (!(table.getType() == PTableType.VIEW && !MetaDataUtil.isViewIndex(srcTableName)
+                    if (!(table.getType() == PTableType.VIEW && !MetaDataUtil.isViewIndex(
+                            srcTableName)
                             && IndexType.LOCAL != index.getIndexType())) {
-                        mapTableToNamespace(admin, metatable, srcTableName, destTableName, readOnlyProps,
-                                PhoenixRuntime.getCurrentScn(readOnlyProps), phoenixTableName, index.getType(),
-                                conn.getTenantId());
+                        mapTableToNamespace(admin, metatable, srcTableName, destTableName,
+                                readOnlyProps, PhoenixRuntime.getCurrentScn(readOnlyProps),
+                                phoenixTableName, index.getType(), conn.getTenantId());
                     }
                     if (updateLink) {
-                        LOGGER.info(String.format("Updating link information for index '%s' ..", index.getName()));
-                        updateLink(conn, srcTableName, destTableName,index.getSchemaName(),index.getTableName());
+                        LOGGER.info(String.format("Updating link information for index '%s' ..",
+                                index.getName()));
+                        updateLink(conn, srcTableName, destTableName,index.getSchemaName(),
+                                index.getTableName());
                         conn.commit();
                     }
                     conn.getQueryServices().clearTableFromCache(
@@ -2219,19 +2234,24 @@ public class UpgradeUtil {
                 updateIndexesSequenceIfPresent(conn, table);
                 conn.commit();
             } else {
-                throw new RuntimeException("Error: problem occured during upgrade. Table is not upgraded successfully");
+                throw new RuntimeException("Error: problem occured during upgrade. "
+                        + "Table is not upgraded successfully");
             }
             if (table.getType() == PTableType.VIEW) {
-                LOGGER.info(String.format("Updating link information for view '%s' ..", table.getTableName()));
-                updateLink(conn, oldPhysicalName, newPhysicalTablename,table.getSchemaName(),table.getTableName());
+                LOGGER.info(String.format("Updating link information for view '%s' ..",
+                        table.getTableName()));
+                updateLink(conn, oldPhysicalName, newPhysicalTablename,table.getSchemaName(),
+                        table.getTableName());
                 conn.commit();
                 
                 // if the view is a first level child, then we need to create the PARENT_TABLE link
                 // that was overwritten by the PHYSICAL_TABLE link 
                 if (table.getParentName().equals(table.getPhysicalName())) {
-                    LOGGER.info(String.format("Creating PARENT link for view '%s' ..", table.getTableName()));
+                    LOGGER.info(String.format("Creating PARENT link for view '%s' ..",
+                            table.getTableName()));
                     // Add row linking view to its parent 
-                    PreparedStatement linkStatement = conn.prepareStatement(MetaDataClient.CREATE_VIEW_LINK);
+                    PreparedStatement linkStatement = conn.prepareStatement(
+                            MetaDataClient.CREATE_VIEW_LINK);
                     linkStatement.setString(1, Bytes.toStringBinary(tenantIdBytes));
                     linkStatement.setString(2, table.getSchemaName().getString());
                     linkStatement.setString(3, table.getTableName().getString());
@@ -2249,7 +2269,8 @@ public class UpgradeUtil {
             }
             // Upgrade all child views
             if (table.getType() == PTableType.TABLE) {
-                mapChildViewsToNamespace(conn.getURL(), conn.getClientInfo(), childViewsResult.getLinks());
+                mapChildViewsToNamespace(conn.getURL(), conn.getClientInfo(),
+                        childViewsResult.getLinks());
             }
         }
     }
@@ -2341,27 +2362,36 @@ public class UpgradeUtil {
         deleteLinkStatment.execute();
     }
     
-    private static void mapChildViewsToNamespace(String connUrl, Properties props, List<TableInfo> viewInfoList)
-            throws SQLException, SnapshotCreationException, IllegalArgumentException, IOException,
-            InterruptedException {
-        String tenantId = null;
+    private static void mapChildViewsToNamespace(String connUrl, Properties props,
+            List<TableInfo> viewInfoList) throws SQLException, IllegalArgumentException,
+            IOException, InterruptedException {
+        String tenantId;
         String prevTenantId = null;
         PhoenixConnection conn = null;
         for (TableInfo viewInfo : viewInfoList) {
             tenantId = viewInfo.getTenantId()!=null ? Bytes.toString(viewInfo.getTenantId()) : null;
-            String viewName = SchemaUtil.getTableName(viewInfo.getSchemaName(), viewInfo.getTableName());
+            String viewName = SchemaUtil.getTableName(viewInfo.getSchemaName(),
+                    viewInfo.getTableName());
             if (!java.util.Objects.equals(prevTenantId, tenantId)) {
                 if (tenantId != null) {
                     props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
                 } else {
                     props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
                 }
-                if (conn!=null)
+                if (conn!=null) {
                     conn.close();
+                }
                 conn = DriverManager.getConnection(connUrl, props).unwrap(PhoenixConnection.class);
             }
             LOGGER.info(String.format("Upgrading view %s for tenantId %s..", viewName,tenantId));
-            UpgradeUtil.upgradeTable(conn, viewName);
+            if (conn != null) {
+                try {
+                    UpgradeUtil.upgradeTable(conn, viewName);
+                } catch (TableNotFoundException e) {
+                    // Ignore
+                    LOGGER.error("Error getting PTable for view: " + viewInfo, e);
+                }
+            }
             prevTenantId = tenantId;
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 174f6b0..5e4d535 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -23,10 +23,14 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -86,33 +90,173 @@ public class ViewUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(ViewUtil.class);
 
-    public static void findAllRelatives(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schema, byte[] table,
-        PTable.LinkType linkType, TableViewFinderResult result) throws IOException {
-        findAllRelatives(sysCatOrsysChildLink, tenantId, schema, table, linkType, HConstants.LATEST_TIMESTAMP, result);
+    /**
+     * Find all the descendant views of a given table or view in a depth-first fashion.
+     * Note that apart from scanning the parent->child links, we also validate each view
+     * by trying to resolve it.
+     * Use {@link ViewUtil#findAllRelatives(Table, byte[], byte[], byte[], LinkType,
+     * TableViewFinderResult)} if you want to find other links and don't care about orphan results.
+     *
+     * @param sysCatOrsysChildLink Table corresponding to either SYSTEM.CATALOG or SYSTEM.CHILD_LINK
+     * @param serverSideConfig server-side configuration
+     * @param tenantId tenantId of the view (null if it is a table or global view)
+     * @param schemaName schema name of the table/view
+     * @param tableOrViewName name of the table/view
+     * @param clientTimeStamp client timestamp
+     * @param findJustOneLegitimateChildView if true, we are only interested in knowing if there is
+     *                                       at least one legitimate child view, so we return early.
+     *                                       If false, we want to find all legitimate child views
+     *                                       and all orphan views (views that no longer exist)
+     *                                       stemming from this table/view and all of its legitimate
+     *                                       child views.
+     *
+     * @return a Pair where the first element is a list of all legitimate child views (or just 1
+     * child view in case findJustOneLegitimateChildView is true) and where the second element is
+     * a list of all orphan views stemming from this table/view and all of its legitimate child
+     * views (in case findJustOneLegitimateChildView is true, this list will be incomplete since we
+     * are not interested in it anyhow)
+     *
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
+     * @throws SQLException thrown if there is an error getting a connection to the server or an
+     * error retrieving the PTable for a child view
+     */
+    public static Pair<List<PTable>, List<TableInfo>> findAllDescendantViews(
+            Table sysCatOrsysChildLink, Configuration serverSideConfig, byte[] tenantId,
+            byte[] schemaName, byte[] tableOrViewName, long clientTimeStamp,
+            boolean findJustOneLegitimateChildView)
+            throws IOException, SQLException {
+        List<PTable> legitimateChildViews = new ArrayList<>();
+        List<TableInfo> orphanChildViews = new ArrayList<>();
+
+        findAllDescendantViews(sysCatOrsysChildLink, serverSideConfig, tenantId, schemaName,
+                tableOrViewName, clientTimeStamp, legitimateChildViews, orphanChildViews,
+                findJustOneLegitimateChildView);
+        return new Pair<>(legitimateChildViews, orphanChildViews);
     }
 
-    private static void findAllRelatives(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schema, byte[] table,
-        PTable.LinkType linkType, long timestamp, TableViewFinderResult result) throws IOException {
+    private static void findAllDescendantViews(Table sysCatOrsysChildLink,
+            Configuration serverSideConfig, byte[] parentTenantId, byte[] parentSchemaName,
+            byte[] parentTableOrViewName, long clientTimeStamp, List<PTable> legitimateChildViews,
+            List<TableInfo> orphanChildViews, boolean findJustOneLegitimateChildView)
+            throws IOException, SQLException {
         TableViewFinderResult currentResult =
-            findRelatedViews(sysCatOrsysChildLink, tenantId, schema, table, linkType, timestamp);
+                findImmediateRelatedViews(sysCatOrsysChildLink, parentTenantId, parentSchemaName,
+                        parentTableOrViewName, LinkType.CHILD_TABLE, clientTimeStamp);
+        for (TableInfo viewInfo : currentResult.getLinks()) {
+            byte[] viewTenantId = viewInfo.getTenantId();
+            byte[] viewSchemaName = viewInfo.getSchemaName();
+            byte[] viewName = viewInfo.getTableName();
+            PTable view;
+            Properties props = new Properties();
+            if (viewTenantId != null) {
+                props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
+            }
+            if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
+                props.setProperty(CURRENT_SCN_ATTRIB, Long.toString(clientTimeStamp));
+            }
+            try (PhoenixConnection connection =
+                    QueryUtil.getConnectionOnServer(props, serverSideConfig)
+                            .unwrap(PhoenixConnection.class)) {
+                try {
+                    view = PhoenixRuntime.getTableNoCache(connection,
+                            SchemaUtil.getTableName(viewSchemaName, viewName));
+                } catch (TableNotFoundException ex) {
+                    logger.error("Found an orphan parent->child link keyed by this parent."
+                            + " Parent Tenant Id: '" + Bytes.toString(parentTenantId)
+                            + "'. Parent Schema Name: '" + Bytes.toString(parentSchemaName)
+                            + "'. Parent Table/View Name: '" + Bytes.toString(parentTableOrViewName)
+                            + "'. The child view which could not be resolved has ViewInfo: '"
+                            + viewInfo + "'.", ex);
+                    orphanChildViews.add(viewInfo);
+                    // Prune orphan branches
+                    continue;
+                }
+
+                if (isLegitimateChildView(view, parentSchemaName, parentTableOrViewName)) {
+                    legitimateChildViews.add(view);
+                    // return early since we're only interested in knowing if there is at least one
+                    // valid child view
+                    if (findJustOneLegitimateChildView) {
+                        break;
+                    }
+                    // Note that we only explore this branch if the current view is a legitimate
+                    // child view, else we ignore it and move on to the next potential child view
+                    findAllDescendantViews(sysCatOrsysChildLink, serverSideConfig,
+                            viewInfo.getTenantId(), viewInfo.getSchemaName(),
+                            viewInfo.getTableName(), clientTimeStamp, legitimateChildViews,
+                            orphanChildViews, findJustOneLegitimateChildView);
+                } else {
+                    logger.error("Found an orphan parent->child link keyed by this parent."
+                            + " Parent Tenant Id: '" + Bytes.toString(parentTenantId)
+                            + "'. Parent Schema Name: '" + Bytes.toString(parentSchemaName)
+                            + "'. Parent Table/View Name: '" + Bytes.toString(parentTableOrViewName)
+                            + "'. There currently exists a legitimate view of the same name which"
+                            + " is not a descendant of this table/view. View Info: '" + viewInfo
+                            + "'. Ignoring this view and not counting it as a child view.");
+                    // Prune unrelated view branches left around due to orphan parent->child links
+                }
+            }
+        }
+    }
+
+    private static boolean isLegitimateChildView(PTable view, byte[] parentSchemaName,
+            byte[] parentTableOrViewName) {
+        return view != null && view.getParentSchemaName() != null &&
+                view.getParentTableName() != null &&
+                (Arrays.equals(view.getParentSchemaName().getBytes(), parentSchemaName) &&
+                        Arrays.equals(view.getParentTableName().getBytes(), parentTableOrViewName));
+    }
+
+    /**
+     * Recursively collects relatives, recording each entity's immediate links before descending
+     * into them. This is not resilient to orphan linking rows and does not resolve any view to
+     * check that it is valid. Use {@link ViewUtil#findAllDescendantViews(Table, Configuration,
+     * byte[], byte[], byte[], long, boolean)} if you are only interested in
+     * {@link LinkType#CHILD_TABLE} and need to be resilient to orphan linking rows.
+     *
+     * @param sysCatOrsysChildLink Table corresponding to either SYSTEM.CATALOG or SYSTEM.CHILD_LINK
+     * @param tenantId tenantId of the key (null if it is a table or global view)
+     * @param schema schema name to use in the key
+     * @param table table/view name to use in the key
+     * @param linkType link type
+     * @param result containing all linked entities
+     *
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
+     */
+    public static void findAllRelatives(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schema,
+            byte[] table, PTable.LinkType linkType, TableViewFinderResult result)
+            throws IOException {
+        findAllRelatives(sysCatOrsysChildLink, tenantId, schema, table, linkType,
+                HConstants.LATEST_TIMESTAMP, result);
+    }
+
+    private static void findAllRelatives(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schema,
+            byte[] table, PTable.LinkType linkType, long timestamp, TableViewFinderResult result)
+            throws IOException {
+        TableViewFinderResult currentResult = findImmediateRelatedViews(sysCatOrsysChildLink,
+                tenantId, schema, table, linkType, timestamp);
         result.addResult(currentResult);
         for (TableInfo viewInfo : currentResult.getLinks()) {
-            findAllRelatives(sysCatOrsysChildLink, viewInfo.getTenantId(), viewInfo.getSchemaName(), viewInfo.getTableName(), linkType, timestamp, result);
+            findAllRelatives(sysCatOrsysChildLink, viewInfo.getTenantId(), viewInfo.getSchemaName(),
+                    viewInfo.getTableName(), linkType, timestamp, result);
         }
     }
 
     /**
-     * Runs a scan on SYSTEM.CATALOG or SYSTEM.CHILD_LINK to get the related tables/views
+     * Runs a scan on SYSTEM.CATALOG or SYSTEM.CHILD_LINK to get the immediate related tables/views.
      */
-    private static TableViewFinderResult findRelatedViews(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schema, byte[] table,
-        PTable.LinkType linkType, long timestamp) throws IOException {
+    private static TableViewFinderResult findImmediateRelatedViews(Table sysCatOrsysChildLink,
+            byte[] tenantId, byte[] schema, byte[] table, PTable.LinkType linkType, long timestamp)
+            throws IOException {
         if (linkType==PTable.LinkType.INDEX_TABLE || linkType==PTable.LinkType.EXCLUDED_COLUMN) {
-            throw new IllegalArgumentException("findAllRelatives does not support link type "+linkType);
+            throw new IllegalArgumentException("findAllRelatives does not support link type "
+                    + linkType);
         }
         byte[] key = SchemaUtil.getTableKey(tenantId, schema, table);
-		Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
-        SingleColumnValueFilter linkFilter =
-            new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareFilter.CompareOp.EQUAL,
+		Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                timestamp);
+        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES,
+                LINK_TYPE_BYTES, CompareFilter.CompareOp.EQUAL,
                 linkType.getSerializedValueAsByteArray());
         linkFilter.setFilterIfMissing(true);
         scan.setFilter(linkFilter);
@@ -134,12 +278,15 @@ public class ViewUtil {
                 } else if (linkType==PTable.LinkType.VIEW_INDEX_PARENT_TABLE) {
                     viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
                 } 
-                else if (linkType==PTable.LinkType.PHYSICAL_TABLE && result.getValue(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES)!=null) {
+                else if (linkType==PTable.LinkType.PHYSICAL_TABLE &&
+                        result.getValue(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES)!=null) {
                     // do not links from indexes to their physical table
                     continue;
                 }
-                byte[] viewSchemaName = SchemaUtil.getSchemaNameFromFullName(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
-                byte[] viewName = SchemaUtil.getTableNameFromFullName(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
+                byte[] viewSchemaName = SchemaUtil.getSchemaNameFromFullName(
+                        rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
+                byte[] viewName = SchemaUtil.getTableNameFromFullName(
+                        rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes();
                 tableInfoList.add(new TableInfo(viewTenantId, viewSchemaName, viewName));
             }
             return new TableViewFinderResult(tableInfoList);
@@ -147,20 +294,23 @@ public class ViewUtil {
     }
     
     /**
-     * Check metadata to find all child views for a given table/view
-     * @param sysCatOrsysChildLink For older (pre-4.15.0) clients, we look for child links inside SYSTEM.CATALOG,
-     *                             otherwise we look for them inside SYSTEM.CHILD_LINK
+     * Check metadata to find if a given table/view has any immediate child views. Note that this
+     * is not resilient to orphan parent->child links.
+     * @param sysCatOrsysChildLink For older (pre-4.15.0) clients, we look for child links inside
+     *                             SYSTEM.CATALOG, otherwise we look for them inside
+     *                             SYSTEM.CHILD_LINK
      * @param tenantId tenantId
      * @param schemaName table schema name
      * @param tableName table name
      * @param timestamp passed client-side timestamp
      * @return true if the given table has at least one child view
-     * @throws IOException
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
      */
-    public static boolean hasChildViews(Table sysCatOrsysChildLink, byte[] tenantId, byte[] schemaName,
-                                        byte[] tableName, long timestamp) throws IOException {
+    public static boolean hasChildViews(Table sysCatOrsysChildLink, byte[] tenantId,
+            byte[] schemaName, byte[] tableName, long timestamp) throws IOException {
         byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
-        Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
+        Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+                timestamp);
         SingleColumnValueFilter linkFilter =
                 new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES,
                         CompareFilter.CompareOp.EQUAL,
@@ -181,8 +331,21 @@ public class ViewUtil {
         }
     }
 
+    /**
+     * Attempt to drop orphan child views, i.e. child views for which we see a parent->child entry
+     * in SYSTEM.CHILD_LINK/SYSTEM.CATALOG (as a child) but for whom the parent no longer exists.
+     * @param env Region Coprocessor environment
+     * @param tenantIdBytes tenantId of the parent
+     * @param schemaName schema of the parent
+     * @param tableOrViewName parent table/view name
+     * @param sysCatOrSysChildLink SYSTEM.CATALOG or SYSTEM.CHILD_LINK which is used to find the
+     *                             parent->child linking rows
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
+     * @throws SQLException thrown if there is an error getting a connection to the server or
+     * an error retrieving the PTable for a child view
+     */
     public static void dropChildViews(RegionCoprocessorEnvironment env, byte[] tenantIdBytes,
-            byte[] schemaName, byte[] tableName, byte[] sysCatOrSysChildLink)
+            byte[] schemaName, byte[] tableOrViewName, byte[] sysCatOrSysChildLink)
             throws IOException, SQLException {
         Table hTable = null;
         try {
@@ -195,50 +358,72 @@ public class ViewUtil {
             return;
         }
 
-        TableViewFinderResult childViewsResult = null;
+        TableViewFinderResult childViewsResult;
         try {
-            childViewsResult = findRelatedViews(
+            childViewsResult = findImmediateRelatedViews(
                     hTable,
                     tenantIdBytes,
                     schemaName,
-                    tableName,
+                    tableOrViewName,
                     LinkType.CHILD_TABLE,
                     HConstants.LATEST_TIMESTAMP);
         } finally {
            hTable.close();
         }
 
-        if(childViewsResult == null) {
-            logger.info("tenantIdBytes:" + Bytes.toStringBinary(tenantIdBytes) +
-                        ", schemaName:" + Bytes.toStringBinary(schemaName) +
-                        ", tableName:" + Bytes.toStringBinary(tableName) +
-                        ", ViewUtil.findRelatedViews return null.");
-            return;
-        }
-
         for (TableInfo viewInfo : childViewsResult.getLinks()) {
             byte[] viewTenantId = viewInfo.getTenantId();
             byte[] viewSchemaName = viewInfo.getSchemaName();
             byte[] viewName = viewInfo.getTableName();
             if (logger.isDebugEnabled()) {
-                logger.debug("dropChildViews :" + Bytes.toString(schemaName) + "." + Bytes.toString(tableName) +
-                        " -> " + Bytes.toString(viewSchemaName) + "." + Bytes.toString(viewName) +
-                        "with tenant id :" + Bytes.toString(viewTenantId));
+                logger.debug("dropChildViews : " + Bytes.toString(schemaName) + "."
+                        + Bytes.toString(tableOrViewName) + " -> "
+                        + Bytes.toString(viewSchemaName) + "." + Bytes.toString(viewName)
+                        + "with tenant id :" + Bytes.toString(viewTenantId));
             }
             Properties props = new Properties();
+            PTable view = null;
             if (viewTenantId != null && viewTenantId.length != 0)
-                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(props, env.getConfiguration())
-                    .unwrap(PhoenixConnection.class)) {
-                MetaDataClient client = new MetaDataClient(connection);
-                org.apache.phoenix.parse.TableName viewTableName = org.apache.phoenix.parse.TableName
-                        .create(Bytes.toString(viewSchemaName), Bytes.toString(viewName));
+                props.setProperty(TENANT_ID_ATTRIB, Bytes.toString(viewTenantId));
+            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(props,
+                    env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                 try {
-                    client.dropTable(
-                            new DropTableStatement(viewTableName, PTableType.VIEW, true, true, true));
+                    // Ensure that the view to be dropped has some ancestor that no longer exists
+                    // (and thus will throw a TableNotFoundException). Otherwise, if we are looking
+                    // at an orphan parent->child link, then the view might actually be a legitimate
+                    // child view on another table/view and we should obviously not drop it
+                    view = PhoenixRuntime.getTableNoCache(connection,
+                            SchemaUtil.getTableName(viewSchemaName, viewName));
+                } catch (TableNotFoundException expected) {
+                    // Expected for an orphan view since some ancestor was dropped earlier
+                    logger.info("Found an expected orphan parent->child link keyed by the parent."
+                            + " Parent Tenant Id: '" + Bytes.toString(tenantIdBytes)
+                            + "'. Parent Schema Name: '" + Bytes.toString(schemaName)
+                            + "'. Parent Table/View Name: '" + Bytes.toString(tableOrViewName)
+                            + "'. Will attempt to drop this child view with ViewInfo: '"
+                            + viewInfo + "'.");
                 }
-                catch (TableNotFoundException e) {
-                    logger.info("Ignoring view "+viewTableName+" as it has already been dropped");
+                if (view != null) {
+                    logger.error("Found an orphan parent->child link keyed by this parent or"
+                            + " its descendant. Parent Tenant Id: '" + Bytes.toString(tenantIdBytes)
+                            + "'. Parent Schema Name: '" + Bytes.toString(schemaName)
+                            + "'. Parent Table/View Name: '" + Bytes.toString(tableOrViewName)
+                            + "'. There currently exists a legitimate view of the same name whose"
+                            + " parent hierarchy exists. View Info: '" + viewInfo
+                            + "'. Ignoring this view and not attempting to drop it.");
+                    continue;
+                }
+
+                MetaDataClient client = new MetaDataClient(connection);
+                org.apache.phoenix.parse.TableName viewTableName =
+                        org.apache.phoenix.parse.TableName.create(Bytes.toString(viewSchemaName),
+                                Bytes.toString(viewName));
+                try {
+                    client.dropTable(new DropTableStatement(viewTableName, PTableType.VIEW, true,
+                            true, true));
+                } catch (TableNotFoundException e) {
+                    logger.info("Ignoring view " + viewTableName
+                            + " as it has already been dropped");
                 }
             }
         }
@@ -254,7 +439,7 @@ public class ViewUtil {
      * @param clientVersion client version
      * @param conf server-side configuration
      * @return name of the system table to be used
-     * @throws SQLException
+     * @throws SQLException thrown if there is an error connecting to the server
      */
     public static TableName getSystemTableForChildLinks(int clientVersion,
             Configuration conf) throws SQLException, IOException {
@@ -292,10 +477,11 @@ public class ViewUtil {
     }
 
     /**
-     * Adds indexes of the parent table to inheritedIndexes if the index contains all required columns
+     * Adds indexes of the parent table to inheritedIndexes if the index contains all required
+     * columns
      */
     public static void addIndexesFromParent(PhoenixConnection connection, PTable view,
-                                            PTable parentTable, List<PTable> inheritedIndexes) throws SQLException {
+            PTable parentTable, List<PTable> inheritedIndexes) throws SQLException {
         List<PTable> parentTableIndexes = parentTable.getIndexes();
         for (PTable index : parentTableIndexes) {
             boolean containsAllReqdCols = true;
@@ -324,27 +510,31 @@ public class ViewUtil {
             for (PColumn col : view.getColumns()) {
                 if (col.isViewReferenced() || col.getViewConstant() != null) {
                     try {
-                        // It'd be possible to use a local index that doesn't have all view constants,
-                        // but the WHERE clause for the view statement (which is added to the index below)
-                        // would fail to compile.
+                        // It'd be possible to use a local index that doesn't have all view
+                        // constants, but the WHERE clause for the view statement (which is added to
+                        // the index below) would fail to compile.
                         String indexColumnName = IndexUtil.getIndexColumnName(col);
                         index.getColumnForColumnName(indexColumnName);
                     } catch (ColumnNotFoundException e1) {
                         PColumn indexCol = null;
                         try {
-                            String cf = col.getFamilyName()!=null ? col.getFamilyName().getString() : null;
+                            String cf = col.getFamilyName()!=null ?
+                                    col.getFamilyName().getString() : null;
                             String colName = col.getName().getString();
                             if (cf != null) {
-                                indexCol = parentTable.getColumnFamily(cf).getPColumnForColumnName(colName);
+                                indexCol = parentTable.getColumnFamily(cf)
+                                        .getPColumnForColumnName(colName);
                             }
                             else {
                                 indexCol = parentTable.getColumnForColumnName(colName);
                             }
-                        } catch (ColumnNotFoundException e2) { // Ignore this index and continue with others
+                        } catch (ColumnNotFoundException e2) {
+                            // Ignore this index and continue with others
                             containsAllReqdCols = false;
                             break;
                         }
-                        if (indexCol.getViewConstant()==null || Bytes.compareTo(indexCol.getViewConstant(), col.getViewConstant())!=0) {
+                        if (indexCol.getViewConstant()==null || Bytes.compareTo(
+                                indexCol.getViewConstant(), col.getViewConstant())!=0) {
                             containsAllReqdCols = false;
                             break;
                         }
@@ -353,15 +543,18 @@ public class ViewUtil {
             }
             if (containsAllReqdCols) {
                 // Tack on view statement to index to get proper filtering for view
-                String viewStatement = IndexUtil.rewriteViewStatement(connection, index, parentTable, view.getViewStatement());
+                String viewStatement = IndexUtil.rewriteViewStatement(connection, index,
+                        parentTable, view.getViewStatement());
                 PName modifiedIndexName = PNameFactory.newName(view.getName().getString()
-                        + QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR + index.getName().getString());
-                // add the index table with a new name so that it does not conflict with the existing index table
-                // and set update cache frequency to that of the view
+                        + QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR +
+                        index.getName().getString());
+                // add the index table with a new name so that it does not conflict with the
+                // existing index table and set update cache frequency to that of the view
                 if (Objects.equal(viewStatement, index.getViewStatement())) {
                     inheritedIndexes.add(index);
                 } else {
-                    inheritedIndexes.add(PTableImpl.builderWithColumns(index, getColumnsToClone(index))
+                    inheritedIndexes.add(PTableImpl.builderWithColumns(index,
+                            getColumnsToClone(index))
                             .setTableName(modifiedIndexName)
                             .setViewStatement(viewStatement)
                             .setUpdateCacheFrequency(view.getUpdateCacheFrequency())
@@ -383,7 +576,8 @@ public class ViewUtil {
         boolean hasIndexId = table.getViewIndexId() != null;
         // For views :
         if (!hasIndexId) {
-            // 1. need to resolve the views's own indexes so that any columns added by ancestors are included
+            // 1. need to resolve the view's own indexes so that any columns added by ancestors
+            // are included
             List<PTable> allIndexes = Lists.newArrayList();
             if (pTable !=null && pTable.getIndexes() !=null && !pTable.getIndexes().isEmpty()) {
                 for (PTable viewIndex : pTable.getIndexes()) {
@@ -461,7 +655,8 @@ public class ViewUtil {
 
         long maxTableTimestamp = view.getTimeStamp();
         int numPKCols = view.getPKColumns().size();
-        // set the final table timestamp as the max timestamp of the view/view index or its ancestors
+        // set the final table timestamp as the max timestamp of the view/view index or its
+        // ancestors
         maxTableTimestamp = Math.max(maxTableTimestamp, parentTable.getTimeStamp());
         if (hasIndexId) {
             // add all pk columns of parent tables to indexes
@@ -543,7 +738,8 @@ public class ViewUtil {
         // When creating a PTable for views or view indexes, use the baseTable PTable for attributes
         // inherited from the physical base table.
         // if a TableProperty is not valid on a view we set it to the base table value
-        // if a TableProperty is valid on a view and is not mutable on a view we set it to the base table value
+        // if a TableProperty is valid on a view and is not mutable on a view we set it to the base
+        // table value
         // if a TableProperty is valid on a view and is mutable on a view, we use the value set
         // on the view if the view had previously modified the property, otherwise we propagate the
         // value from the base table (see PHOENIX-4763)
@@ -556,9 +752,11 @@ public class ViewUtil {
                 .setAutoPartitionSeqName(parentTable.getAutoPartitionSeqName())
                 .setAppendOnlySchema(parentTable.isAppendOnlySchema())
                 .setImmutableStorageScheme(parentTable.getImmutableStorageScheme() == null ?
-                        PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN : parentTable.getImmutableStorageScheme())
+                        PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN :
+                        parentTable.getImmutableStorageScheme())
                 .setQualifierEncodingScheme(parentTable.getEncodingScheme() == null ?
-                        PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : parentTable.getEncodingScheme())
+                        PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS :
+                        parentTable.getEncodingScheme())
                 .setBaseColumnCount(baseTableColumnCount)
                 .setTimeStamp(maxTableTimestamp)
                 .setExcludedColumns(ImmutableList.copyOf(excludedColumns))
@@ -628,7 +826,8 @@ public class ViewUtil {
                         // For non-diverged views, if the same column exists in a parent and child,
                         // we keep the latest column.
                         PColumn existingColumn = allColumns.get(existingColumnIndex);
-                        if (!isDiverged && ancestorColumn.getTimestamp() > existingColumn.getTimestamp()) {
+                        if (!isDiverged && ancestorColumn.getTimestamp() >
+                                existingColumn.getTimestamp()) {
                             allColumns.remove(existingColumnIndex);
                             allColumns.add(ancestorColumn);
                         }