You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ji...@apache.org on 2023/12/02 01:26:13 UTC

(phoenix) branch PHOENIX-6978-feature updated: PHOENIX-7040 : Support TTL for views using the new column TTL in SYSTEM.CATALOG (#1710)

This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch PHOENIX-6978-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6978-feature by this push:
     new cbac4b2275 PHOENIX-7040 : Support TTL for views using the new column TTL in SYSTEM.CATALOG (#1710)
cbac4b2275 is described below

commit cbac4b2275b4a5131248ee899a875430407b9a5a
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Fri Dec 1 17:26:08 2023 -0800

    PHOENIX-7040 : Support TTL for views using the new column TTL in SYSTEM.CATALOG (#1710)
    
    PHOENIX-7040  Support TTL for views using the new column TTL in SYSTEM.CATALOG
    ---------
    
    Co-authored-by: Lokesh Khurana <lo...@salesforce.com>
---
 .../apache/phoenix/end2end/TTLAsPhoenixTTLIT.java  | 229 ++++++++++++++++-----
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 184 ++++++++++++-----
 .../apache/phoenix/exception/SQLExceptionCode.java |   8 +-
 .../PhoenixTransformWithViewsInputFormat.java      |   1 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |   9 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  | 156 ++++++++------
 .../org/apache/phoenix/schema/TableProperty.java   |   2 +-
 .../java/org/apache/phoenix/util/ViewUtil.java     | 145 ++++++++++---
 8 files changed, 549 insertions(+), 185 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
index 4ef1653881..a2c959b16f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
@@ -22,10 +22,13 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -33,12 +36,15 @@ import org.junit.experimental.categories.Category;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX;
-import static org.apache.phoenix.exception.SQLExceptionCode.TTL_SUPPORTED_FOR_TABLES_ONLY;
-import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
+import static org.apache.phoenix.exception.SQLExceptionCode.TTL_ALREADY_DEFINED_IN_HIERARCHY;
+import static org.apache.phoenix.exception.SQLExceptionCode.TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -47,6 +53,8 @@ import static org.junit.Assert.fail;
 public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
 
     private static final long DEFAULT_TTL_FOR_TEST = 86400;
+    private static final long DEFAULT_TTL_FOR_CHILD = 10000;
+    public static final String TENANT_URL_FMT = "%s;%s=%s";
 
     /**
      * test TTL is being set as PhoenixTTL when PhoenixTTL is enabled.
@@ -58,7 +66,6 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
                     createTableWithOrWithOutTTLAsItsProperty(conn, true)));
             assertEquals("TTL is not set correctly at Phoenix level", DEFAULT_TTL_FOR_TEST,
                     table.getTTL());
-            assertNull("RowKeyPrefix should be Null", table.getRowKeyPrefix());
         }
     }
 
@@ -163,8 +170,7 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
             //By default, Indexes should set TTL what Base Table has
             createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.LOCAL, false);
             createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.GLOBAL, false);
-            List<PTable> indexes = conn.unwrap(PhoenixConnection.class).getTable(
-                    new PTableKey(null, tableName)).getIndexes();
+            List<PTable> indexes = PhoenixRuntime.getTable(conn, tableName).getIndexes();
             for (PTable index : indexes) {
                 assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);;
             }
@@ -215,93 +221,200 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
         }
     }
 
-
     @Test
-    public void testSettingTTLForViews() throws Exception {
+    public void testSettingTTLForViewsOnTableWithTTL() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tenantID = generateUniqueName();
             String tenantID1 = generateUniqueName();
 
             Properties props = new Properties();
-            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantID);
+            props.setProperty(TENANT_ID_ATTRIB, tenantID);
             Connection tenantConn = DriverManager.getConnection(getUrl(), props);
 
             Properties props1 = new Properties();
-            props1.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantID1);
+            props1.setProperty(TENANT_ID_ATTRIB, tenantID1);
             Connection tenantConn1 = DriverManager.getConnection(getUrl(), props1);
 
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, true);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_TEST,
+                    tableName);
+
+            //Setting TTL on views is not allowed if Table already has TTL
+            try {
+                createUpdatableViewOnTableWithTTL(conn, tableName, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with TTL already defined in hierarchy",
+                        TTL_ALREADY_DEFINED_IN_HIERARCHY.getErrorCode(), sqe.getErrorCode());
+            }
+
+            //TTL is only supported for Table and Updatable Views
+            try {
+                createReadOnlyViewOnTableWithTTL(conn, tableName, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should have failed with TTL supported on Table and Updatable" +
+                        " View only", TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY.getErrorCode(), sqe.getErrorCode());
+            }
 
-            //View gets TTL value from its hierarchy only for Updatable Views
+            //View should have gotten TTL from parent table.
             String viewName = createUpdatableViewOnTableWithTTL(conn, tableName, false);
             assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
                     DEFAULT_TTL_FOR_TEST, viewName);
 
-            //View gets TTL value from its hierarchy
-            String viewName1 = createReadOnlyViewOnTableWithTTL(conn, tableName, false);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                    PhoenixDatabaseMetaData.TTL_NOT_DEFINED, viewName1);
+            //Child View's PTable gets TTL from parent View's PTable which gets from Table.
+            String childView = createViewOnViewWithTTL(tenantConn, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView);
+
+            String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView1);
 
-            //Index on Global View should get TTL from View.
             createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.GLOBAL,
                     false);
             createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.LOCAL,
                     false);
-            List<PTable> indexes = conn.unwrap(PhoenixConnection.class).getTable(
-                    new PTableKey(null, viewName)).getIndexes();
+
+            List<PTable> indexes = PhoenixRuntime.getTable(
+                    conn.unwrap(PhoenixConnection.class), viewName).getIndexes();
+
             for (PTable index : indexes) {
                 assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);
             }
 
-            //Child View gets TTL from parent View which gets from Table.
-            String childView = createViewOnViewWithTTL(tenantConn, viewName, false);
-            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView);
+            createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.GLOBAL, false);
+            List<PTable> tIndexes = PhoenixRuntime.getTable(
+                    conn.unwrap(PhoenixConnection.class), tableName).getIndexes();
 
-            String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, false);
-            assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView1);
+            for (PTable index : tIndexes) {
+                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);
+            }
 
-            //Setting TTL on Views should not be allowed.
+        }
+    }
+
+    @Test
+    public void testAlteringTTLToNONEAndThenSettingAtAnotherLevel() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(TENANT_ID_ATTRIB, generateUniqueName());
 
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, true);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_TEST,
+                    tableName);
+
+            //Setting TTL on views is not allowed if Table already has TTL
             try {
                 createUpdatableViewOnTableWithTTL(conn, tableName, true);
                 fail();
             } catch (SQLException sqe) {
-                assertEquals("Should fail with TTL supported for tables only",
-                        TTL_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), sqe.getErrorCode());
+                assertEquals("Should fail with TTL already defined in hierarchy",
+                        TTL_ALREADY_DEFINED_IN_HIERARCHY.getErrorCode(), sqe.getErrorCode());
             }
 
-            try {
-                createReadOnlyViewOnTableWithTTL(conn, tableName, true);
-                fail();
-            } catch (SQLException sqe) {
-                assertEquals("Should fail with TTL supported for tables only",
-                        TTL_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), sqe.getErrorCode());
-            }
+            String ddl = "ALTER TABLE " + tableName + " SET TTL=NONE";
+            conn.createStatement().execute(ddl);
+
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), TTL_NOT_DEFINED,
+                    tableName);
 
+            String viewName = createUpdatableViewOnTableWithTTL(conn, tableName, true);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_CHILD, viewName);
 
             try {
-                conn.createStatement().execute("ALTER VIEW " + viewName + " SET TTL = 1000");
+                createViewOnViewWithTTL(tenantConn, viewName, true);
                 fail();
             } catch (SQLException sqe) {
-                assertEquals("Cannot Set or Alter TTL on Views",
-                        VIEW_WITH_PROPERTIES.getErrorCode(), sqe.getErrorCode());
+                assertEquals("Should fail with TTL already defined in hierarchy",
+                        TTL_ALREADY_DEFINED_IN_HIERARCHY.getErrorCode(), sqe.getErrorCode());
             }
 
+            //Setting TTL on Table should be rejected while the view still defines one
             try {
-                createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.GLOBAL,true);
-                fail();
+                ddl = "ALTER TABLE " + tableName + " SET TTL=" + DEFAULT_TTL_FOR_TEST;
+                conn.createStatement().execute(ddl);
+                fail();
             } catch (SQLException sqe) {
-                assertEquals("Should fail with Cannot set or Alter property for index",
-                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+                assertEquals("Should fail with TTL already defined in hierarchy",
+                        TTL_ALREADY_DEFINED_IN_HIERARCHY.getErrorCode(), sqe.getErrorCode());
             }
+
+            ddl = "ALTER VIEW " + viewName + " SET TTL=NONE";
+            conn.createStatement().execute(ddl);
+
+            String childView = createViewOnViewWithTTL(tenantConn, viewName, true);
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_CHILD, childView);
+
+            ddl = "ALTER VIEW " + childView + " SET TTL=NONE";
+            tenantConn.createStatement().execute(ddl);
+
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    TTL_NOT_DEFINED, childView);
+
+            ddl = "ALTER VIEW " + viewName + " SET TTL=" + DEFAULT_TTL_FOR_CHILD;
+            conn.createStatement().execute(ddl);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_CHILD, viewName);
+
+        }
+    }
+
+    /**
+     * Alters the TTL on the base table and verifies the views below it observe the new value.
+     */
+    @Test
+    public void testAlteringTTLAtOneLevelAndCheckingAtAnotherLevel() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(TENANT_ID_ATTRIB, generateUniqueName());
+
+        Properties props1 = new Properties();
+        props1.setProperty(TENANT_ID_ATTRIB, generateUniqueName());
+        //Tenant connections join the try-with-resources so they are closed even on failure
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             Connection tenantConn = DriverManager.getConnection(getUrl(), props);
+             Connection tenantConn1 = DriverManager.getConnection(getUrl(), props1)) {
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, true);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_TEST,
+                    tableName);
+
+            //View should have gotten TTL from parent table.
+            String viewName = createUpdatableViewOnTableWithTTL(conn, tableName, false);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, viewName);
+
+            //Child View's PTable gets TTL from parent View's PTable which gets from Table.
+            String childView = createViewOnViewWithTTL(tenantConn, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView);
+
+            String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView1);
+
+            String alter = "ALTER TABLE " + tableName + " SET TTL = " + DEFAULT_TTL_FOR_CHILD;
+            conn.createStatement().execute(alter);
+
+            //Clear Cache for all Tables to reflect Alter TTL commands in hierarchy
+            clearCache(conn, null, tableName);
+            clearCache(conn, null, viewName);
+            clearCache(tenantConn, null, childView);
+            clearCache(tenantConn1, null, childView1);
+
+            //Assert TTL for each entity again with altered value
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_CHILD, viewName);
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_CHILD, childView);
+            assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_CHILD, childView1);
         }
     }
 
     private void assertTTLValueOfTableOrView(PhoenixConnection conn, long expected, String name) throws SQLException {
         assertEquals("TTL value did not match :-", expected,
-                conn.getTable(new PTableKey(conn.getTenantId(), name)).getTTL());
+                PhoenixRuntime.getTableNoCache(conn, name).getTTL());
     }
 
     private void assertTTLValueOfIndex(long expected, PTable index) {
@@ -328,13 +441,13 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
             case LOCAL:
                 String localIndexName = baseTableOrViewName + "_Local_" + generateUniqueName();
                 conn.createStatement().execute("CREATE LOCAL INDEX " + localIndexName + " ON " +
-                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = 1000" : ""));
+                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD : ""));
                 return localIndexName;
 
             case GLOBAL:
                 String globalIndexName = baseTableOrViewName + "_Global_" + generateUniqueName();
                 conn.createStatement().execute("CREATE INDEX " + globalIndexName + " ON " +
-                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = 1000" : ""));
+                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD : ""));
                 return globalIndexName;
 
             default:
@@ -348,7 +461,7 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
         conn.createStatement().execute("CREATE VIEW " + viewName
                 + " (" + generateUniqueName() + " SMALLINT) as select * from "
                 + baseTableName + " where id > 1 "
-                + (withTTL ? "TTL = 1000" : "") );
+                + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD  : "") );
         return viewName;
     }
 
@@ -358,7 +471,7 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
         conn.createStatement().execute("CREATE VIEW " + viewName
                 + " (" + generateUniqueName() + " SMALLINT) as select * from "
                 + baseTableName + " where id = 1 "
-                + (withTTL ? "TTL = 1000" : "") );
+                + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD : "") );
         return viewName;
     }
 
@@ -366,8 +479,30 @@ public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
                                            boolean withTTL) throws SQLException {
         String childView = parentViewName + "_" + generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + childView +
-                " (E BIGINT, F BIGINT) AS SELECT * FROM " + parentViewName);
+                " (E BIGINT, F BIGINT) AS SELECT * FROM " + parentViewName +
+                (withTTL ? " TTL = " + DEFAULT_TTL_FOR_CHILD : ""));
         return childView;
     }
 
+    /**
+     * Removes a table or view from the server side metadata cache and from the cache of the
+     * given connection. TODO :- We are externally calling clearCache for Alter Table scenario,
+     * remove this after https://issues.apache.org/jira/browse/PHOENIX-7135 is completed.
+     * @param schemaName schema of the entity, null when it has no schema
+     * @throws SQLException
+     */
+    public static void clearCache(Connection tenantConnection, String schemaName, String tableName) throws SQLException {
+        PhoenixConnection currentConnection = tenantConnection.unwrap(PhoenixConnection.class);
+        PName tenantIdName = currentConnection.getTenantId();
+        String tenantId = tenantIdName == null ? "" : tenantIdName.getString();
+        // Without a schema the full name is the bare table name, not "null.<table>".
+        String fullName = schemaName == null ? tableName : schemaName + "." + tableName;
+
+        // Clear server side cache
+        currentConnection.getQueryServices().clearTableFromCache(
+                Bytes.toBytes(tenantId), schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY :
+                        Bytes.toBytes(schemaName), Bytes.toBytes(tableName), 0);
+        // Clear connection cache
+        currentConnection.getMetaDataCache().removeTable(tenantIdName, fullName, null, 0);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 2028f7aeda..ae5046c1b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -56,6 +56,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
@@ -82,16 +84,21 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
+import static org.apache.phoenix.schema.PTable.LinkType.PARENT_TABLE;
+import static org.apache.phoenix.schema.PTable.LinkType.PHYSICAL_TABLE;
+import static org.apache.phoenix.schema.PTable.LinkType.VIEW_INDEX_PARENT_TABLE;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.schema.PTableType.VIEW;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
-import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
-import static org.apache.phoenix.util.SchemaUtil.getVarChars;
+import static org.apache.phoenix.util.SchemaUtil.*;
 import static org.apache.phoenix.util.ViewUtil.findAllDescendantViews;
 import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSetMetaData;
@@ -115,6 +122,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -132,6 +140,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
@@ -1411,8 +1420,16 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         int ttl = ttlKv == null ? TTL_NOT_DEFINED : PInteger.INSTANCE
                 .getCodec().decodeInt(ttlKv.getValueArray(),
                         ttlKv.getValueOffset(), SortOrder.getDefault());
-        builder.setTTL(ttlKv != null ? ttl : oldTable != null
-                ? oldTable.getTTL() : TTL_NOT_DEFINED);
+        ttl = ttlKv != null ? ttl : oldTable != null
+                ? oldTable.getTTL() : TTL_NOT_DEFINED;
+        if (tableType == VIEW && ttl == TTL_NOT_DEFINED) {
+            //Scan SysCat to get TTL from Parent View/Table
+            byte[] viewKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                    schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+            ttl = scanTTLFromParent(viewKey, clientTimeStamp);
+
+            // TODO: Need to Update Cache for Alter Commands, can use PHOENIX-6883.
+        }
 
         Cell rowKeyPrefixKv = tableKeyValues[ROW_KEY_PREFIX_INDEX];
         byte[] rowKeyPrefix = rowKeyPrefixKv != null
@@ -1462,6 +1479,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         PTable transformingNewTable = null;
         boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
+        boolean isThisAViewIndex = false;
         for (List<Cell> columnCellList : allColumnCellList) {
 
             Cell colKv = columnCellList.get(LINK_TYPE_INDEX);
@@ -1483,7 +1501,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
                 if (linkType == LinkType.INDEX_TABLE) {
                     addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion);
-                } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                } else if (linkType == PHYSICAL_TABLE) {
                     // famName contains the logical name of the parent table. We need to get the actual physical name of the table
                     PTable parentTable = null;
                     if (indexType != IndexType.LOCAL) {
@@ -1527,7 +1545,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         setPhysicalName = true;
                         parentLogicalName = SchemaUtil.getTableName(parentTable.getSchemaName(), parentTable.getTableName());
                     }
-                } else if (linkType == LinkType.PARENT_TABLE) {
+                } else if (linkType == PARENT_TABLE) {
                     parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
                     parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
                 } else if (linkType == LinkType.EXCLUDED_COLUMN) {
@@ -1544,6 +1562,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             ServerUtil.throwIOException("Transforming new table not found", new TableNotFoundException(schemaName.getString(), famName.getString()));
                         }
                     }
+                } else if (linkType == VIEW_INDEX_PARENT_TABLE) {
+                    byte[] indexKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                            schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+                    ttl = scanTTLFromParent(indexKey, clientTimeStamp);
+                    isThisAViewIndex = true;
                 }
             } else {
                 long columnTimestamp =
@@ -1556,6 +1579,14 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     isSalted, baseColumnCount, isRegularView, columnTimestamp);
             }
         }
+        if(tableType == INDEX && !isThisAViewIndex && ttl == TTL_NOT_DEFINED) {
+            //If this is an index on Table get TTL from Table
+            byte[] tableKey = getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                    parentSchemaName == null ? null : parentSchemaName.getBytes(),
+                    parentTableName.getBytes());
+            ttl = scanTTLFromParent(tableKey, clientTimeStamp);
+        }
+        builder.setTTL(ttl);
         builder.setEncodedCQCounter(cqCounter);
 
         builder.setIndexes(indexes != null ? indexes : oldTable != null
@@ -1586,6 +1617,68 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         return builder.build();
     }
 
+    /**
+     * Resolves the TTL for the entity at {@code viewKey} by scanning SYSTEM.CATALOG: returns the
+     * TTL on its first row if present, otherwise follows its parent link recursively.
+     */
+    private int scanTTLFromParent(byte[] viewKey, long clientTimeStamp) throws IOException, SQLException {
+        Scan scan = MetaDataUtil.newTableRowsScan(viewKey, MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        // Close both the table and the scanner: this method recurses once per hierarchy level
+        // and previously left a scanner (and table handle) open for every level visited.
+        try (Table sysCat = ServerUtil.getHTableForCoprocessorScan(this.env,
+                SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
+                        env.getConfiguration()));
+             ResultScanner scanner = sysCat.getScanner(scan)) {
+            Result result = scanner.next();
+            if (result == null) {
+                return TTL_NOT_DEFINED;
+            }
+            // Only the first row is checked for a TTL; read it once, from the checked family.
+            byte[] ttlBytes = result.getValue(TABLE_FAMILY_BYTES, TTL_BYTES);
+            if (ttlBytes != null) {
+                return PInteger.INSTANCE.getCodec().decodeInt(ttlBytes, 0,
+                        SortOrder.getDefault());
+            }
+            // Remaining rows: follow the first link that points at a parent entity.
+            while ((result = scanner.next()) != null) {
+                byte[] linkTypeBytes = result.getValue(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+                if (linkTypeBytes == null) {
+                    continue;
+                }
+                byte[][] rowKeyMetaData = new byte[5][];
+                getVarChars(result.getRow(), 5, rowKeyMetaData);
+                LinkType linkType = LinkType.fromSerializedValue(linkTypeBytes[0]);
+                if (linkType == PARENT_TABLE) {
+                    return getTTLFromAppropriateParent(result.getValue(TABLE_FAMILY_BYTES,
+                            PARENT_TENANT_ID_BYTES), rowKeyMetaData, clientTimeStamp);
+                } else if (linkType == VIEW_INDEX_PARENT_TABLE) {
+                    //We are calculating TTL for indexes on Views
+                    return getTTLFromAppropriateParent(
+                            rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
+                            rowKeyMetaData, clientTimeStamp);
+                } else if (linkType == PHYSICAL_TABLE) {
+                    // Physical table link: the parent is looked up with a null tenant id.
+                    return getTTLFromAppropriateParent(null, rowKeyMetaData, clientTimeStamp);
+                }
+            }
+        }
+        return TTL_NOT_DEFINED;
+    }
+
+    /** Looks up the TTL of the parent named in the family-name slot of a link row key. */
+    private int getTTLFromAppropriateParent(byte[] parentViewTenantId, byte[][] rowKeyMetaData,
+                                            long clientTimeStamp) throws IOException, SQLException {
+        // Split the parent's full name into its schema and table parts.
+        byte[] parentFullName = rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
+        String schemaPart = SchemaUtil.getSchemaNameFromFullName(parentFullName);
+        byte[] schemaBytes =
+                schemaPart == null ? null : schemaPart.getBytes(StandardCharsets.UTF_8);
+        byte[] nameBytes = SchemaUtil.getTableNameFromFullName(parentFullName)
+                .getBytes(StandardCharsets.UTF_8);
+        byte[] parentKey = SchemaUtil.getTableKey(parentViewTenantId, schemaBytes, nameBytes);
+        return scanTTLFromParent(parentKey, clientTimeStamp);
+    }
+
     private Long getViewIndexId(Cell[] tableKeyValues, PDataType viewIndexIdType) {
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
         return viewIndexIdKv == null ? null :
@@ -1992,11 +2085,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 Mutation m = tableMetadata.get(i);
                 if (m instanceof Put) {
                     LinkType linkType = MetaDataUtil.getLinkType(m);
-                    if (linkType == LinkType.PHYSICAL_TABLE) {
+                    if (linkType == PHYSICAL_TABLE) {
                         physicalTableRow = m;
                         physicalTableLinkFound = true;
                     }
-                    if (linkType == LinkType.PARENT_TABLE) {
+                    if (linkType == PARENT_TABLE) {
                         parentTableRow = m;
                         parentTableLinkFound = true;
                     }
@@ -2443,7 +2536,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         // TODO: Avoid doing server-server RPC when we have held row locks
                         MetaDataResponse response =
                                 processRemoteRegionMutations(
-                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                        SYSTEM_CATALOG_NAME_BYTES,
                                         remoteMutations, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
                         clearRemoteTableFromCache(clientTimeStamp,
                                 parentTable.getSchemaName() != null
@@ -2995,12 +3088,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 &&
                             linkType == LinkType.INDEX_TABLE) {
                         indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                    } else if (tableType == PTableType.VIEW && (linkType == LinkType.PARENT_TABLE ||
-                            linkType == LinkType.PHYSICAL_TABLE)) {
+                    } else if (tableType == PTableType.VIEW && (linkType == PARENT_TABLE ||
+                            linkType == PHYSICAL_TABLE)) {
                         // Populate the delete mutations for parent->child link for the child view
                         // in question, which we issue to SYSTEM.CHILD_LINK later
                         Cell parentTenantIdCell = MetaDataUtil.getCell(results,
-                                PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES);
+                                PARENT_TENANT_ID_BYTES);
                         PName parentTenantId = parentTenantIdCell != null ?
                                 PNameFactory.newName(parentTenantIdCell.getValueArray(),
                                         parentTenantIdCell.getValueOffset(),
@@ -3085,11 +3178,19 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             final int clientVersion) throws IOException, SQLException {
         boolean isMutationAllowed = true;
         boolean isSchemaMutationAllowed = true;
+        Pair<Boolean, Boolean> scanSysCatForTTLDefinedOnAnyChildPair = new Pair<>(true, false);
         if (expectedType == PTableType.TABLE || expectedType == PTableType.VIEW) {
             try (Table hTable = ServerUtil.getHTableForCoprocessorScan(env,
-                    getSystemTableForChildLinks(clientVersion, env.getConfiguration()))) {
-                childViews.addAll(findAllDescendantViews(hTable, env.getConfiguration(),
-                        tenantId, schemaName, tableOrViewName, clientTimeStamp, false)
+                    getSystemTableForChildLinks(clientVersion, env.getConfiguration()));
+                Table sysCat = ServerUtil.getHTableForCoprocessorScan(env,
+                        SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
+                                env.getConfiguration()))) {
+                List<PTable> legitimateChildViews = new ArrayList<>();
+
+                childViews.addAll(findAllDescendantViews(hTable, sysCat, env.getConfiguration(),
+                        tenantId, schemaName, tableOrViewName, clientTimeStamp, new ArrayList<>(),
+                        new ArrayList<>(), false,
+                        scanSysCatForTTLDefinedOnAnyChildPair)
                         .getFirst());
             }
 
@@ -3113,10 +3214,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK);
                 }
             }
-            // Validate TTL property settings for views only.
-            if (expectedType == PTableType.VIEW) {
-                isSchemaMutationAllowed = validatePhoenixTTLAttributeSettingForView(
-                        parentTable, childViews, tableMetadata, TTL_BYTES);
+
+            if (scanSysCatForTTLDefinedOnAnyChildPair.getSecond()
+                    && validateTTLAttributeSettingForEntity(tableMetadata, TTL_BYTES)) {
+                //Getting here means TTL is already defined on one of the children and we are
+                //trying to set TTL at the current level, which is not allowed because TTL can
+                //only be defined at one level in the hierarchy.
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                        TTL_ALREADY_DEFINED_IN_HIERARCHY)
+                        .setSchemaName(Arrays.toString(schemaName))
+                        .setTableName(Arrays.toString(tableOrViewName))
+                        .build()
+                        .buildException();
             }
         }
         if (!isMutationAllowed) {
@@ -3353,7 +3462,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             && table.getEncodingScheme() !=
                             QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)) {
                         processRemoteRegionMutations(
-                                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
+                                SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
                                 MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
                         //if we're a view or index, clear the cache for our parent
                         if ((type == PTableType.VIEW || type == INDEX) && table.getParentTableName() != null) {
@@ -3441,45 +3550,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     }
 
     // Checks whether a non-zero TTL value is being set.
-    private boolean validatePhoenixTTLAttributeSettingForView(
-            final PTable parentTable,
-            final List<PTable> childViews,
+    private boolean validateTTLAttributeSettingForEntity(
             final List<Mutation> tableMetadata,
             final byte[] ttlBytes) {
-        boolean hasNewPhoenixTTLAttribute = false;
-        boolean isSchemaMutationAllowed = true;
         for (Mutation m : tableMetadata) {
             if (m instanceof Put) {
                 Put p = (Put)m;
                 List<Cell> cells = p.get(TABLE_FAMILY_BYTES, ttlBytes);
                 if (cells != null && cells.size() > 0) {
                     Cell cell = cells.get(0);
-                    long newTTL = (long) PLong.INSTANCE.toObject(cell.getValueArray(),
+                    int newTTL = (int) PInteger.INSTANCE.toObject(cell.getValueArray(),
                             cell.getValueOffset(), cell.getValueLength());
-                    hasNewPhoenixTTLAttribute =  newTTL != TTL_NOT_DEFINED;
-                    //TODO:- Re-enable when we have ViewTTL
-                    return false;
+                    return newTTL != TTL_NOT_DEFINED;
                 }
             }
         }
-
-        if (hasNewPhoenixTTLAttribute) {
-            // Disallow if the parent has PHOENIX_TTL set.
-            if (parentTable != null &&  parentTable.getTTL() != TTL_NOT_DEFINED) {
-                isSchemaMutationAllowed = false;
-            }
-
-            // Since we do not allow propagation of TTL values during ALTER for now.
-            // If a child view exists and this view previously had a TTL value set
-            // then that implies that the child view too has a valid TTL (non zero).
-            // In this case we do not allow for ALTER of the view's TTL value.
-            if (!childViews.isEmpty()) {
-                isSchemaMutationAllowed = false;
-            }
-        }
-        return isSchemaMutationAllowed;
-
-
+        return false;
     }
 
     public static class ColumnFinder extends StatelessTraverseAllExpressionVisitor<Void> {
@@ -3863,7 +3949,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             LOGGER.error("Old client is not compatible when" + " system tables are upgraded to map to namespace");
             ProtobufUtil.setControllerException(controller,
                     ServerUtil.createIOException(
-                            SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                            SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
                                     isTablesMappingEnabled).toString(),
                             new DoNotRetryIOException(
                                     "Old client is not compatible when" + " system tables are upgraded to map to namespace")));
@@ -3896,7 +3982,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 LOGGER.error("loading system catalog table inside getVersion failed", t);
                 ProtobufUtil.setControllerException(controller,
                         ServerUtil.createIOException(
-                                SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
                                         isTablesMappingEnabled).toString(), t));
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index cbd32b6409..3a320cfb53 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -358,13 +358,17 @@ public enum SQLExceptionCode {
             + "view has TTL set,"),
     CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
         CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
-    TTL_SUPPORTED_FOR_TABLES_ONLY(10955, "44A37", TTL
-            + "property can only be set for tables"),
+
+    // Message is built as TTL + suffix; the suffix starts with a space so the two do not run
+    // together, matching the neighbouring constants.
+    TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10955, "44A37", TTL
+            + " property can only be set for tables and updatable views only"),
     CANNOT_CREATE_INDEX_CHILD_VIEWS_EXTEND_PK(10956, "44A38", "Index can be created "
             + "only if none of the child views extends primary key"),
     VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10957, "44A39", "View can extend parent primary key"
             + " only if none of the parents have indexes in the parent hierarchy"),
 
+    TTL_ALREADY_DEFINED_IN_HIERARCHY(10958, "44A40", TTL
+            + " property is already defined in hierarchy for this entity"),
+
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
         @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
index 8156ba8f13..48b64cf239 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.mapreduce.transform;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Pair;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 05e9936558..fdc51bb36c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -63,6 +63,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
@@ -2871,7 +2872,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                             .buildException();
                                 }
                                 //Handle FOREVER and NONE case
-                                propValue = convertForeverAndNoneTTLValue(propValue);
+                                propValue = convertForeverAndNoneTTLValue(propValue, isPhoenixTTLEnabled());
                                 //If Phoenix level TTL is enabled we are using TTL as phoenix
                                 //Table level property.
                                 if (!isPhoenixTTLEnabled()) {
@@ -3192,12 +3193,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 tableDesc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(table)).getTimeToLive();
     }
 
-    public static Object convertForeverAndNoneTTLValue(Object propValue) {
+    public static Object convertForeverAndNoneTTLValue(Object propValue, boolean isPhoenixTTLEnabled) {
         //Handle FOREVER and NONE value for TTL at HBase level TTL.
         if (propValue instanceof String) {
             String strValue = (String) propValue;
-            if ("FOREVER".equalsIgnoreCase(strValue) || "NONE".equalsIgnoreCase(strValue)) {
+            if ("FOREVER".equalsIgnoreCase(strValue)) {
                 propValue = HConstants.FOREVER;
+            } else if ("NONE".equalsIgnoreCase(strValue)) {
+                propValue = isPhoenixTTLEnabled ? TTL_NOT_DEFINED : HConstants.FOREVER;
             }
         }
         return propValue;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ff2351d425..f34cb3f0a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.schema;
 
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Admin;
@@ -287,6 +288,7 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
+import static org.apache.phoenix.schema.PTable.ViewType.UPDATABLE;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -1098,7 +1100,7 @@ public class MetaDataClient {
                     tableProps.put(prop.getFirst(), prop.getSecond());
                     if (!isPhoenixTTLEnabled()) {
                         //Handling FOREVER and NONE case for TTL when phoenix.table.ttl.enable is false.
-                        Object value = ConnectionQueryServicesImpl.convertForeverAndNoneTTLValue(prop.getSecond());
+                        Object value = ConnectionQueryServicesImpl.convertForeverAndNoneTTLValue(prop.getSecond(), false);
                         commonFamilyProps.put(prop.getFirst(), value);
                     }
                     //If phoenix.table.ttl.enabled is true doesn't store TTL as columnFamilyProp
@@ -2073,40 +2075,35 @@ public class MetaDataClient {
         }
     }
 
-    /**
-     * TODO:- Change this to store TTL as only one level where it is defined!
-     * Get TTL defined for Index or View in its parent hierarchy as defining TTL directly on index
-     * or view is not allowed. View on SYSTEM table is not allowed and already handled during
-     * plan compilation.
-     * @param parent
-     * @return appropriate TTL for the entity calling the function.
-     * @throws TableNotFoundException
+
+    /***
+     * Get TTL defined for given entity (Index, View or Table) in hierarchy
+     * * For an index it will return TTL defined either from parent table
+     *      or from parent view's hierarchy if it is defined.
+     * * For view it will return TTL defined from its parent table or from parent view's hierarchy
+     *      if it is defined
+     * * For table it will just return TTL_NOT_DEFINED as it has no parent.
+     * @param parent entity's parent
+     * @return TTL from hierarchy if defined otherwise TTL_NOT_DEFINED.
+     * @throws TableNotFoundException if not able to find any table in hierarchy
      */
-    private Integer getTTLFromParent(PTable parent) throws TableNotFoundException {
-        return (parent.getType() == TABLE) ? Integer.valueOf(parent.getTTL())
-                : (parent.getType() == VIEW ? getTTLFromAncestor(parent) : null);
+    private Integer checkAndGetTTLFromHierarchy(PTable parent) throws SQLException {
+        // Without a parent there is nothing above this entity to take a TTL from.
+        if (parent == null) {
+            return TTL_NOT_DEFINED;
+        }
+        // Parent is a table: use the TTL stored on it.
+        if (parent.getType() == TABLE) {
+            return parent.getTTL();
+        }
+        // Parent is a view: the TTL may come from the view itself or from further up.
+        if (parent.getType() == VIEW) {
+            return getTTLFromViewHierarchy(parent);
+        }
+        // Any other parent type contributes no TTL.
+        return TTL_NOT_DEFINED;
+    }
 
     /**
-     * Get TTL defined for the given View according to its hierarchy.
+     * Get TTL defined for the given View if it is defined in hierarchy.
      * @param view
      * @return appropriate TTL from Views defined above for the entity calling.
-     * @throws TableNotFoundException
+     * @throws TableNotFoundException if not able to find any table in hierarchy
      */
-    private Integer getTTLFromAncestor(PTable view) throws TableNotFoundException {
-        try {
+    private Integer getTTLFromViewHierarchy(PTable view) throws SQLException {
             return view.getTTL() != TTL_NOT_DEFINED
                     ? Integer.valueOf(view.getTTL()) : (checkIfParentIsTable(view)
-                    ? connection.getTable(new PTableKey(null,
-                            view.getPhysicalNames().get(0).getString())).getTTL()
-                    : getTTLFromAncestor(connection.getTable(new PTableKey(
-                                    connection.getTenantId(), view.getParentName().getString()))));
-        } catch (TableNotFoundException tne) {
-            //Check again for TTL from ancestors, what if view here is tenant view on top of
-            //Global View without any tenant id.
-            return getTTLFromAncestor(connection.getTable(new PTableKey(null,
-                    view.getParentName().getString())));
-        }
+                    ? PhoenixRuntime.getTable(connection, view.getPhysicalNames().get(0).toString()).getTTL()
+                    : getTTLFromViewHierarchy(PhoenixRuntime.getTable(connection, view.getParentName().toString())));
     }
 
     private boolean checkIfParentIsTable(PTable view) {
@@ -2173,12 +2170,13 @@ public class MetaDataClient {
                     tableType == PTableType.VIEW ? parent.getColumns().size()
                             : QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 
-            Integer phoenixTTL = TTL_NOT_DEFINED;
-            Integer phoenixTTLProp = (Integer) TableProperty.TTL.getValue(tableProps);
+            Integer ttl = TTL_NOT_DEFINED;
+            Integer ttlFromHierarchy = TTL_NOT_DEFINED;
+            Integer ttlProp = (Integer) TableProperty.TTL.getValue(tableProps);
 
             // Validate TTL prop value if set
-            if (phoenixTTLProp != null) {
-                if (phoenixTTLProp < 0) {
+            if (ttlProp != null) {
+                if (ttlProp < 0) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
                             .setMessage(String.format("entity = %s, TTL value should be > 0",
                                     tableName))
@@ -2186,16 +2184,25 @@ public class MetaDataClient {
                             .buildException();
                 }
 
-                if (tableType != TABLE) {
+                if (tableType != TABLE && (tableType != VIEW || viewType != UPDATABLE)) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                            TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
+                            .setSchemaName(schemaName)
+                            .setTableName(tableName)
+                            .build()
+                            .buildException();
+                }
+                ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent);
+                if (ttlFromHierarchy != TTL_NOT_DEFINED) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.
-                            TTL_SUPPORTED_FOR_TABLES_ONLY)
+                            TTL_ALREADY_DEFINED_IN_HIERARCHY)
                             .setSchemaName(schemaName)
                             .setTableName(tableName)
                             .build()
                             .buildException();
                 }
-                //Set Only in case of Tables.
-                phoenixTTL = phoenixTTLProp;
+
+                ttl = ttlProp;
             }
 
             Boolean isChangeDetectionEnabledProp =
@@ -2209,8 +2216,6 @@ public class MetaDataClient {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
                 isImmutableRows = parent.isImmutableRows();
                 isAppendOnlySchema = parent.isAppendOnlySchema();
-                //Check up hierarchy and get appropriate TTL
-                phoenixTTL = getTTLFromParent(parent);
 
                 // Index on view
                 // TODO: Can we support a multi-tenant index directly on a multi-tenant
@@ -2412,7 +2417,7 @@ public class MetaDataClient {
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
             }
-            if ((isPhoenixTTLEnabled() ? phoenixTTLProp != null
+            if ((isPhoenixTTLEnabled() ? ttlProp != null
                     : TableProperty.TTL.getValue(commonFamilyProps) != null)
                     && transactionProvider != null 
                     && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
@@ -2430,9 +2435,9 @@ public class MetaDataClient {
             if (transactionProvider != null) {
                 // If TTL set, use transaction context TTL property name instead
                 // Note: After PHOENIX-6627, is PhoenixTransactionContext.PROPERTY_TTL still useful?
-                Object ttl = commonFamilyProps.remove(ColumnFamilyDescriptorBuilder.TTL);
-                if (ttl != null) {
-                    commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, ttl);
+                Object transactionTTL = commonFamilyProps.remove(ColumnFamilyDescriptorBuilder.TTL);
+                if (transactionTTL != null) {
+                    commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, transactionTTL);
                 }
             }
 
@@ -2528,9 +2533,6 @@ public class MetaDataClient {
                         // set to the parent value if the property is not set on the view
                         updateCacheFrequency = parent.getUpdateCacheFrequency();
                     }
-                    if (viewType == ViewType.UPDATABLE) {
-                        phoenixTTL = getTTLFromParent(parent);
-                    }
                     disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
                     // TODO PHOENIX-4766 Add an options to stop sending parent metadata when creating views
@@ -3235,10 +3237,10 @@ public class MetaDataClient {
                 tableUpsert.setString(33, streamingTopicName);
             }
 
-            if (phoenixTTL == null) {
+            if (ttl == null || ttl == TTL_NOT_DEFINED) {
                 tableUpsert.setNull(34, Types.INTEGER);
             } else {
-                tableUpsert.setInt(34, phoenixTTL);
+                tableUpsert.setInt(34, ttl);
             }
 
             if (rowKeyPrefix == null) {
@@ -3393,7 +3395,7 @@ public class MetaDataClient {
                         .setExternalSchemaId(result.getTable() != null ?
                         result.getTable().getExternalSchemaId() : null)
                         .setStreamingTopicName(streamingTopicName)
-                        .setTTL(phoenixTTL == null ? TTL_NOT_DEFINED : phoenixTTL)
+                        .setTTL(ttl == null || ttl == TTL_NOT_DEFINED ? ttlFromHierarchy : ttl)
                         .setRowKeyPrefix(rowKeyPrefix)
                         .setIndexWhere(statement.getWhereClause() == null ? null
                                 : statement.getWhereClause().toString())
@@ -3858,10 +3860,10 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional,
-                                       Long updateCacheFrequency, Integer ttl, String physicalTableName,
+                                       Long updateCacheFrequency, String physicalTableName,
                                        String schemaVersion, QualifierEncodingScheme columnEncodedBytes) throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null,
-            updateCacheFrequency, null, null, null, null, -1L, null, null, null, ttl, false, physicalTableName,
+            updateCacheFrequency, null, null, null, null, -1L, null, null, null, null, false, physicalTableName,
                 schemaVersion, columnEncodedBytes, null);
     }
 
@@ -3928,7 +3930,7 @@ public class MetaDataClient {
             mutateBooleanProperty(connection, tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
         }
         if (ttl != null) {
-            mutateIntegerProperty(connection, tenantId, schemaName, tableName, TTL, ttl);
+            mutateIntegerProperty(connection, tenantId, schemaName, tableName, TTL, ttl == TTL_NOT_DEFINED ? null : ttl);
         }
         if (isChangeDetectionEnabled != null) {
             mutateBooleanProperty(connection, tenantId, schemaName, tableName, CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
@@ -4139,6 +4141,7 @@ public class MetaDataClient {
 
             boolean retried = false;
             boolean changingPhoenixTableProperty = false;
+            MutableBoolean areWeIntroducingTTLAtThisLevel = new MutableBoolean(false);
             MetaProperties metaProperties = new MetaProperties();
             while (true) {
                 Map<String, List<Pair<String, Object>>> properties=new HashMap<>(stmtProperties.size());;
@@ -4176,7 +4179,37 @@ public class MetaDataClient {
                 }
 
                 MetaPropertiesEvaluated metaPropertiesEvaluated = new MetaPropertiesEvaluated();
-                changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
+                changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName,areWeIntroducingTTLAtThisLevel);
+                if (areWeIntroducingTTLAtThisLevel.booleanValue()) {
+                    //As we are introducing TTL for the first time at this level, we need to check
+                    //if TTL is already defined up or down in the hierarchy.
+                    Integer ttlAlreadyDefined = TTL_NOT_DEFINED;
+                    //Check up the hierarchy
+                    if (table.getType() != PTableType.TABLE) {
+                        ttlAlreadyDefined = checkAndGetTTLFromHierarchy(PhoenixRuntime.getTableNoCache(connection, table.getParentName().toString()));
+                    }
+                    if (ttlAlreadyDefined != TTL_NOT_DEFINED) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                                TTL_ALREADY_DEFINED_IN_HIERARCHY)
+                                .setSchemaName(schemaName)
+                                .setTableName(tableName)
+                                .build()
+                                .buildException();
+                    }
+
+                    /**
+                     * Whether TTL is defined on any child below this level is checked in
+                     * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List,
+                     * ColumnMutator, int, PTable, PTable, boolean)}, where
+                     * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
+                     * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[],
+                     * byte[], byte[], List, int)} already traverses all descendant
+                     * views, so no downward check is needed here.
+                     */
+
+
+
+                }
 
                 boolean isTransformNeeded = Transform.checkIsTransformNeeded(metaProperties, schemaName, table, tableName, null, tenantIdToUse, connection);
                 if (isTransformNeeded) {
@@ -4340,12 +4373,11 @@ public class MetaDataClient {
 
                 if (!table.getIndexes().isEmpty() &&
                         (numPkColumnsAdded>0 || metaProperties.getNonTxToTx() ||
-                                metaPropertiesEvaluated.getUpdateCacheFrequency() != null || metaPropertiesEvaluated.getTTL() != null)) {
+                                metaPropertiesEvaluated.getUpdateCacheFrequency() != null)) {
                     for (PTable index : table.getIndexes()) {
                         incrementTableSeqNum(index, index.getType(), numPkColumnsAdded,
                                 metaProperties.getNonTxToTx() ? Boolean.TRUE : null,
                                 metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getTTL(),
                                 metaPropertiesEvaluated.getPhysicalTableName(),
                                 metaPropertiesEvaluated.getSchemaVersion(),
                                 metaProperties.getColumnEncodedBytesProp());
@@ -4359,7 +4391,6 @@ public class MetaDataClient {
                         incrementTableSeqNum(index, index.getType(), columnDefs.size(),
                                 Boolean.FALSE,
                                 metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getTTL(),
                                 metaPropertiesEvaluated.getPhysicalTableName(),
                                 metaPropertiesEvaluated.getSchemaVersion(),
                                 metaPropertiesEvaluated.getColumnEncodedBytes());
@@ -4845,7 +4876,7 @@ public class MetaDataClient {
                     }
                     if (!indexColumnsToDrop.isEmpty()) {
                         long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(),
-                                null, null, null, null, null, null);
+                                null, null, null, null, null);
                         dropColumnMutations(index, indexColumnsToDrop);
                         long clientTimestamp = MutationState.getTableTimestamp(timeStamp, connection.getSCN());
                         connection.removeColumn(tenantId, index.getName().getString(),
@@ -4857,7 +4888,7 @@ public class MetaDataClient {
                 connection.rollback();
 
                 long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(),
-                        null, null, null, null, null, null);
+                        null, null, null, null, null);
                 tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
@@ -5072,7 +5103,7 @@ public class MetaDataClient {
 
             boolean isTransformNeeded = Transform.checkIsTransformNeeded(metaProperties, schemaName, table, indexName, dataTableName, tenantId, connection);
             MetaPropertiesEvaluated metaPropertiesEvaluated = new MetaPropertiesEvaluated();
-            boolean changingPhoenixTableProperty= evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
+            boolean changingPhoenixTableProperty= evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName,new MutableBoolean(false));
 
             PIndexState newIndexState = statement.getIndexState();
 
@@ -5484,7 +5515,10 @@ public class MetaDataClient {
         return metaProperties;
     }
 
-    private boolean evaluateStmtProperties(MetaProperties metaProperties, MetaPropertiesEvaluated metaPropertiesEvaluated, PTable table, String schemaName, String tableName)
+    private boolean evaluateStmtProperties(MetaProperties metaProperties,
+                                           MetaPropertiesEvaluated metaPropertiesEvaluated,
+                                           PTable table, String schemaName, String tableName,
+                                           MutableBoolean areWeIntroducingTTLAtThisLevel)
             throws SQLException {
         boolean changingPhoenixTableProperty = false;
 
@@ -5643,9 +5677,10 @@ public class MetaDataClient {
                         .build()
                         .buildException();
             }
-            if (table.getType() != PTableType.TABLE) {
+            if (table.getType() != PTableType.TABLE && (table.getType() != PTableType.VIEW ||
+                    table.getViewType() != UPDATABLE)) {
                 throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.TTL_SUPPORTED_FOR_TABLES_ONLY)
+                        SQLExceptionCode.TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
                         .build()
                         .buildException();
             }
@@ -5653,6 +5688,9 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.setTTL(metaProperties.getTTL());
                 changingPhoenixTableProperty = true;
             }
+            //Record that TTL is being introduced at this level so that we will check whether TTL
+            //is already defined in the hierarchy or not.
+            areWeIntroducingTTLAtThisLevel.setTrue();
         }
 
         if (metaProperties.isChangeDetectionEnabled() != null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 910d6889d7..4914d22c19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -247,7 +247,7 @@ public enum TableProperty {
         }
     },
 
-    TTL(PhoenixDatabaseMetaData.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
+    TTL(PhoenixDatabaseMetaData.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, true, true) {
         /**
          * PHOENIX_TTL can take any values ranging between 0 < PHOENIX_TTL <= HConstants.LATEST_TIMESTAMP.
          * special values :-
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index d826f0fdad..c379e846e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.schema.types.PLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
@@ -85,6 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_BYTES;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
@@ -132,20 +134,81 @@ public class ViewUtil {
         List<PTable> legitimateChildViews = new ArrayList<>();
         List<TableInfo> orphanChildViews = new ArrayList<>();
 
-        findAllDescendantViews(sysCatOrsysChildLink, serverSideConfig, tenantId, schemaName,
+        return findAllDescendantViews(sysCatOrsysChildLink, serverSideConfig, tenantId, schemaName,
                 tableOrViewName, clientTimeStamp, legitimateChildViews, orphanChildViews,
                 findJustOneLegitimateChildView);
-        return new Pair<>(legitimateChildViews, orphanChildViews);
     }
 
-    private static void findAllDescendantViews(Table sysCatOrsysChildLink,
-            Configuration serverSideConfig, byte[] parentTenantId, byte[] parentSchemaName,
-            byte[] parentTableOrViewName, long clientTimeStamp, List<PTable> legitimateChildViews,
-            List<TableInfo> orphanChildViews, boolean findJustOneLegitimateChildView)
+    public static Pair<List<PTable>, List<TableInfo>> findAllDescendantViews(Table sysCatOrsysChildLink,
+           Configuration serverSideConfig, byte[] parentTenantId, byte[] parentSchemaName,
+           byte[] parentTableOrViewName, long clientTimeStamp, List<PTable> legitimateChildViews,
+           List<TableInfo> orphanChildViews, boolean findJustOneLegitimateChildView)
+           throws IOException, SQLException{
+
+         return findAllDescendantViews(sysCatOrsysChildLink, null, serverSideConfig,
+                 parentTenantId, parentSchemaName, parentTableOrViewName, clientTimeStamp,
+                 legitimateChildViews, orphanChildViews, findJustOneLegitimateChildView,
+                 new Pair<>(false, false));
+
+    }
+
+    /**
+     * Find all the descendant views of a given table or view in a depth-first fashion.
+     * Note that apart from scanning the {@code parent->child } links, we also validate each view
+     * by trying to resolve it.
+     * Use {@link ViewUtil#findAllRelatives(Table, byte[], byte[], byte[], LinkType,
+     * TableViewFinderResult)} if you want to find other links and don't care about orphan results.
+     *
+     * @param sysCatOrsysChildLink Table corresponding to either SYSTEM.CATALOG or SYSTEM.CHILD_LINK
+     * @param sysCat Table corresponding to SYSTEM.CATALOG, used specifically to check whether TTL
+     *               is defined on any of the descendant views. This can be null, in which case we
+     *               do not scan it to check whether TTL is defined.
+     * @param serverSideConfig server-side configuration
+     * @param parentTenantId tenantId of the view (null if it is a table or global view)
+     * @param parentSchemaName schema name of the table/view
+     * @param parentTableOrViewName name of the table/view
+     * @param clientTimeStamp client timestamp
+     * @param legitimateChildViews List to be returned as first element of Pair containing
+     *                             legitimate child views
+     * @param orphanChildViews list to be returned as second element of Pair containing orphan views
+     * @param findJustOneLegitimateChildView if true, we are only interested in knowing if there is
+     *                                       at least one legitimate child view, so we return early.
+     *                                       If false, we want to find all legitimate child views
+     *                                       and all orphan views (views that no longer exist)
+     *                                       stemming from this table/view and all of its legitimate
+     *                                       child views.
+     * @param scanSysCatForTTLDefinedOnAnyChildPair Boolean pair whose first element is used in
+     *                                              {@link ViewUtil#findImmediateRelatedViews(Table,
+     *                                              Table, byte[], byte[], byte[], LinkType, long,
+     *                                              Pair)} to determine whether we have to scan
+     *                                              sysCat to check if TTL is defined. The second
+     *                                              element stores the result: whether we found at
+     *                                              least one child in the hierarchy on which TTL
+     *                                              is defined.
+     *
+     * @return a Pair where the first element is a list of all legitimate child views (or just 1
+     * child view in case findJustOneLegitimateChildView is true) and where the second element is
+     * a list of all orphan views stemming from this table/view and all of its legitimate child
+     * views (in case findJustOneLegitimateChildView is true, this list will be incomplete since we
+     * are not interested in it anyhow)
+     *
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
+     * @throws SQLException thrown if there is an error getting a connection to the server or an
+     * error retrieving the PTable for a child view
+     */
+
+
+    public static Pair<List<PTable>, List<TableInfo>> findAllDescendantViews(
+            Table sysCatOrsysChildLink, Table sysCat, Configuration serverSideConfig,
+            byte[] parentTenantId, byte[] parentSchemaName, byte[] parentTableOrViewName,
+            long clientTimeStamp, List<PTable> legitimateChildViews,
+            List<TableInfo> orphanChildViews, boolean findJustOneLegitimateChildView,
+            Pair<Boolean, Boolean> scanSysCatForTTLDefinedOnAnyChildPair)
             throws IOException, SQLException {
         TableViewFinderResult currentResult =
-                findImmediateRelatedViews(sysCatOrsysChildLink, parentTenantId, parentSchemaName,
-                        parentTableOrViewName, LinkType.CHILD_TABLE, clientTimeStamp);
+                findImmediateRelatedViews(sysCatOrsysChildLink, sysCat, parentTenantId,
+                        parentSchemaName, parentTableOrViewName, LinkType.CHILD_TABLE,
+                        clientTimeStamp, scanSysCatForTTLDefinedOnAnyChildPair);
         for (TableInfo viewInfo : currentResult.getLinks()) {
             byte[] viewTenantId = viewInfo.getTenantId();
             byte[] viewSchemaName = viewInfo.getSchemaName();
@@ -185,10 +248,11 @@ public class ViewUtil {
                     }
                     // Note that we only explore this branch if the current view is a legitimate
                     // child view, else we ignore it and move on to the next potential child view
-                    findAllDescendantViews(sysCatOrsysChildLink, serverSideConfig,
+                    findAllDescendantViews(sysCatOrsysChildLink, sysCat, serverSideConfig,
                             viewInfo.getTenantId(), viewInfo.getSchemaName(),
                             viewInfo.getTableName(), clientTimeStamp, legitimateChildViews,
-                            orphanChildViews, findJustOneLegitimateChildView);
+                            orphanChildViews, findJustOneLegitimateChildView,
+                            scanSysCatForTTLDefinedOnAnyChildPair);
                 } else {
                     logger.error("Found an orphan parent->child link keyed by this parent."
                             + " Parent Tenant Id: '" + Bytes.toString(parentTenantId)
@@ -201,6 +265,7 @@ public class ViewUtil {
                 }
             }
         }
+        return new Pair<>(legitimateChildViews, orphanChildViews);
     }
 
     private static boolean isLegitimateChildView(PTable view, byte[] parentSchemaName,
@@ -246,16 +311,48 @@ public class ViewUtil {
         }
     }
 
+    private static TableViewFinderResult findImmediateRelatedViews(Table sysCatOrsysChildLink,
+        byte[] tenantId, byte[] schema, byte[] table, PTable.LinkType linkType, long timestamp)
+        throws IOException {
+
+        return findImmediateRelatedViews(sysCatOrsysChildLink, null, tenantId, schema, table,
+                linkType, timestamp, new Pair<>(false, false));
+    }
+
     /**
      * Runs a scan on SYSTEM.CATALOG or SYSTEM.CHILD_LINK to get the immediate related tables/views.
+     * @param sysCatOrsysChildLink Table corresponding to either SYSTEM.CATALOG or SYSTEM.CHILD_LINK
+     * @param sysCat Table corresponding to SYSTEM.CATALOG, used specifically to check whether TTL
+     *               is defined on an immediate related view. This can be null, in which case we
+     *               do not scan it to check whether TTL is defined.
+     * @param tenantId tenantId of the key (null if it is a table or global view)
+     * @param schema schema name to use in the key
+     * @param table table/view name to use in the key
+     * @param linkType link type
+     * @param timestamp client timestamp
+     * @param scanSysCatForTTLDefinedOnAnyChildPair Boolean pair whose first element is used to
+     *                                              determine whether we have to scan sysCat to
+     *                                              check if TTL is defined. The second element
+     *                                              stores the result: whether we found at least
+     *                                              one child in the hierarchy on which TTL is
+     *                                              defined.
+     * @return TableViewFinderResult of the scan to get immediate related table/views.
+     * @throws IOException thrown if there is an error scanning SYSTEM.CHILD_LINK or SYSTEM.CATALOG
      */
     private static TableViewFinderResult findImmediateRelatedViews(Table sysCatOrsysChildLink,
-            byte[] tenantId, byte[] schema, byte[] table, PTable.LinkType linkType, long timestamp)
+            @Nullable Table sysCat, byte[] tenantId, byte[] schema, byte[] table,
+            PTable.LinkType linkType, long timestamp,
+            Pair<Boolean, Boolean> scanSysCatForTTLDefinedOnAnyChildPair)
             throws IOException {
         if (linkType==PTable.LinkType.INDEX_TABLE || linkType==PTable.LinkType.EXCLUDED_COLUMN) {
             throw new IllegalArgumentException("findAllRelatives does not support link type "
                     + linkType);
         }
+        if (sysCat == null) {
+            //Null sysCat means no scan is needed on SYSCAT for TTL values of each child view.
+            scanSysCatForTTLDefinedOnAnyChildPair.setFirst(false);
+        }
+
         byte[] key = SchemaUtil.getTableKey(tenantId, schema, table);
 		Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP,
                 timestamp);
@@ -281,7 +378,7 @@ public class ViewUtil {
                     viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
                 } else if (linkType==PTable.LinkType.VIEW_INDEX_PARENT_TABLE) {
                     viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-                } 
+                }
                 else if (linkType==PTable.LinkType.PHYSICAL_TABLE &&
                         result.getValue(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES)!=null) {
                     // do not links from indexes to their physical table
@@ -294,6 +391,18 @@ public class ViewUtil {
                         rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX])
                         .getBytes(StandardCharsets.UTF_8);
                 tableInfoList.add(new TableInfo(viewTenantId, viewSchemaName, viewName));
+                if (scanSysCatForTTLDefinedOnAnyChildPair.getFirst()) {
+                    byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
+                    Scan ttlScan = MetaDataUtil.newTableRowsScan(viewKey,
+                            MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
+                    Result ttlResult = sysCat.getScanner(ttlScan).next();
+                    if (ttlResult != null) {
+                        if (ttlResult.getValue(TABLE_FAMILY_BYTES, TTL_BYTES) != null) {
+                            scanSysCatForTTLDefinedOnAnyChildPair.setSecond(true);
+                            scanSysCatForTTLDefinedOnAnyChildPair.setFirst(false);
+                        }
+                    }
+                }
             }
             return new TableViewFinderResult(tableInfoList);
         } 
@@ -954,9 +1063,6 @@ public class ViewUtil {
             PTable parent, ExtendedCellBuilder extendedCellBuilder) {
         byte[] parentUpdateCacheFreqBytes = null;
         byte[] parentUseStatsForParallelizationBytes = null;
-        //Commenting out phoenixTTL property to exclude.
-        //TODO:- re-enable after introducing TTL for views.
-        //byte[] parentPhoenixTTLBytes = null;
         if (parent != null) {
             parentUpdateCacheFreqBytes = new byte[PLong.INSTANCE.getByteSize()];
             PLong.INSTANCE.getCodec().encodeLong(parent.getUpdateCacheFrequency(),
@@ -965,9 +1071,6 @@ public class ViewUtil {
                 parentUseStatsForParallelizationBytes =
                         PBoolean.INSTANCE.toBytes(parent.useStatsForParallelization());
             }
-            //parentPhoenixTTLBytes = new byte[PLong.INSTANCE.getByteSize()];
-            //PLong.INSTANCE.getCodec().encodeLong(parent.getPhoenixTTL(),
-            //        parentPhoenixTTLBytes, 0);
         }
         for (Mutation m: tableMetaData) {
             if (m instanceof Put) {
@@ -983,12 +1086,6 @@ public class ViewUtil {
                         extendedCellBuilder,
                         parentUseStatsForParallelizationBytes,
                         MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
-                //MetaDataUtil.conditionallyAddTagsToPutCells((Put)m,
-                //        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                //        PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES,
-                //        extendedCellBuilder,
-                //        parentPhoenixTTLBytes,
-                //        MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
             }
 
         }