You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2023/07/25 03:54:07 UTC

[phoenix] branch PHOENIX-6978-feature updated: PHOENIX-6979 Use HBase TTL as TTL for Tables only at Phoenix Level when phoenix.table.ttl.enabled is true (#1622)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-6978-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6978-feature by this push:
     new b16f5320d2 PHOENIX-6979 Use HBase TTL as TTL for Tables only at Phoenix Level when phoenix.table.ttl.enabled is true (#1622)
b16f5320d2 is described below

commit b16f5320d2c1277ce3d4383c88426885fa2458d5
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Mon Jul 24 20:54:01 2023 -0700

    PHOENIX-6979 Use HBase TTL as TTL for Tables only at Phoenix Level when phoenix.table.ttl.enabled is true (#1622)
---
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  20 +-
 .../DefaultPhoenixMultiViewListProviderIT.java     |  10 +-
 .../phoenix/end2end/FlappingAlterTableIT.java      |  22 +-
 .../phoenix/end2end/LoadSystemTableSnapshotIT.java |   1 +
 .../phoenix/end2end/MaxLookbackExtendedIT.java     |   4 +-
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  |   4 +-
 .../apache/phoenix/end2end/PhoenixTTLToolIT.java   |  24 +-
 .../apache/phoenix/end2end/PropertiesInSyncIT.java |  21 +-
 .../org/apache/phoenix/end2end/SetPropertyIT.java  |  20 ++
 .../end2end/SetPropertyOnEncodedTableIT.java       |   2 +-
 .../end2end/SetPropertyOnNonEncodedTableIT.java    |   2 +-
 .../SystemTablesCreationOnConnectionIT.java        |   4 +
 .../apache/phoenix/end2end/TTLAsPhoenixTTLIT.java  | 279 +++++++++++++++++++++
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java |   6 +
 .../phoenix/end2end/ViewTTLNotEnabledIT.java       |   3 +-
 .../index/IndexVerificationOutputRepositoryIT.java |   2 +-
 .../index/IndexVerificationResultRepositoryIT.java |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   1 +
 .../phoenix/coprocessor/CompactionScanner.java     |   6 +-
 .../coprocessor/GlobalIndexRegionScanner.java      |   2 +-
 .../coprocessor/IndexRebuildRegionScanner.java     |   7 +-
 .../coprocessor/IndexRepairRegionScanner.java      |   7 +-
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   7 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   4 +-
 .../phoenix/coprocessor/TTLRegionScanner.java      |  11 +-
 .../UngroupedAggregateRegionObserver.java          |   9 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   5 +-
 .../mapreduce/index/IndexScrutinyMapper.java       |  26 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  29 ++-
 .../org/apache/phoenix/schema/MetaDataClient.java  | 120 ++++++---
 .../java/org/apache/phoenix/schema/PTableImpl.java |   2 +
 .../org/apache/phoenix/schema/TableProperty.java   |  16 +-
 .../schema/tool/SchemaExtractionProcessor.java     |  20 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  22 +-
 .../java/org/apache/phoenix/util/ViewUtil.java     |  22 +-
 .../query/ConnectionQueryServicesImplTest.java     |  30 +--
 .../java/org/apache/phoenix/util/TestUtil.java     |  12 +-
 38 files changed, 652 insertions(+), 135 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 8e622f40b9..e8b3e721ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -37,6 +37,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -68,6 +70,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SchemaNotFoundException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -75,12 +78,27 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class CreateTableIT extends ParallelStatsDisabledIT {
 
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        /**
+         * This test checks Table properties at ColumnFamilyDescriptor level, turing phoenix_table_ttl
+         * to false for them to test TTL and other props at HBase level. TTL being set at phoenix level
+         * is being tested in {@link TTLAsPhoenixTTLIT}
+         */
+        props.put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
     @Test
     public void testStartKeyStopKey() throws SQLException {
         Properties props = new Properties();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
index 94b8b566f3..1e69e3a048 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
 import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -34,6 +35,11 @@ import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DELETE_ALL_VIEWS;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Disabling this test as this works on TTL being set on View which is removed and will be added in future.
+ * TODO:- To enable this test after re-enabling TTL for view for more info check :- PHOENIX-6978
+ */
+@Ignore
 @Category(NeedsOwnMiniClusterTest.class)
 public class DefaultPhoenixMultiViewListProviderIT extends ParallelStatsDisabledIT {
     private final String BASE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID CHAR(10) NOT NULL, " +
@@ -43,11 +49,11 @@ public class DefaultPhoenixMultiViewListProviderIT extends ParallelStatsDisabled
             "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
             " AS SELECT * FROM %s ";
     private final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL =
-            VIEW_DDL + " PHOENIX_TTL = 1000";
+            VIEW_DDL + " TTL = 1000";
     private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
     private final String TENANT_VIEW_DDL =
             "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
-    private final String TENANT_VIEW_DDL_WITH_TTL = TENANT_VIEW_DDL + " PHOENIX_TTL = 1000";;
+    private final String TENANT_VIEW_DDL_WITH_TTL = TENANT_VIEW_DDL + " TTL = 1000";;
 
     @Test
     public void testGetPhoenixMultiViewList() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingAlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingAlterTableIT.java
index 447326bde9..d93456db30 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingAlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingAlterTableIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.TableName;
@@ -29,16 +30,35 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class FlappingAlterTableIT extends ParallelStatsDisabledIT {
     private String dataTableFullName;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        /**
+         * This test checks Table properties at ColumnFamilyDescriptor level, turing phoenix_table_ttl
+         * to false for them to test TTL and other props at HBase level. TTL being set at phoenix level
+         * is being tested in {@link TTLAsPhoenixTTLIT}
+         */
+        props.put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     
     @Before
     public void setupTableNames() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LoadSystemTableSnapshotIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LoadSystemTableSnapshotIT.java
index c8c0a95a99..603e5bd7b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LoadSystemTableSnapshotIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LoadSystemTableSnapshotIT.java
@@ -65,6 +65,7 @@ import java.io.FileOutputStream;
  * (or even being committed to the ASF branches)
  */
 
+//TODO:- Snapshot here is storing integers as TTL Value and Phoenix Level TTL is Long, need to work on this.
 @Category(NeedsOwnMiniClusterTest.class)
 public class LoadSystemTableSnapshotIT extends BaseTest {
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 8b8e59601f..3a6df29241 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -300,8 +300,8 @@ public class MaxLookbackExtendedIT extends BaseTest {
             long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
             TableName dataTable = TableName.valueOf(dataTableName);
             TableName indexTable = TableName.valueOf(indexName);
-            assertTableHasTtl(conn, dataTable, ttl);
-            assertTableHasTtl(conn, indexTable, ttl);
+            assertTableHasTtl(conn, dataTable, ttl, true);
+            assertTableHasTtl(conn, indexTable, ttl, true);
             //first make sure we inserted correctly
             String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
             String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
index 712b2fcd50..6c6fe713e2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -208,8 +208,8 @@ public class MaxLookbackIT extends BaseTest {
             long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
             TableName dataTable = TableName.valueOf(dataTableName);
             TableName indexTable = TableName.valueOf(indexName);
-            assertTableHasTtl(conn, dataTable, ttl);
-            assertTableHasTtl(conn, indexTable, ttl);
+            assertTableHasTtl(conn, dataTable, ttl, false);
+            assertTableHasTtl(conn, indexTable, ttl, false);
             //first make sure we inserted correctly
             String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
             String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
index 7b16c145cc..e2f5cf0b01 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -48,6 +49,11 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Disabling this test as this works on TTL being set on View which is removed and will be added in future.
+ * TODO:- To enable this test after re-enabling TTL for view for more info check :- PHOENIX-6978
+ */
+@Ignore
 @Category(NeedsOwnMiniClusterTest.class)
 public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
 
@@ -72,7 +78,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             "UPSERT INTO %s (PK1,A,B,C,D,E,F) VALUES(1,1,1,1,1,1,1)";
     private final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = "CREATE VIEW %s (" +
             "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-            " AS SELECT * FROM %s WHERE ID = '%s' PHOENIX_TTL = %d";
+            " AS SELECT * FROM %s WHERE ID = '%s' TTL = %d";
     private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
     private final String TENANT_VIEW_DDL =
             "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
@@ -314,7 +320,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             createMultiTenantTable(globalConn, baseTableFullName);
             String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
                     "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' TTL = %d";
 
             globalConn.createStatement().execute(
                     String.format(ddl, globalViewName1, VIEW_PREFIX1,
@@ -404,7 +410,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             createMultiTenantTable(globalConn, baseTableFullName);
             String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
                     "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' TTL = %d";
 
             globalConn.createStatement().execute(
                     String.format(ddl, globalViewName1, VIEW_PREFIX1,
@@ -463,7 +469,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
 
             ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
                     "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' TTL = %d";
 
             globalConn.createStatement().execute(
                     String.format(ddl, globalViewName1, VIEW_PREFIX1,
@@ -523,7 +529,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
 
             ddl = "CREATE VIEW %s (PK2 BIGINT PRIMARY KEY, " +
                     "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM " + baseTableFullName + " WHERE PK1=%d PHOENIX_TTL = %d";
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE PK1=%d TTL = %d";
 
             globalConn.createStatement().execute(
                     String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
@@ -595,7 +601,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             globalConn.createStatement().execute(String.format(ddl, globalViewName1));
 
             ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
-                    "WHERE ID = '%s' PHOENIX_TTL = %d";
+                    "WHERE ID = '%s' TTL = %d";
             tenant1Connection.createStatement().execute(
                     String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
                             PHOENIX_TTL_EXPIRE_IN_A_SECOND));
@@ -662,7 +668,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             globalConn.createStatement().execute(String.format(ddl, globalViewName2, 2));
 
             ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
-                    "WHERE ID = '%s' PHOENIX_TTL = %d";
+                    "WHERE ID = '%s' TTL = %d";
             tenant1Connection.createStatement().execute(
                     String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
                             PHOENIX_TTL_EXPIRE_IN_A_SECOND));
@@ -734,7 +740,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             createMultiTenantTable(globalConn, baseTableFullName);
             String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
                     "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d PHOENIX_TTL = %d";
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d TTL = %d";
 
             globalConn.createStatement().execute(
                     String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
@@ -812,7 +818,7 @@ public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
             globalConn.createStatement().execute(String.format(globalViewDdl, globalViewName));
 
             String middleLevelViewDdl = "CREATE VIEW %s (C BIGINT, D BIGINT)" +
-                    " AS SELECT * FROM %s WHERE ID ='%s' PHOENIX_TTL = %d";
+                    " AS SELECT * FROM %s WHERE ID ='%s' TTL = %d";
 
             globalConn.createStatement().execute(String.format(middleLevelViewDdl,
                     middleLevelViewName1, globalViewName,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
index 68aed02fd0..934ad98dc0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
@@ -26,14 +26,19 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -60,7 +65,7 @@ import static org.apache.phoenix.end2end.index.IndexMetadataIT.assertUpdateCache
 /**
  * Test properties that need to be kept in sync amongst all column families and indexes of a table
  */
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class PropertiesInSyncIT extends ParallelStatsDisabledIT {
     private static final String COL_FAM1 = "CF1";
     private static final String COL_FAM2 = "CF2";
@@ -79,6 +84,20 @@ public class PropertiesInSyncIT extends ParallelStatsDisabledIT {
     private static final int MODIFIED_UPDATE_CACHE_FREQUENCY_VIEWS = INITIAL_UPDATE_CACHE_FREQUENCY_VIEWS + 300;
 
 
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        /**
+         * This test checks Table properties at ColumnFamilyDescriptor level, turing phoenix_table_ttl
+         * to false for them to test TTL and other props at HBase level. TTL being set at phoenix level
+         * is being tested in {@link TTLAsPhoenixTTLIT}
+         */
+        props.put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
     // Test that we disallow specifying synced properties to be set per column family when creating a table
     @Test
     public void testDisallowSyncedPropsToBeSetColFamSpecificCreateTable() throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
index 6f30ac413b..19f40510c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.KeepDeletedCells;
@@ -35,13 +36,18 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -56,6 +62,20 @@ public abstract class SetPropertyIT extends ParallelStatsDisabledIT {
     private String tableDDLOptions;
     private final boolean columnEncoded;
 
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        /**
+         * This test checks Table properties at ColumnFamilyDescriptor level, turing phoenix_table_ttl
+         * to false for them to test TTL and other props at HBase level. TTL being set at phoenix level
+         * is being tested in {@link TTLAsPhoenixTTLIT}
+         */
+        props.put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
     public SetPropertyIT(boolean columnEncoded) {
         this.columnEncoded = columnEncoded;
         this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0";
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnEncodedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnEncodedTableIT.java
index 25805972fa..42653ac53d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnEncodedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnEncodedTableIT.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameters;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class SetPropertyOnEncodedTableIT extends SetPropertyIT {
     public SetPropertyOnEncodedTableIT(boolean columnEncoded) {
         super(columnEncoded);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnNonEncodedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnNonEncodedTableIT.java
index b4ac48471e..6a9c4ab01b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnNonEncodedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyOnNonEncodedTableIT.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameters;
 
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 public class SetPropertyOnNonEncodedTableIT extends SetPropertyIT {
     public SetPropertyOnNonEncodedTableIT(boolean columnEncoded) {
         super(columnEncoded);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index a1f1892982..e65f298acc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController;
 import org.apache.phoenix.compat.hbase.CompatUtil;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.UpgradeRequiredException;
@@ -65,6 +67,8 @@ import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.UpgradeUtil;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
new file mode 100644
index 0000000000..d5a1fb5ab3
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY;
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category(ParallelStatsDisabledTest.class)
+public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
+
+    private static final long DEFAULT_TTL_FOR_TEST = 86400;
+
+    /**
+     * test TTL is being set as PhoenixTTL when PhoenixTTL is enabled.
+     */
+    @Test
+    public void testCreateTableWithTTL() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl());) {
+            assertEquals("TTL is not set correctly at Phoenix level", DEFAULT_TTL_FOR_TEST,
+                    conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,
+                            createTableWithOrWithOutTTLAsItsProperty(conn, true))).getPhoenixTTL());
+        }
+    }
+
+    /**
+     * Tests that when: 1) DDL has both pk as well as key value columns 2) Key value columns have
+     *      * both default and explicit column family names 3) TTL specifier doesn't have column family
+     *      * name. Then it should not affect TTL being set at Phoenix Level.
+     */
+    @Test
+    public void  testCreateTableWithTTLWithDifferentColumnFamilies() throws  Exception {
+        String tableName = generateUniqueName();
+        String ddl =
+                "create table IF NOT EXISTS  " + tableName + "  (" + " id char(1) NOT NULL,"
+                        + " col1 integer NOT NULL," + " b.col2 bigint," + " col3 bigint, "
+                        + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1)"
+                        + " ) TTL=86400";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), DEFAULT_TTL_FOR_TEST, tableName);
+    }
+
+    @Test
+    public void testSettingTTLAsAlterTableCommand() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);){
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, false);
+            //Checking Default TTL in case of PhoenixTTLEnabled
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, tableName);
+            String ddl = "ALTER TABLE  " + tableName
+                    + " SET TTL=1000";
+            conn.createStatement().execute(ddl);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 1000, tableName);
+        }
+    }
+
+    @Test
+    public void testSettingTTLForIndexes() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())){
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, true);
+
+            //By default, Indexes should set TTL what Base Table has
+            createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.LOCAL, false);
+            createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.GLOBAL, false);
+            List<PTable> indexes = conn.unwrap(PhoenixConnection.class).getTable(
+                    new PTableKey(null, tableName)).getIndexes();
+            for (PTable index : indexes) {
+                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);;
+            }
+
+            tableName = createTableWithOrWithOutTTLAsItsProperty(conn, false);
+
+            String localIndexName = createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.LOCAL, false);
+            String globalIndexName = createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.GLOBAL, false);
+            indexes = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).getIndexes();
+            for (PTable index : indexes) {
+                assertTTLValueOfIndex(PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, index);
+            }
+
+            //Test setting TTL as index property not allowed while creating them or setting them explicitly.
+            try {
+                conn.createStatement().execute("ALTER TABLE " + localIndexName + " SET TTL = 1000");
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with cannot set or alter property for index",
+                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+            }
+
+            try {
+                conn.createStatement().execute("ALTER TABLE " + globalIndexName + " SET TTL = 1000");
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with cannot set or alter property for index",
+                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+            }
+
+            try {
+                createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.LOCAL, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with cannot set or alter property for index",
+                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+            }
+
+            try {
+                createIndexOnTableOrViewProvidedWithTTL(conn, tableName, PTable.IndexType.GLOBAL, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with cannot set or alter property for index",
+                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+            }
+
+        }
+    }
+
+
+    @Test
+    public void testSettingTTLForViews() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tenantID = generateUniqueName();
+            String tenantID1 = generateUniqueName();
+
+            Properties props = new Properties();
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantID);
+            Connection tenantConn = DriverManager.getConnection(getUrl(), props);
+
+            Properties props1 = new Properties();
+            props1.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantID1);
+            Connection tenantConn1 = DriverManager.getConnection(getUrl(), props1);
+
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, true);
+
+            //View gets TTL value from its hierarchy
+            String viewName = createViewOnTableWithTTL(conn, tableName, false);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, viewName);
+
+            //Index on Global View should get TTL from View.
+            createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.GLOBAL,
+                    false);
+            createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.LOCAL,
+                    false);
+            List<PTable> indexes = conn.unwrap(PhoenixConnection.class).getTable(
+                    new PTableKey(null, viewName)).getIndexes();
+            for (PTable index : indexes) {
+                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);
+            }
+
+            //Child View gets TTL from parent View which gets from Table.
+            String childView = createViewOnViewWithTTL(tenantConn, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView);
+
+            String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, false);
+            assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
+                    DEFAULT_TTL_FOR_TEST, childView1);
+
+            //Setting TTL on Views should not be allowed.
+
+            try {
+                createViewOnTableWithTTL(conn, tableName, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with TTL supported for tables only",
+                        PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), sqe.getErrorCode());
+            }
+
+            try {
+                conn.createStatement().execute("ALTER VIEW " + viewName + " SET TTL = 1000");
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Cannot Set or Alter TTL on Views",
+                        VIEW_WITH_PROPERTIES.getErrorCode(), sqe.getErrorCode());
+            }
+
+            try {
+                createIndexOnTableOrViewProvidedWithTTL(conn, viewName, PTable.IndexType.GLOBAL,true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with Cannot set or Alter property for index",
+                        CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), sqe.getErrorCode());
+            }
+        }
+    }
+
+    private void assertTTLValueOfTableOrView(PhoenixConnection conn, long expected, String name) throws SQLException {
+        assertEquals("TTL value did not match :-", expected,
+                conn.getTable(new PTableKey(conn.getTenantId(), name)).getPhoenixTTL());
+    }
+
+    private void assertTTLValueOfIndex(long expected, PTable index) {
+        assertEquals("TTL value is not what expected :-", expected, index.getPhoenixTTL());
+    }
+
+
+    private String createTableWithOrWithOutTTLAsItsProperty(Connection conn, boolean withTTL) throws SQLException {
+        String tableName = generateUniqueName();
+        conn.createStatement().execute("CREATE TABLE IF NOT EXISTS  " + tableName + "  ("
+                + " ID INTEGER NOT NULL,"
+                + " COL1 INTEGER NOT NULL,"
+                + " COL2 bigint NOT NULL,"
+                + " CREATED_DATE DATE,"
+                + " CREATION_TIME BIGINT,"
+                + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2))"
+                + ( withTTL ? " TTL = " + DEFAULT_TTL_FOR_TEST : ""));
+        return tableName;
+    }
+
+    private String createIndexOnTableOrViewProvidedWithTTL(Connection conn, String baseTableOrViewName, PTable.IndexType indexType,
+                                                           boolean withTTL) throws SQLException {
+        switch (indexType) {
+            case LOCAL:
+                String localIndexName = baseTableOrViewName + "_Local_" + generateUniqueName();
+                conn.createStatement().execute("CREATE LOCAL INDEX " + localIndexName + " ON " +
+                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = 1000" : ""));
+                return localIndexName;
+
+            case GLOBAL:
+                String globalIndexName = baseTableOrViewName + "_Global_" + generateUniqueName();
+                conn.createStatement().execute("CREATE INDEX " + globalIndexName + " ON " +
+                        baseTableOrViewName + " (COL1) " + (withTTL ? "TTL = 1000" : ""));
+                return globalIndexName;
+
+            default:
+                return baseTableOrViewName;
+        }
+    }
+
+    private String createViewOnTableWithTTL(Connection conn, String baseTableName,
+                                            boolean withTTL) throws SQLException {
+        String viewName = "VIEW_" + baseTableName + "_" + generateUniqueName();
+        conn.createStatement().execute("CREATE VIEW " + viewName
+                + " (" + generateUniqueName() + " SMALLINT) as select * from "
+                + baseTableName + " where id > 1 "
+                + (withTTL ? "TTL = 1000" : "") );
+        return viewName;
+    }
+
+    private String createViewOnViewWithTTL(Connection conn, String parentViewName,
+                                           boolean withTTL) throws SQLException {
+        String childView = parentViewName + "_" + generateUniqueName();
+        conn.createStatement().execute("CREATE VIEW " + childView +
+                " (E BIGINT, F BIGINT) AS SELECT * FROM " + parentViewName);
+        return childView;
+    }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index 77b3ec7ebd..bea751cba6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -96,6 +97,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Disabling this test as this works on TTL being set on View which is removed and will be added in future.
+ * TODO:- To enable this test after re-enabling TTL for view for more info check :- PHOENIX-6978
+ */
+@Ignore
 @Category(NeedsOwnMiniClusterTest.class)
 public class ViewTTLIT extends ParallelStatsDisabledIT {
     private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
index dfde99b546..a624c60051 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.util.ScanUtil;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -38,7 +39,7 @@ import java.sql.Statement;
 import java.util.Properties;
 
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
-
+@Ignore
 @Category(NeedsOwnMiniClusterTest.class)
 public class ViewTTLNotEnabledIT extends ParallelStatsDisabledIT {
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
index c4a8ec42b2..9f2e6fa0f2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
@@ -132,7 +132,7 @@ public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT
                     new IndexVerificationOutputRepository(mockStringBytes, conn);
 
             outputRepository.createOutputTable(conn);
-            TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
+            TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL, false);
             ManualEnvironmentEdge customClock = new ManualEnvironmentEdge();
             customClock.setValue(EnvironmentEdgeManager.currentTimeMillis());
             EnvironmentEdgeManager.injectEdge(customClock);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
index 1fb4b4d091..0f680b6189 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
@@ -109,7 +109,7 @@ public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT
         IndexVerificationResultRepository resultRepository =
                 new IndexVerificationResultRepository(conn, indexNameBytes);
         resultRepository.createResultTable(conn);
-        TestUtil.assertTableHasTtl(conn, TableName.valueOf(RESULT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
+        TestUtil.assertTableHasTtl(conn, TableName.valueOf(RESULT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL, false);
         byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000");
         byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000");
         resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 7493acceac..bc566a8364 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -132,6 +132,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     public static final String MASK_PHOENIX_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
     public static final String DELETE_PHOENIX_TTL_EXPIRED = "_DELETE_TTL_EXPIRED";
     public static final String PHOENIX_TTL_SCAN_TABLE_NAME = "_PhoenixTTLScanTableName";
+    public static final String IS_PHOENIX_TTL_SCAN_TABLE_SYSTEM = "_IsPhoenixScanTableSystem";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
     public final static String SCAN_OFFSET = "_RowOffset";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 3bcc2cefa8..cb7dbf57d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -85,7 +85,9 @@ public class CompactionScanner implements InternalScanner {
             InternalScanner storeScanner,
             long maxLookbackInMillis,
             byte[] emptyCF,
-            byte[] emptyCQ) {
+            byte[] emptyCQ,
+            long phoenixTTL,
+            boolean isSystemTable) {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
@@ -108,7 +110,7 @@ public class CompactionScanner implements InternalScanner {
         this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
                 compactionTime : compactionTime - (maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        ttl = cfd.getTimeToLive();
+        this.ttl = isSystemTable ? cfd.getTimeToLive() : phoenixTTL;
         this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
         ttl *= 1000;
         this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 4eebee0457..e420490242 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -154,7 +154,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
     protected TaskRunner pool;
     protected String exceptionMessage;
     protected HTableFactory hTableFactory;
-    protected int indexTableTTL;
+    protected long indexTableTTL;
     protected long maxLookBackInMills;
     protected IndexToolVerificationResult verificationResult = null;
     protected IndexVerificationResultRepository verificationResultRepository = null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 986107d3fc..24951662d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -88,7 +89,11 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
         super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
 
         indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-        indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
+        if (BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
+            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+        } else {
+            indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
+        }
         indexRowKeyforReadRepair = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         if (indexRowKeyforReadRepair != null) {
             setReturnCodeForSingleRowRebuild();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index a8147fb6b4..fd0e25cd78 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +87,11 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
 
         byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
         dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
-        indexTableTTL = region.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        if (BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
+            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+        } else {
+            indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
+        }
         try (org.apache.hadoop.hbase.client.Connection connection =
                      HBaseFactoryProvider.getHConnectionFactory().createConnection(env.getConfiguration())) {
             regionEndKeys = connection.getRegionLocator(dataHTable.getName()).getEndKeys();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 839ddb2e22..92006d7927 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -76,6 +76,7 @@ import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,7 +96,11 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
                           UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner, region, scan, env, ungroupedAggregateRegionObserver);
         indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-        indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
+        if (BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
+            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+        } else {
+            indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
+        }
         pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
                 new ThreadPoolBuilder("IndexVerify",
                         env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 149afee79b..7234cb3da8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -138,9 +138,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcUtil;
@@ -3426,6 +3424,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     long newPhoenixTTL = (long) PLong.INSTANCE.toObject(cell.getValueArray(),
                             cell.getValueOffset(), cell.getValueLength());
                     hasNewPhoenixTTLAttribute =  newPhoenixTTL != PHOENIX_TTL_NOT_DEFINED ;
+                    //TODO:- Re-enable when we have ViewTTL
+                    return false;
                 }
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index ce325bbf66..4942e2b2c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.isPhoenixTableTTLEnabled;
 
 /**
  *  TTLRegionScanner masks expired rows using the empty column cell timestamp
@@ -69,7 +71,14 @@ public class TTLRegionScanner extends BaseRegionScanner {
         emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
         long currentTime = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP ?
                 EnvironmentEdgeManager.currentTimeMillis() : scan.getTimeRange().getMax();
-        ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        byte[] isSystemTable = scan.getAttribute(BaseScannerRegionObserver.
+                IS_PHOENIX_TTL_SCAN_TABLE_SYSTEM);
+        if (isPhoenixTableTTLEnabled(env.getConfiguration()) && (isSystemTable == null
+                || !Bytes.toBoolean(isSystemTable))) {
+            ttl = ScanUtil.getPhoenixTTL(this.scan);
+        } else {
+            ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        }
         ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
         ttl *= 1000;
         // Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index fd6b8af346..6613f91612 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.coprocessor;
 
 import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_PHOENIX_TTL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -641,7 +643,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                             table.getEncodingScheme()
                                                     == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
                                                     QueryConstants.EMPTY_COLUMN_BYTES :
-                                                    table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME)
+                                                    table.getEncodingScheme().
+                                                            encode(QueryConstants.
+                                                                    ENCODED_EMPTY_COLUMN_NAME),
+                                            table.getPhoenixTTL() == PHOENIX_TTL_NOT_DEFINED
+                                                    ? DEFAULT_PHOENIX_TTL : table.getPhoenixTTL(),
+                                            table.getType() == PTableType.SYSTEM
                                             );
                         }
                     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 67dd2635a1..9d8e2024fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 
 
 /**
@@ -351,6 +352,8 @@ public enum SQLExceptionCode {
             + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when parent/child view has PHOENIX_TTL set,"),
     CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
         CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
+    PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY(10955, "44A37", TTL
+            + "property can only be set for tables"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 478a0aa874..dc0fa85ea1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -391,8 +391,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String USE_STATS_FOR_PARALLELIZATION = "USE_STATS_FOR_PARALLELIZATION";
     public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
 
-    // The PHOENIX_TTL property will hold the duration after which rows will be marked as expired.
+    // The TTL property will hold the duration after which rows will be marked as expired and
+    // is stored in column PHOENIX_TTL in SYSCAT. TODO:- Rename PHOENIX_TTL to TTL in SYSCAT!?
+    public static final String TTL = "TTL";
     public static final long PHOENIX_TTL_NOT_DEFINED = 0L;
+    public static final long DEFAULT_PHOENIX_TTL = HConstants.FOREVER;
     public static final String PHOENIX_TTL = "PHOENIX_TTL";
     public static final byte[] PHOENIX_TTL_BYTES = Bytes.toBytes(PHOENIX_TTL);
     // The phoenix ttl high watermark if set indicates the timestamp used for determining the expired rows.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index a43f7dce83..61ffb5ae3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +68,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_PHOENIX_TTL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+
 /**
  * Mapper that reads from the data table and checks the rows against the index table
  */
@@ -166,7 +171,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
                     PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
             LOGGER.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
-            ttl = getTableTtl();
+            ttl = getTableTTL(configuration);
             maxLookbackAgeMillis = BaseScannerRegionObserver.getMaxLookbackInMillis(configuration);
         } catch (SQLException | NoSuchAlgorithmException e) {
             tryClosingResourceSilently(this.outputUpsertStmt);
@@ -323,7 +328,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         return sourceTS <= maxLookBackTimeMillis;
     }
 
-    private int getTableTtl() throws SQLException, IOException {
+    private long getTableTTL(Configuration configuration) throws SQLException, IOException {
         PTable pSourceTable = PhoenixRuntime.getTable(connection, qSourceTable);
         if (pSourceTable.getType() == PTableType.INDEX
                 && pSourceTable.getIndexType() == PTable.IndexType.LOCAL) {
@@ -333,12 +338,19 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
                 cqsi = connection.unwrap(PhoenixConnection.class).getQueryServices();
         String physicalTable = getSourceTableName(pSourceTable,
                 SchemaUtil.isNamespaceMappingEnabled(null, cqsi.getProps()));
-        TableDescriptor tableDesc;
-        try (Admin admin = cqsi.getAdmin()) {
-            tableDesc = admin.getDescriptor(TableName
-                .valueOf(physicalTable));
+        if (configuration.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED)) {
+            return pSourceTable.getPhoenixTTL() == PHOENIX_TTL_NOT_DEFINED ? DEFAULT_PHOENIX_TTL
+                    : pSourceTable.getPhoenixTTL();
+        } else {
+            TableDescriptor tableDesc;
+            try (Admin admin = cqsi.getAdmin()) {
+                tableDesc = admin.getDescriptor(TableName
+                        .valueOf(physicalTable));
+            }
+            return tableDesc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(pSourceTable)).
+                    getTimeToLive();
         }
-        return tableDesc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(pSourceTable)).getTimeToLive();
     }
 
     @VisibleForTesting
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 752c46a76e..dad2ccb4c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -185,10 +185,10 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
-import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver;
 import org.apache.phoenix.coprocessor.SystemCatalogRegionObserver;
 import org.apache.phoenix.coprocessor.TaskMetaDataEndpoint;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
@@ -1050,6 +1050,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return false;
     }
 
+    private boolean isPhoenixTTLEnabled() {
+        boolean ttl = config.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+        return ttl;
+    }
+
 
     private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder,
             PTableType tableType, Map<String, Object> tableProps, TableDescriptor existingDesc,
@@ -1300,6 +1306,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                     .build());
                 }
             }
+
             if (Arrays.equals(tableName, SchemaUtil.getPhysicalName(SYSTEM_CATALOG_NAME_BYTES, props).getName())) {
                 if (!newDesc.hasCoprocessor(SystemCatalogRegionObserver.class.getName())) {
                     builder.setCoprocessor(
@@ -2852,6 +2859,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         boolean willBeTransactional = false;
         boolean isOrWillBeTransactional = isTransactional;
         Integer newTTL = null;
+        Integer newPhoenixTTL = null;
         Integer newReplicationScope = null;
         KeepDeletedCells newKeepDeletedCells = null;
         TransactionFactory.Provider txProvider = null;
@@ -2902,10 +2910,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                             .setMessage("Property: " + propName).build()
                                             .buildException();
                                 }
-                                newTTL = ((Number)propValue).intValue();
-                                // Even though TTL is really a HColumnProperty we treat it specially.
-                                // We enforce that all column families have the same TTL.
-                                commonFamilyProps.put(propName, propValue);
+                                //If Phoenix level TTL is enabled we are using TTL as phoenix
+                                //Table level property.
+                                if (!isPhoenixTTLEnabled()) {
+                                    newTTL = ((Number) propValue).intValue();
+                                    //Even though TTL is really a HColumnProperty we treat it
+                                    //specially. We enforce that all CFs have the same TTL.
+                                    commonFamilyProps.put(propName, propValue);
+                                } else {
+                                    //Setting this here just to check if we need to throw Exception
+                                    //for Transaction's SET_TTL Feature.
+                                    newPhoenixTTL = ((Number) propValue).intValue();
+                                }
                             } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional = true;
                                 tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
@@ -2956,7 +2972,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         }
                     }
                 }
-                if (isOrWillBeTransactional && newTTL != null) {
+                if (isOrWillBeTransactional && (newTTL != null || newPhoenixTTL != null)) {
                     TransactionFactory.Provider isOrWillBeTransactionProvider = txProvider == null ? table.getTransactionProvider() : txProvider;
                     if (isOrWillBeTransactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
                         throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
@@ -3720,7 +3736,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
         }
     }
-
     /**
      * Check if the SYSTEM MUTEX table exists. If it does, ensure that its TTL is correct and if
      * not, modify its table descriptor
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5efbf7a79b..6d0a500801 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -21,8 +21,10 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRA
 import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
+import static org.apache.phoenix.schema.PTableType.SYSTEM;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
 import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
@@ -44,7 +46,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
@@ -64,7 +65,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
@@ -121,7 +121,6 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
-import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -290,8 +289,6 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
 
-import edu.umd.cs.findbugs.annotations.SuppressWarnings;
-
 public class MetaDataClient {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -1061,6 +1058,14 @@ public class MetaDataClient {
                             .setMessage("Property: " + prop.getFirst()).build()
                             .buildException();
                 }
+                //If Phoenix Level TTL is enabled use TTL as Phoenix Table Property as skip
+                //TTL at HTableDescriptor level.
+                if (isPhoenixTTLEnabled() && prop.getFirst().equalsIgnoreCase(TTL)
+                        && tableType != PTableType.SYSTEM) {
+                    tableProps.put(prop.getFirst(), prop.getSecond());
+                    continue;
+                }
+
                 // HTableDescriptor property or Phoenix Table Property
                 if (defaultDescriptor.getValue(Bytes.toBytes(prop.getFirst())) == null) {
                     // See PHOENIX-4891
@@ -1078,6 +1083,12 @@ public class MetaDataClient {
         }
     }
 
+    private boolean isPhoenixTTLEnabled() {
+        return connection.getQueryServices().getConfiguration().
+                getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+    }
+
     public MutationState updateStatistics(UpdateStatisticsStatement updateStatisticsStmt)
             throws SQLException {
         // Don't mistakenly commit pending rows
@@ -1938,6 +1949,50 @@ public class MetaDataClient {
         }
     }
 
+    /**
+     * Get TTL defined for Index or View in its parent hierarchy as defining TTL directly on index
+     * or view is not allowed. View on SYSTEM table is not allowed and already handled during
+     * plan compilation.
+     * @param parent
+     * @return appropriate TTL for the entity calling the function.
+     * @throws TableNotFoundException
+     */
+    private Long getTTLFromParent(PTable parent) throws TableNotFoundException {
+        return (parent.getType() == TABLE) ? Long.valueOf(parent.getPhoenixTTL())
+                : (parent.getType() == VIEW ? getTTLFromAncestor(parent) : null);
+    }
+
+    /**
+     * Get TTL defined for the given View according to its hierarchy.
+     * @param view
+     * @return appropriate TTL from Views defined above for the entity calling.
+     * @throws TableNotFoundException
+     */
+    private Long getTTLFromAncestor(PTable view) throws TableNotFoundException {
+        try {
+            return view.getPhoenixTTL() != PHOENIX_TTL_NOT_DEFINED
+                    ? Long.valueOf(view.getPhoenixTTL()) : (checkIfParentIsTable(view)
+                    ? connection.getTable(new PTableKey(null,
+                            view.getPhysicalNames().get(0).getString())).getPhoenixTTL()
+                    : getTTLFromAncestor(connection.getTable(new PTableKey(
+                                    connection.getTenantId(), view.getParentName().getString()))));
+        } catch (TableNotFoundException tne) {
+            //Check again for TTL from ancestors, what if view here is tenant view on top of
+            //Global View without any tenant id.
+            return getTTLFromAncestor(connection.getTable(new PTableKey(null,
+                    view.getParentName().getString())));
+        }
+    }
+
+    private boolean checkIfParentIsTable(PTable view) {
+        PName parentName = view.getParentName();
+        if (parentName == null) {
+            //means this is a view on dataTable
+            return true;
+        }
+        return parentName.getString().equals(view.getPhysicalName().getString());
+    }
+
     private PTable createTableInternal(CreateTableStatement statement, byte[][] splits,
             final PTable parent, String viewStatement, ViewType viewType, PDataType viewIndexIdType,
             final byte[][] viewColumnConstants, final BitSet isViewColumnReferenced, boolean allocateIndexId,
@@ -1994,46 +2049,28 @@ public class MetaDataClient {
 
             Long phoenixTTL = PHOENIX_TTL_NOT_DEFINED;
             Long phoenixTTLHighWaterMark = MIN_PHOENIX_TTL_HWM;
-            Long phoenixTTLProp = (Long) TableProperty.PHOENIX_TTL.getValue(tableProps);;
+            Long phoenixTTLProp = (Long) TableProperty.TTL.getValue(tableProps);
 
             // Validate PHOENIX_TTL prop value if set
             if (phoenixTTLProp != null) {
                 if (phoenixTTLProp < 0) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
-                            .setMessage(String.format("entity = %s, PHOENIX_TTL value should be > 0", tableName ))
+                            .setMessage(String.format("entity = %s, TTL value should be > 0",
+                                    tableName))
                             .build()
                             .buildException();
                 }
 
-                // TODO: PHOENIX_TABLE_TTL
-                if (tableType == VIEW  && parentPhysicalName != null) {
-                    TableDescriptor desc = connection.getQueryServices().getTableDescriptor(
-                        parentPhysicalName.getBytes(StandardCharsets.UTF_8));
-                    if (desc != null) {
-                        Integer tableTTLProp = desc.getColumnFamily(SchemaUtil.getEmptyColumnFamily(parent)).getTimeToLive();
-                        if ((tableTTLProp != null) && (tableTTLProp != HConstants.FOREVER)) {
-                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL_FOR_TABLE_WITH_TTL)
-                                    .setMessage(String.format("table = %s, view = %s", parentPhysicalName, tableName ))
-                                    .build()
-                                    .buildException();
-                        }
-                    }
-                }
-
-                // Cannot set PHOENIX_TTL if parent has already defined it.
-                if (tableType == VIEW  && parent != null && parent.getPhoenixTTL() != PHOENIX_TTL_NOT_DEFINED) {
-                    throw new SQLExceptionInfo.Builder(
-                            SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL)
-                            .setSchemaName(schemaName).setTableName(tableName).build().buildException();
-                }
-
-                if (tableType != VIEW) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_VIEWS_ONLY)
+                if (tableType != TABLE) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                            PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY)
                             .setSchemaName(schemaName)
                             .setTableName(tableName)
                             .build()
                             .buildException();
                 }
+                //Set Only in case of Tables and System Tables.
+                phoenixTTL = phoenixTTLProp;
             }
 
             Boolean isChangeDetectionEnabledProp =
@@ -2047,7 +2084,8 @@ public class MetaDataClient {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
                 isImmutableRows = parent.isImmutableRows();
                 isAppendOnlySchema = parent.isAppendOnlySchema();
-                phoenixTTL = parent.getPhoenixTTL();
+                //Check up hierarchy and get appropriate TTL
+                phoenixTTL = getTTLFromParent(parent);
 
                 // Index on view
                 // TODO: Can we support a multi-tenant index directly on a multi-tenant
@@ -2249,7 +2287,8 @@ public class MetaDataClient {
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
             }
-            if (TableProperty.TTL.getValue(commonFamilyProps) != null 
+            if ((isPhoenixTTLEnabled() ? phoenixTTLProp != null
+                    : TableProperty.TTL.getValue(commonFamilyProps) != null)
                     && transactionProvider != null 
                     && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
                 throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
@@ -2365,7 +2404,7 @@ public class MetaDataClient {
                         updateCacheFrequency = parent.getUpdateCacheFrequency();
                     }
 
-                    phoenixTTL = phoenixTTLProp == null ? parent.getPhoenixTTL() : phoenixTTLProp;
+                    phoenixTTL = getTTLFromParent(parent);
 
                     disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
@@ -3077,7 +3116,6 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setString(35, streamingTopicName);
             }
-
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -5267,7 +5305,7 @@ public class MetaDataClient {
                         metaProperties.setColumnEncodedBytesProp(QualifierEncodingScheme.fromSerializedValue((byte)value));
                     } else if (propName.equalsIgnoreCase(USE_STATS_FOR_PARALLELIZATION)) {
                         metaProperties.setUseStatsForParallelizationProp((Boolean)value);
-                    } else if (propName.equalsIgnoreCase(PHOENIX_TTL)) {
+                    } else if (propName.equalsIgnoreCase(TTL) && isPhoenixTTLEnabled()) {
                         metaProperties.setPhoenixTTL((Long)value);
                     } else if (propName.equalsIgnoreCase(CHANGE_DETECTION_ENABLED)) {
                         metaProperties.setChangeDetectionEnabled((Boolean) value);
@@ -5442,9 +5480,15 @@ public class MetaDataClient {
         }
 
         if (metaProperties.getPhoenixTTL() != null) {
-            if (table.getType() != PTableType.VIEW) {
+            if (table.getType() == PTableType.INDEX) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
+                        .build()
+                        .buildException();
+            }
+            if (table.getType() != PTableType.TABLE && table.getType() != SYSTEM) {
                 throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_VIEWS_ONLY)
+                        SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY)
                         .build()
                         .buildException();
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index fb23a381c3..ee4c6db0c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -34,6 +34,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES;
@@ -611,6 +612,7 @@ public class PTableImpl implements PTable {
         }
 
         public Builder setPhoenixTTL(long phoenixTTL) {
+            propertyValues.put(TTL, String.valueOf(phoenixTTL));
             this.phoenixTTL = phoenixTTL;
             return this;
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 24d0432543..34e5f0a8e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -30,7 +30,6 @@ import java.sql.SQLException;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -76,13 +75,6 @@ public enum TableProperty {
         }
     },
 
-    TTL(ColumnFamilyDescriptorBuilder.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, CANNOT_ALTER_PROPERTY, false, false) {
-        @Override
-        public Object getPTableValue(PTable table) {
-            return null;
-        }
-    },
-
     STORE_NULLS(PhoenixDatabaseMetaData.STORE_NULLS, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
         @Override
         public Object getPTableValue(PTable table) {
@@ -255,7 +247,7 @@ public enum TableProperty {
         }
     },
 
-    PHOENIX_TTL(PhoenixDatabaseMetaData.PHOENIX_TTL, true, true, true) {
+    TTL(PhoenixDatabaseMetaData.TTL, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
         /**
          * PHOENIX_TTL can take any values ranging between 0 < PHOENIX_TTL <= HConstants.LATEST_TIMESTAMP.
          * special values :-
@@ -275,9 +267,9 @@ public enum TableProperty {
                     return PHOENIX_TTL_NOT_DEFINED;
                 }
             } else if (value != null) {
-                long valueInSeconds = ((Number) value).longValue();
-                // Value is specified in seconds, so convert it to ms.
-                return valueInSeconds * 1000;
+                //Not converting to seconds for better understanding at compaction and masking
+                //stage.As HBase Descriptor level gives this value in seconds.
+                return ((Number) value).longValue();
             }
             return value;
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index 7e3913249a..f487d51a0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -31,6 +31,8 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
@@ -52,7 +54,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.util.MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES;
@@ -73,6 +77,7 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
     private String ddl = null;
     private String tenantId;
     private boolean shouldGenerateWithDefaults = false;
+    private boolean isPhoenixTTLEnabled = true;
 
     public SchemaExtractionProcessor(String tenantId, Configuration conf,
             String pSchemaName, String pTableName)
@@ -80,6 +85,8 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
         this.tenantId = tenantId;
         this.conf = conf;
         this.table = getPTable(pSchemaName, pTableName);
+        this.isPhoenixTTLEnabled = conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
     }
 
     public SchemaExtractionProcessor(String tenantId, Configuration conf,
@@ -89,6 +96,8 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
         this.conf = conf;
         this.table = pTable;
         this.shouldGenerateWithDefaults = shouldGenerateWithDefaults;
+        this.isPhoenixTTLEnabled = conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
     }
 
     @Override
@@ -360,6 +369,9 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
         for (Map.Entry<Bytes, Bytes> entry : propsMap.entrySet()) {
             Bytes key = entry.getKey();
             Bytes globalValue = entry.getValue();
+            if (Bytes.toString(key.get()).equalsIgnoreCase(TTL) && isPhoenixTTLEnabled) {
+                continue;
+            }
             Map<String, String> cfToPropertyValueMap = new HashMap<String, String>();
             Set<Bytes> cfPropertyValueSet = new HashSet<>();
             for (ColumnFamilyDescriptor columnDescriptor: columnDescriptors) {
@@ -389,7 +401,13 @@ public class SchemaExtractionProcessor implements SchemaProcessor {
             String key = entry.getKey();
             String value = entry.getValue();
             if (value != null) {
-                definedProps.put(key, value);
+                if (!key.equalsIgnoreCase(TTL)) {
+                    definedProps.put(key, value);
+                } else {
+                    if (isPhoenixTTLEnabled && Long.parseLong(value) != PHOENIX_TTL_NOT_DEFINED) {
+                        definedProps.put(key, value);
+                    }
+                }
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index c02acd9379..e2817db04f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -23,6 +23,8 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_AN
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.isPhoenixTableTTLEnabled;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_PHOENIX_TTL;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
@@ -1027,7 +1029,7 @@ public class ScanUtil {
     public static long getPhoenixTTL(Scan scan) {
         byte[] phoenixTTL = scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL);
         if (phoenixTTL == null) {
-            return 0L;
+            return DEFAULT_PHOENIX_TTL;
         }
         return Bytes.toLong(phoenixTTL);
     }
@@ -1243,10 +1245,14 @@ public class ScanUtil {
     public static void setScanAttributesForPhoenixTTL(Scan scan, PTable table,
             PhoenixConnection phoenixConnection) throws SQLException {
 
-        // If server side masking for PHOENIX_TTL is not enabled OR is a SYSTEM table then return.
-        if (!ScanUtil.isServerSideMaskingEnabled(phoenixConnection) || SchemaUtil.isSystemTable(
-                SchemaUtil.getTableNameAsBytes(table.getSchemaName().getString(),
-                        table.getTableName().getString()))) {
+        // If Phoenix level TTL is not enabled OR is a system table then return.
+        if (!isPhoenixTableTTLEnabled(phoenixConnection.getQueryServices().getConfiguration())) {
+            if (SchemaUtil.isSystemTable(
+                    SchemaUtil.getTableNameAsBytes(table.getSchemaName().getString(),
+                            table.getTableName().getString()))) {
+                scan.setAttribute(BaseScannerRegionObserver.IS_PHOENIX_TTL_SCAN_TABLE_SYSTEM,
+                        Bytes.toBytes(true));
+            }
             return;
         }
 
@@ -1275,7 +1281,6 @@ public class ScanUtil {
                 return;
             }
         }
-
         if (dataTable.getPhoenixTTL() != 0) {
             byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
             byte[] emptyColumnName =
@@ -1338,8 +1343,9 @@ public class ScanUtil {
             long pageSizeMs = phoenixConnection.getQueryServices().getProps()
                     .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
             if (pageSizeMs == -1) {
-                // Use the half of the HBase RPC timeout value as the the server page size to make sure that the HBase
-                // region server will be able to send a heartbeat message to the client before the client times out
+                // Use the half of the HBase RPC timeout value as the server page size to make sure
+                // that the HBase region server will be able to send a heartbeat message to the
+                // client before the client times out.
                 pageSizeMs = (long) (phoenixConnection.getQueryServices().getProps()
                         .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index 3693c916ab..d826f0fdad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -954,7 +954,9 @@ public class ViewUtil {
             PTable parent, ExtendedCellBuilder extendedCellBuilder) {
         byte[] parentUpdateCacheFreqBytes = null;
         byte[] parentUseStatsForParallelizationBytes = null;
-        byte[] parentPhoenixTTLBytes = null;
+        //Commenting out phoenixTTL property to exclude.
+        //TODO:- re-enable after introducing TTL for views.
+        //byte[] parentPhoenixTTLBytes = null;
         if (parent != null) {
             parentUpdateCacheFreqBytes = new byte[PLong.INSTANCE.getByteSize()];
             PLong.INSTANCE.getCodec().encodeLong(parent.getUpdateCacheFrequency(),
@@ -963,9 +965,9 @@ public class ViewUtil {
                 parentUseStatsForParallelizationBytes =
                         PBoolean.INSTANCE.toBytes(parent.useStatsForParallelization());
             }
-            parentPhoenixTTLBytes = new byte[PLong.INSTANCE.getByteSize()];
-            PLong.INSTANCE.getCodec().encodeLong(parent.getPhoenixTTL(),
-                    parentPhoenixTTLBytes, 0);
+            //parentPhoenixTTLBytes = new byte[PLong.INSTANCE.getByteSize()];
+            //PLong.INSTANCE.getCodec().encodeLong(parent.getPhoenixTTL(),
+            //        parentPhoenixTTLBytes, 0);
         }
         for (Mutation m: tableMetaData) {
             if (m instanceof Put) {
@@ -981,12 +983,12 @@ public class ViewUtil {
                         extendedCellBuilder,
                         parentUseStatsForParallelizationBytes,
                         MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
-                MetaDataUtil.conditionallyAddTagsToPutCells((Put)m,
-                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES,
-                        extendedCellBuilder,
-                        parentPhoenixTTLBytes,
-                        MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
+                //MetaDataUtil.conditionallyAddTagsToPutCells((Put)m,
+                //        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                //        PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES,
+                //        extendedCellBuilder,
+                //        parentPhoenixTTLBytes,
+                //        MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
             }
 
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 5d60423adb..52e494d0e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -96,10 +96,10 @@ public class ConnectionQueryServicesImplTest {
     private Table mockTable;
 
     public static final TableDescriptorBuilder SYS_TASK_TDB = TableDescriptorBuilder
-        .newBuilder(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
+            .newBuilder(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
     public static final TableDescriptorBuilder SYS_TASK_TDB_SP = TableDescriptorBuilder
-        .newBuilder(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME))
-        .setRegionSplitPolicyClassName("abc");
+            .newBuilder(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME))
+            .setRegionSplitPolicyClassName("abc");
 
 
     @Before
@@ -107,18 +107,18 @@ public class ConnectionQueryServicesImplTest {
             IllegalAccessException, SQLException {
         MockitoAnnotations.initMocks(this);
         Field props = ConnectionQueryServicesImpl.class
-            .getDeclaredField("props");
+                .getDeclaredField("props");
         props.setAccessible(true);
         props.set(mockCqs, readOnlyProps);
         props = ConnectionQueryServicesImpl.class.getDeclaredField("connection");
         props.setAccessible(true);
         props.set(mockCqs, mockConn);
         when(mockCqs.checkIfSysMutexExistsAndModifyTTLIfRequired(mockAdmin))
-            .thenCallRealMethod();
+                .thenCallRealMethod();
         when(mockCqs.updateAndConfirmSplitPolicyForTask(SYS_TASK_TDB))
-            .thenCallRealMethod();
+                .thenCallRealMethod();
         when(mockCqs.updateAndConfirmSplitPolicyForTask(SYS_TASK_TDB_SP))
-            .thenCallRealMethod();
+                .thenCallRealMethod();
         when(mockCqs.getSysMutexTable()).thenCallRealMethod();
         when(mockCqs.getAdmin()).thenCallRealMethod();
         when(mockCqs.getTable(Mockito.any())).thenCallRealMethod();
@@ -277,10 +277,10 @@ public class ConnectionQueryServicesImplTest {
             fail("Split policy for SYSTEM.TASK cannot be updated");
         } catch (SQLException e) {
             assertEquals("ERROR 908 (43M19): REGION SPLIT POLICY is incorrect."
-                + " Region split policy for table TASK is expected to be "
-                + "among: [null, org.apache.phoenix.schema.SystemTaskSplitPolicy]"
-                + " , actual split policy: abc tableName=SYSTEM.TASK",
-                e.getMessage());
+                            + " Region split policy for table TASK is expected to be "
+                            + "among: [null, org.apache.phoenix.schema.SystemTaskSplitPolicy]"
+                            + " , actual split policy: abc tableName=SYSTEM.TASK",
+                    e.getMessage());
         }
     }
 
@@ -289,12 +289,12 @@ public class ConnectionQueryServicesImplTest {
         when(mockAdmin.tableExists(any())).thenReturn(true);
         when(mockConn.getAdmin()).thenReturn(mockAdmin);
         when(mockConn.getTable(TableName.valueOf("SYSTEM.MUTEX")))
-            .thenReturn(mockTable);
+                .thenReturn(mockTable);
         assertSame(mockCqs.getSysMutexTable(), mockTable);
         verify(mockAdmin, Mockito.times(1)).tableExists(any());
         verify(mockConn, Mockito.times(1)).getAdmin();
         verify(mockConn, Mockito.times(1))
-            .getTable(TableName.valueOf("SYSTEM.MUTEX"));
+                .getTable(TableName.valueOf("SYSTEM.MUTEX"));
     }
 
     @Test
@@ -302,11 +302,11 @@ public class ConnectionQueryServicesImplTest {
         when(mockAdmin.tableExists(any())).thenReturn(false);
         when(mockConn.getAdmin()).thenReturn(mockAdmin);
         when(mockConn.getTable(TableName.valueOf("SYSTEM:MUTEX")))
-          .thenReturn(mockTable);
+                .thenReturn(mockTable);
         assertSame(mockCqs.getSysMutexTable(), mockTable);
         verify(mockAdmin, Mockito.times(1)).tableExists(any());
         verify(mockConn, Mockito.times(1)).getAdmin();
         verify(mockConn, Mockito.times(1))
-          .getTable(TableName.valueOf("SYSTEM:MUTEX"));
+                .getTable(TableName.valueOf("SYSTEM:MUTEX"));
     }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index fe0d8d1b2c..690e86bcda 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1333,10 +1333,16 @@ public class TestUtil {
         assertEquals(code.getSQLState(), se.getSQLState());
     }
 
-    public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl)
+    public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl, boolean phoenixTTLEnabled)
         throws SQLException, IOException {
-        ColumnFamilyDescriptor cd = getColumnDescriptor(conn, tableName);
-        Assert.assertEquals(ttl, cd.getTimeToLive());
+        long tableTTL = -1;
+        if (phoenixTTLEnabled) {
+            tableTTL = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,
+                    tableName.getNameAsString())).getPhoenixTTL();
+        } else {
+            tableTTL = getColumnDescriptor(conn, tableName).getTimeToLive();
+        }
+        Assert.assertEquals(ttl, tableTTL);
     }
 
     public static void assertTableHasVersions(Connection conn, TableName tableName, int versions)