You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2024/03/11 20:08:19 UTC

(phoenix) branch master updated: PHOENIX-7006: Configure maxLookbackAge at table level (#1751)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 57415bed06 PHOENIX-7006: Configure maxLookbackAge at table level  (#1751)
57415bed06 is described below

commit 57415bed0660e9c2ee697f81f34a533d3536acc4
Author: sanjeet006py <36...@users.noreply.github.com>
AuthorDate: Tue Mar 12 01:38:13 2024 +0530

    PHOENIX-7006: Configure maxLookbackAge at table level  (#1751)
---
 .../org/apache/phoenix/compile/QueryCompiler.java  |  17 +-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |   1 +
 .../ServerBuildTransformingTableCompiler.java      |   1 +
 .../BaseScannerRegionObserverConstants.java        |   2 +-
 .../coprocessorclient/MetaDataProtocol.java        |   2 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   1 +
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   3 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  13 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   6 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  71 +++++-
 .../java/org/apache/phoenix/schema/PTable.java     |   5 +
 .../java/org/apache/phoenix/schema/PTableImpl.java |  28 ++-
 .../org/apache/phoenix/schema/TableProperty.java   |  24 ++
 .../phoenix/schema/transform/TransformClient.java  |   1 +
 .../java/org/apache/phoenix/util/MetaDataUtil.java |  14 ++
 .../java/org/apache/phoenix/util/ScanUtil.java     |  15 ++
 phoenix-core-client/src/main/protobuf/PTable.proto |   1 +
 .../coprocessor/BaseScannerRegionObserver.java     |  50 +++-
 .../coprocessor/GlobalIndexRegionScanner.java      |   4 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 119 +++++++++-
 .../UngroupedAggregateRegionObserver.java          |   4 +-
 .../PhoenixServerBuildIndexInputFormat.java        |   1 +
 .../mapreduce/index/IndexScrutinyMapper.java       |   3 +-
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |  31 +--
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  14 ++
 .../org/apache/phoenix/end2end/AlterTableIT.java   | 106 ++++++++-
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  78 +++++++
 .../end2end/IndexRepairRegionScannerIT.java        | 111 +++++----
 .../phoenix/end2end/IndexScrutinyToolBaseIT.java   |   7 +-
 .../end2end/IndexScrutinyWithMaxLookbackIT.java    | 120 +++++++---
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    |  61 +++++
 .../phoenix/end2end/MaxLookbackExtendedIT.java     |  75 +++++-
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  | 251 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  34 ++-
 .../it/java/org/apache/phoenix/end2end/ViewIT.java |  85 +++++++
 .../org/apache/phoenix/end2end/ViewMetadataIT.java |  34 +++
 .../phoenix/end2end/index/IndexMetadataIT.java     |  24 ++
 .../phoenix/end2end/transform/TransformToolIT.java | 144 +++++++++++-
 .../java/org/apache/phoenix/query/BaseTest.java    |  44 ++++
 .../java/org/apache/phoenix/util/TestUtil.java     |   4 +-
 41 files changed, 1461 insertions(+), 150 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 168b404379..dcd9198f7a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -87,6 +87,8 @@ import org.apache.phoenix.util.ParseNodeUtil;
 import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
@@ -190,8 +192,19 @@ public class QueryCompiler {
         if (scn == null) {
             return;
         }
-        long maxLookBackAgeInMillis =
-            BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conn.getQueryServices().getConfiguration());
+        List<TableRef> involvedTables = resolver.getTables();
+        Long maxLookBackAgeInMillis = null;
+        for(TableRef tableRef: involvedTables) {
+            PTable table = tableRef.getTable();
+            if (maxLookBackAgeInMillis == null) {
+                maxLookBackAgeInMillis = table.getMaxLookbackAge();
+            }
+            else if (table.getMaxLookbackAge() != null) {
+                maxLookBackAgeInMillis = Long.min(maxLookBackAgeInMillis, table.getMaxLookbackAge());
+            }
+        }
+        Configuration conf = conn.getQueryServices().getConfiguration();
+        maxLookBackAgeInMillis = MetaDataUtil.getMaxLookbackAge(conf, maxLookBackAgeInMillis);
         long now = EnvironmentEdgeManager.currentTimeMillis();
         if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){
             throw new SQLExceptionInfo.Builder(
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 69699c6e48..bd8dcd480c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -136,6 +136,7 @@ public class ServerBuildIndexCompiler {
             } else {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setScanAttributeForMaxLookbackAge(scan, dataTable.getMaxLookbackAge());
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
                 scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_REBUILD_PAGING, TRUE_BYTES);
                 // Serialize page row size only if we're overriding, else use server side value
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
index c1ed37b067..71f50beef6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
@@ -73,6 +73,7 @@ public class ServerBuildTransformingTableCompiler extends ServerBuildIndexCompil
             ScanUtil.annotateScanWithMetadataAttributes(dataTable, scan);
             scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
             scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+            ScanUtil.setScanAttributeForMaxLookbackAge(scan, dataTable.getMaxLookbackAge());
             ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_REBUILD_PAGING, TRUE_BYTES);
             // Serialize page row size only if we're overriding, else use server side value
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index f0fac9c278..9446d372a3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -136,6 +136,7 @@ public class BaseScannerRegionObserverConstants {
     public static final String INDEX_ROW_KEY = "_IndexRowKey";
     public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
 
+    public static final String MAX_LOOKBACK_AGE = "MAX_LOOKBACK_AGE";
     /**
      * The scan attribute to provide the scan start rowkey for analyze table queries.
      */
@@ -165,7 +166,6 @@ public class BaseScannerRegionObserverConstants {
      */
     public static final String SCAN_SERVER_RETURN_VALID_ROW_KEY = "_ScanServerValidRowKey";
 
-
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
     // In case of Index Write failure, we need to determine that Index mutation
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
index 60a21a35e5..d28f76ad81 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
@@ -100,7 +100,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 39;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 40;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0;
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c3596b4f11..5e76e47e53 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -359,6 +359,7 @@ public enum SQLExceptionCode {
             + "only if none of the child views extends primary key"),
     VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
             + " only if none of the parents have indexes in the parent hierarchy"),
+    MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY(10957, "44A39", "Max lookback age can only be set for tables"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index a511c990b2..3216656fbf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -75,6 +75,7 @@ import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 
@@ -432,6 +433,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
 
     public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM";
     public static final String SYSTEM_TRANSFORM_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE);
+    public static final String MAX_LOOKBACK_AGE = BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE;
+    public static final byte[] MAX_LOOKBACK_AGE_BYTES = Bytes.toBytes(MAX_LOOKBACK_AGE);
 
     //SYSTEM:LOG
     public static final String SYSTEM_LOG_TABLE = "LOG";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 83e38dd0ab..00ea859f5b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4127,29 +4127,32 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0) {
             metaConnection =
                 addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 5,
                     PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME + " "
                         + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection =
                 addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
                     PhoenixDatabaseMetaData.SCHEMA_VERSION + " "
                         + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection =
                 addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
                     PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " "
                         + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection =
                 addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
                     PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " "
                         + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection =
                 addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
                     PhoenixDatabaseMetaData.INDEX_WHERE + " "
                         + PVarchar.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+                    PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE + " " + PLong.INSTANCE.getSqlTypeName());
             UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
         }
         return metaConnection;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index a75cc7e117..32b2f4e26f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -175,6 +175,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
 
 /**
  *
@@ -372,6 +373,7 @@ public interface QueryConstants {
             EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
             STREAMING_TOPIC_NAME + " VARCHAR, \n" +
             INDEX_WHERE + " VARCHAR, \n" +
+            MAX_LOOKBACK_AGE + " BIGINT, \n" +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 1a5d83e728..662206f419 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -424,6 +424,12 @@ public class DelegateTable implements PTable {
             throws SQLException {
         return delegate.getIndexWhereColumns(connection);
     }
+
+    @Override
+    public Long getMaxLookbackAge() {
+        return delegate.getMaxLookbackAge();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ea738bfe55..0e78202c43 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -99,6 +99,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.query.QueryServices.DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION;
 import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
@@ -155,6 +156,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Table;
@@ -353,9 +355,10 @@ public class MetaDataClient {
                     PHYSICAL_TABLE_NAME + "," +
                     SCHEMA_VERSION + "," +
                     STREAMING_TOPIC_NAME + "," +
-                    INDEX_WHERE +
+                    INDEX_WHERE + "," +
+                    MAX_LOOKBACK_AGE +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
-                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -2111,6 +2114,16 @@ public class MetaDataClient {
 
             String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps);
             String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);
+            Long maxLookbackAge = (Long) TableProperty.MAX_LOOKBACK_AGE.getValue(tableProps);
+
+            if (maxLookbackAge != null && tableType != TABLE) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                        MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY)
+                        .setSchemaName(schemaName)
+                        .setTableName(tableName)
+                        .build()
+                        .buildException();
+            }
 
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
@@ -3153,6 +3166,12 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setNull(36, Types.VARCHAR);
             }
+            if (maxLookbackAge == null) {
+                tableUpsert.setNull(37, Types.BIGINT);
+            }
+            else {
+                tableUpsert.setLong(37, maxLookbackAge);
+            }
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -3302,6 +3321,7 @@ public class MetaDataClient {
                         .setStreamingTopicName(streamingTopicName)
                         .setIndexWhere(statement.getWhereClause() == null ? null
                                 : statement.getWhereClause().toString())
+                        .setMaxLookbackAge(maxLookbackAge)
                         .build();
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
@@ -3759,7 +3779,8 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.getPhysicalTableName(),
                 metaPropertiesEvaluated.getSchemaVersion(),
                 metaPropertiesEvaluated.getColumnEncodedBytes(),
-                metaPropertiesEvaluated.getStreamingTopicName());
+                metaPropertiesEvaluated.getStreamingTopicName(),
+                metaPropertiesEvaluated.getMaxLookbackAge());
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional,
@@ -3767,7 +3788,7 @@ public class MetaDataClient {
                                        String schemaVersion, QualifierEncodingScheme columnEncodedBytes) throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null,
             updateCacheFrequency, null, null, null, null, -1L, null, null, null,phoenixTTL, false, physicalTableName,
-                schemaVersion, columnEncodedBytes, null);
+                schemaVersion, columnEncodedBytes, null, null);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
@@ -3776,7 +3797,7 @@ public class MetaDataClient {
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema,
             ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization,
             Long phoenixTTL, Boolean isChangeDetectionEnabled, String physicalTableName, String schemaVersion,
-                                      QualifierEncodingScheme columnEncodedBytes, String streamingTopicName)
+                                      QualifierEncodingScheme columnEncodedBytes, String streamingTopicName, Long maxLookbackAge)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3847,6 +3868,9 @@ public class MetaDataClient {
         if (!Strings.isNullOrEmpty(streamingTopicName)) {
             mutateStringProperty(connection, tenantId, schemaName, tableName, STREAMING_TOPIC_NAME, streamingTopicName);
         }
+        if (maxLookbackAge != null) {
+            mutateLongProperty(connection, tenantId, schemaName, tableName, MAX_LOOKBACK_AGE, maxLookbackAge);
+        }
         return seqNum;
     }
 
@@ -5356,6 +5380,8 @@ public class MetaDataClient {
                         metaProperties.setSchemaVersion((String) value);
                     } else if (propName.equalsIgnoreCase(STREAMING_TOPIC_NAME)) {
                         metaProperties.setStreamingTopicName((String) value);
+                    } else if (propName.equalsIgnoreCase(MAX_LOOKBACK_AGE)) {
+                        metaProperties.setMaxLookbackAge((Long) value);
                     }
                 }
                 // if removeTableProps is true only add the property if it is not an HTable or Phoenix Table property
@@ -5564,6 +5590,21 @@ public class MetaDataClient {
             }
         }
 
+        if (metaProperties.getMaxLookbackAge() != null) {
+            if (table.getType() != TABLE) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                        MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY)
+                        .setSchemaName(schemaName)
+                        .setTableName(tableName)
+                        .build()
+                        .buildException();
+            }
+            if (! Objects.equals(metaProperties.getMaxLookbackAge(), table.getMaxLookbackAge())) {
+                metaPropertiesEvaluated.setMaxLookbackAge(metaProperties.getMaxLookbackAge());
+                changingPhoenixTableProperty = true;
+            }
+        }
+
         return changingPhoenixTableProperty;
     }
 
@@ -5588,6 +5629,8 @@ public class MetaDataClient {
         private String schemaVersion = null;
         private String streamingTopicName = null;
 
+        private Long maxLookbackAge = null;
+
         public Boolean getImmutableRowsProp() {
             return isImmutableRowsProp;
         }
@@ -5735,6 +5778,14 @@ public class MetaDataClient {
         public void setStreamingTopicName(String streamingTopicName) {
             this.streamingTopicName = streamingTopicName;
         }
+
+        public Long getMaxLookbackAge() {
+            return maxLookbackAge;
+        }
+
+        public void setMaxLookbackAge(Long maxLookbackAge) {
+            this.maxLookbackAge = maxLookbackAge;
+        }
     }
 
     private static class MetaPropertiesEvaluated {
@@ -5756,6 +5807,8 @@ public class MetaDataClient {
         private String schemaVersion = null;
         private String streamingTopicName = null;
 
+        private Long maxLookbackAge = null;
+
         public Boolean getIsImmutableRows() {
             return isImmutableRows;
         }
@@ -5884,6 +5937,14 @@ public class MetaDataClient {
         public void setStreamingTopicName(String streamingTopicName) {
             this.streamingTopicName = streamingTopicName;
         }
+
+        public Long getMaxLookbackAge() {
+            return maxLookbackAge;
+        }
+
+        public void setMaxLookbackAge(Long maxLookbackAge) {
+            this.maxLookbackAge = maxLookbackAge;
+        }
     }
 
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 6fd89ad4bb..9407c5f86c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -997,6 +997,11 @@ public interface PTable extends PMetaDataEntity {
      * @throws SQLException
      */
     Set<ColumnReference> getIndexWhereColumns(PhoenixConnection connection) throws SQLException;
+
+    /**
+     * Returns: Table level max lookback age if configured else null.
+     */
+    Long getMaxLookbackAge();
     /**
      * Class to help track encoded column qualifier counters per column family.
      */
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index b5c2eee2e7..128bca0d7f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -30,6 +30,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
@@ -218,6 +219,7 @@ public class PTableImpl implements PTable {
     private String indexWhere;
     private Expression indexWhereExpression;
     private Set<ColumnReference> indexWhereColumns;
+    private Long maxLookbackAge;
 
     public static class Builder {
         private PTableKey key;
@@ -284,6 +286,7 @@ public class PTableImpl implements PTable {
         private String externalSchemaId;
         private String streamingTopicName;
         private String indexWhere;
+        private Long maxLookbackAge;
 
         // Used to denote which properties a view has explicitly modified
         private BitSet viewModifiedPropSet = new BitSet(3);
@@ -711,6 +714,14 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setMaxLookbackAge(Long maxLookbackAge) {
+            if (maxLookbackAge != null) {
+                propertyValues.put(MAX_LOOKBACK_AGE, String.valueOf(maxLookbackAge));
+            }
+            this.maxLookbackAge = maxLookbackAge;
+            return this;
+        }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -1002,6 +1013,7 @@ public class PTableImpl implements PTable {
         this.externalSchemaId = builder.externalSchemaId;
         this.streamingTopicName = builder.streamingTopicName;
         this.indexWhere = builder.indexWhere;
+        this.maxLookbackAge = builder.maxLookbackAge;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -1082,7 +1094,8 @@ public class PTableImpl implements PTable {
                 .setSchemaVersion(table.getSchemaVersion())
                 .setExternalSchemaId(table.getExternalSchemaId())
                 .setStreamingTopicName(table.getStreamingTopicName())
-                .setIndexWhere(table.getIndexWhere());
+                .setIndexWhere(table.getIndexWhere())
+                .setMaxLookbackAge(table.getMaxLookbackAge());
     }
 
     @Override
@@ -2028,6 +2041,10 @@ public class PTableImpl implements PTable {
             indexWhere =
                     (String) PVarchar.INSTANCE.toObject(table.getIndexWhere().toByteArray());
         }
+        Long maxLookbackAge = null;
+        if (table.hasMaxLookbackAge()) {
+            maxLookbackAge = table.getMaxLookbackAge();
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -2086,6 +2103,7 @@ public class PTableImpl implements PTable {
                     .setExternalSchemaId(externalSchemaId)
                     .setStreamingTopicName(streamingTopicName)
                     .setIndexWhere(indexWhere)
+                    .setMaxLookbackAge(maxLookbackAge)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -2229,6 +2247,9 @@ public class PTableImpl implements PTable {
             builder.setIndexWhere(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(
                     table.getIndexWhere())));
         }
+        if (table.getMaxLookbackAge() != null) {
+            builder.setMaxLookbackAge(table.getMaxLookbackAge());
+        }
         return builder.build();
     }
 
@@ -2376,6 +2397,11 @@ public class PTableImpl implements PTable {
         return indexWhere;
     }
 
+    @Override
+    public Long getMaxLookbackAge() {
+        return maxLookbackAge;
+    }
+
     private void buildIndexWhereExpression(PhoenixConnection connection) throws SQLException {
         PhoenixPreparedStatement
                 pstmt =
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 24d0432543..f3ffea9221 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.schema.types.PLong;
 
 public enum TableProperty {
 
@@ -336,6 +337,29 @@ public enum TableProperty {
         @Override public Object getPTableValue(PTable table) {
             return table.getStreamingTopicName();
         }
+    },
+
+    MAX_LOOKBACK_AGE(PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE, true, false, false) {
+        @Override
+        public Object getValue(Object value) {
+            if (value == null) {
+                return null;
+            }
+            else if (value instanceof Integer) {
+                return Long.valueOf((Integer) value);
+            }
+            else if (value instanceof Long) {
+                return value;
+            }
+            else {
+                throw new IllegalArgumentException("Table level MAX_LOOKBACK_AGE should be a " + PLong.INSTANCE.getSqlTypeName() + " value in milli-seconds");
+            }
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getMaxLookbackAge();
+        }
     };
 
     private final String propertyName;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
index 760ff5923d..6649882a0c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
@@ -272,6 +272,7 @@ public class TransformClient {
                 .setSchemaVersion(table.getSchemaVersion())
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
                 .setStreamingTopicName(table.getStreamingTopicName())
+                .setMaxLookbackAge(table.getMaxLookbackAge())
                 // Transformables
                 .setImmutableStorageScheme(
                         (changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 96a40ac9c4..2fa3ac109f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.*;
 
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
@@ -82,6 +83,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 
 public class MetaDataUtil {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataUtil.class);
@@ -1182,4 +1184,16 @@ public class MetaDataUtil {
             connection.setAutoCommit(isAutoCommit);
         }
     }
+
+    /**
+     * @param conf Cluster configuration
+     * @param maxLookbackAge Input max lookback age
+     * @return Input max lookback age, if not null. If null, fallback to cluster level
+     * max lookback age. Will always return non-null long value.
+     */
+    public static long getMaxLookbackAge(Configuration conf, Long maxLookbackAge) {
+        Preconditions.checkNotNull(conf);
+        return maxLookbackAge != null ? maxLookbackAge :
+                BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 0718066d9f..dd476653b2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -108,6 +108,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Iterators;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * 
@@ -1630,6 +1631,20 @@ public class ScanUtil {
         return null;
     }
 
+    public static void setScanAttributeForMaxLookbackAge(Scan scan, Long maxLookbackAge) {
+        Preconditions.checkNotNull(scan);
+        if (maxLookbackAge != null) {
+            scan.setAttribute(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE,
+                    Bytes.toBytes(maxLookbackAge));
+        }
+    }
+
+    public static Long getMaxLookbackAgeFromScanAttribute(Scan scan) {
+        Preconditions.checkNotNull(scan);
+        byte[] maxLookbackAge = scan.getAttribute(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE);
+        return maxLookbackAge != null ? Bytes.toLong(maxLookbackAge) : null;
+    }
+
     /**
      * Verify whether the given row key is in the scan boundaries i.e. scan start and end keys.
      *
diff --git a/phoenix-core-client/src/main/protobuf/PTable.proto b/phoenix-core-client/src/main/protobuf/PTable.proto
index 88617e3636..5159bafaed 100644
--- a/phoenix-core-client/src/main/protobuf/PTable.proto
+++ b/phoenix-core-client/src/main/protobuf/PTable.proto
@@ -118,6 +118,7 @@ message PTable {
   optional PTable transformingNewTable=51;
   optional bytes streamingTopicName=52;
   optional bytes indexWhere=53;
+  optional int64 maxLookbackAge = 54;
 }
 
 message EncodedCQCounter {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index ab52dd69d4..72f92e8b95 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,11 +57,16 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.schema.PTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -366,9 +373,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             setScanOptionsForFlushesAndCompactions(options);
             return;
         }
-        if (isMaxLookbackTimeEnabled(conf)) {
+        long maxLookbackAge = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
             setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
-                    scanType);
+                    scanType, maxLookbackAge);
         }
     }
 
@@ -380,9 +388,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             setScanOptionsForFlushesAndCompactions(options);
             return;
         }
-        if (isMaxLookbackTimeEnabled(conf)) {
+        long maxLookbackAge = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
             setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
-                    ScanType.COMPACT_RETAIN_DELETES);
+                    ScanType.COMPACT_RETAIN_DELETES, maxLookbackAge);
         }
     }
 
@@ -395,7 +404,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             setScanOptionsForFlushesAndCompactions(options);
             return;
         }
-        if (isMaxLookbackTimeEnabled(conf)) {
+        long maxLookbackAge = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
             MemoryCompactionPolicy inMemPolicy =
                     store.getColumnFamilyDescriptor().getInMemoryCompaction();
             ScanType scanType;
@@ -408,7 +418,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
                 scanType = ScanType.COMPACT_RETAIN_DELETES;
             }
             setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
-                    scanType);
+                    scanType, maxLookbackAge);
         }
     }
 
@@ -486,10 +496,9 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
      */
     public long getTimeToLiveForCompactions(Configuration conf,
             ColumnFamilyDescriptor columnDescriptor,
-            ScanOptions options) {
+            ScanOptions options, long maxLookbackTtl) {
         long ttlConfigured = columnDescriptor.getTimeToLive();
         long ttlInMillis = ttlConfigured * 1000;
-        long maxLookbackTtl = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
         if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
             if (ttlConfigured == HConstants.FOREVER
                     && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
@@ -509,10 +518,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     public void setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(Configuration conf,
             ScanOptions options,
             final Store store,
-            ScanType type) {
+            ScanType type, long maxLookbackAge) {
         ColumnFamilyDescriptor cfDescriptor = store.getColumnFamilyDescriptor();
         options.setTTL(getTimeToLiveForCompactions(conf, cfDescriptor,
-                options));
+                options, maxLookbackAge));
         options.setKeepDeletedCells(getKeepDeletedCells(options, type));
         options.setMaxVersions(Integer.MAX_VALUE);
         options.setMinVersions(getMinVersions(options, cfDescriptor));
@@ -527,6 +536,27 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
         return maxLookbackTime > 0L;
     }
 
+    private static long getMaxLookbackAge(ObserverContext<RegionCoprocessorEnvironment> c) {
+        TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
+        String fullTableName = tableName.getNameAsString();
+        Configuration conf = c.getEnvironment().getConfiguration();
+        PTable table;
+        try(PhoenixConnection conn = QueryUtil.getConnectionOnServer(
+                conf).unwrap(PhoenixConnection.class)) {
+            table = conn.getTableNoCache(fullTableName);
+        }
+        catch (SQLException e) {
+            if (e instanceof TableNotFoundException) {
+                LOGGER.debug("Ignoring HBase table that is not a Phoenix table: {}", fullTableName);
+                // non-Phoenix HBase tables won't be found, do nothing
+            } else {
+                LOGGER.error("Unable to fetch table level max lookback age for {}", fullTableName, e);
+            }
+            return MetaDataUtil.getMaxLookbackAge(conf, null);
+        }
+        return MetaDataUtil.getMaxLookbackAge(conf, table.getMaxLookbackAge());
+    }
+
     public static boolean isPhoenixTableTTLEnabled(Configuration conf) {
         return conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 5af9950aa6..cf39407334 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -74,6 +74,8 @@ import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -237,7 +239,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         }
         // Create the following objects only for rebuilds by IndexTool
         hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
-        maxLookBackInMills = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(config);
+        maxLookBackInMills = MetaDataUtil.getMaxLookbackAge(config, ScanUtil.getMaxLookbackAgeFromScanAttribute(scan));
         rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
                 DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4c60dccb26..bdba476a90 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -57,6 +57,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
@@ -83,6 +84,9 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE_BYTES;
+import static org.apache.phoenix.schema.PTable.LinkType.PHYSICAL_TABLE;
+import static org.apache.phoenix.schema.PTable.LinkType.VIEW_INDEX_PARENT_TABLE;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
@@ -109,6 +113,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -130,6 +135,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
@@ -366,6 +372,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     private static final Cell INDEX_WHERE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, INDEX_WHERE_BYTES);
 
+    private static final Cell MAX_LOOKBACK_AGE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_LOOKBACK_AGE_BYTES);
+
     private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
             TABLE_TYPE_KV,
@@ -405,7 +413,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             SCHEMA_VERSION_KV,
             EXTERNAL_SCHEMA_ID_KV,
             STREAMING_TOPIC_NAME_KV,
-            INDEX_WHERE_KV
+            INDEX_WHERE_KV,
+            MAX_LOOKBACK_AGE_KV
     );
 
     static {
@@ -455,6 +464,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
     private static final int INDEX_WHERE_INDEX =
             TABLE_KV_COLUMNS.indexOf(INDEX_WHERE_KV);
+
+    private static final int MAX_LOOKBACK_AGE_INDEX = TABLE_KV_COLUMNS.indexOf(MAX_LOOKBACK_AGE_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -1430,6 +1441,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 : null;
         builder.setIndexWhere(indexWhere != null ? indexWhere
                 : oldTable != null ? oldTable.getIndexWhere() : null);
+
+        Cell maxLookbackAgeKv = tableKeyValues[MAX_LOOKBACK_AGE_INDEX];
+        Long maxLookbackAge = maxLookbackAgeKv == null ? null :
+                PLong.INSTANCE.getCodec().decodeLong(maxLookbackAgeKv.getValueArray(),
+                        maxLookbackAgeKv.getValueOffset(), SortOrder.getDefault());
+        if (tableType == PTableType.VIEW) {
+            byte[] viewKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                    schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+            maxLookbackAge = scanMaxLookbackAgeFromParent(viewKey, clientTimeStamp);
+        }
+
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY :
@@ -1463,6 +1485,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         PTable transformingNewTable = null;
         boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
+        boolean isViewIndex = false;
         for (List<Cell> columnCellList : allColumnCellList) {
 
             Cell colKv = columnCellList.get(LINK_TYPE_INDEX);
@@ -1546,6 +1569,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         }
                     }
                 }
+                else if (linkType == VIEW_INDEX_PARENT_TABLE) {
+                    isViewIndex = true;
+                    byte[] indexKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                            schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+                    maxLookbackAge = scanMaxLookbackAgeFromParent(indexKey, clientTimeStamp);
+                }
             } else {
                 long columnTimestamp =
                     columnCellList.get(0).getTimestamp() != HConstants.LATEST_TIMESTAMP ?
@@ -1557,6 +1586,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     isSalted, baseColumnCount, isRegularView, columnTimestamp);
             }
         }
+        if (tableType == INDEX && ! isViewIndex) {
+            byte[] tableKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+                    parentSchemaName == null ? null : parentSchemaName.getBytes(), parentTableName.getBytes());
+            maxLookbackAge = scanMaxLookbackAgeFromParent(tableKey, clientTimeStamp);
+        }
+        builder.setMaxLookbackAge(maxLookbackAge != null ? maxLookbackAge :
+                (oldTable != null ? oldTable.getMaxLookbackAge() : null));
         builder.setEncodedCQCounter(cqCounter);
 
         builder.setIndexes(indexes != null ? indexes : oldTable != null
@@ -1587,6 +1623,57 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         return builder.build();
     }
 
+    private Long scanMaxLookbackAgeFromParent(byte[] key, long clientTimeStamp) throws IOException {
+        Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        try(Table sysCat = ServerUtil.getHTableForCoprocessorScan(this.env,
+                SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
+            ResultScanner scanner = sysCat.getScanner(scan)) {
+            Result result = scanner.next();
+            boolean startCheckingForLink = false;
+            byte[] parentTableKey = null;
+            do {
+                if (result == null) {
+                    return null;
+                }
+                else if (startCheckingForLink) {
+                    byte[] linkTypeBytes = result.getValue(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+                    if (linkTypeBytes != null) {
+                        LinkType linkType = LinkType.fromSerializedValue(linkTypeBytes[0]);
+                        int rowKeyColMetadataLength = 5;
+                        byte[][] rowKeyMetaData = new byte[rowKeyColMetadataLength][];
+                        getVarChars(result.getRow(), rowKeyColMetadataLength, rowKeyMetaData);
+                        if (linkType == VIEW_INDEX_PARENT_TABLE) {
+                            parentTableKey = getParentTableKeyFromChildRowKeyMetaData(rowKeyMetaData);
+                            return scanMaxLookbackAgeFromParent(parentTableKey, clientTimeStamp);
+                        }
+                        else if (linkType == PHYSICAL_TABLE) {
+                            parentTableKey = getParentTableKeyFromChildRowKeyMetaData(rowKeyMetaData);
+                        }
+                    }
+                }
+                else {
+                    byte[] maxLookbackAgeInBytes = result.getValue(TABLE_FAMILY_BYTES, MAX_LOOKBACK_AGE_BYTES);
+                    if (maxLookbackAgeInBytes != null) {
+                        return PLong.INSTANCE.getCodec().decodeLong(maxLookbackAgeInBytes, 0, SortOrder.getDefault());
+                    }
+                }
+                result = scanner.next();
+                startCheckingForLink = true;
+            } while (result != null);
+            return parentTableKey == null ? null : scanMaxLookbackAgeFromParent(parentTableKey, clientTimeStamp);
+        }
+    }
+
+    private byte[] getParentTableKeyFromChildRowKeyMetaData(byte[][] rowKeyMetaData) {
+        byte[] parentTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+        String parentSchema = SchemaUtil.getSchemaNameFromFullName(
+                rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+        byte[] parentSchemaName = parentSchema != null ? parentSchema.getBytes(StandardCharsets.UTF_8) : null;
+        byte[] parentTableName = SchemaUtil.getTableNameFromFullName(
+                        rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes(StandardCharsets.UTF_8);
+        return SchemaUtil.getTableKey(parentTenantId, parentSchemaName, parentTableName);
+    }
+
     private Long getViewIndexId(Cell[] tableKeyValues, PDataType viewIndexIdType) {
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
         return viewIndexIdKv == null ? null :
@@ -3389,8 +3476,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         && result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     return result;
                 } else {
+                    PTable oldTable = table;
                     table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
                             clientVersion);
+                    if (table != null && hasInheritableTablePropertyChanged(table, oldTable)) {
+                        invalidateAllChildTablesAndIndexes(table, childViews);
+                    }
                     if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
                         try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
                                 env.getConfiguration()).unwrap(PhoenixConnection.class)) {
@@ -3422,6 +3513,32 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
+    private boolean hasInheritableTablePropertyChanged(PTable newTable, PTable oldTable) {
+        return ! Objects.equals(newTable.getMaxLookbackAge(), oldTable.getMaxLookbackAge());
+    }
+
+    private void invalidateAllChildTablesAndIndexes(PTable table, List<PTable> childViews) {
+        List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+        if (table.getIndexes() != null) {
+            for(PTable index: table.getIndexes()) {
+                invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(index)));
+            }
+        }
+        for(PTable childView: childViews) {
+            invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(childView)));
+            if (childView.getIndexes() != null) {
+                for(PTable viewIndex: childView.getIndexes()) {
+                    invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(viewIndex)));
+                }
+            }
+        }
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
+        for(ImmutableBytesPtr invalidateKey: invalidateList) {
+            metaDataCache.invalidate(invalidateKey);
+        }
+    }
+
     /**
      * Removes the table from the server side cache
      */
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 1eff655e41..c07e1e25c8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -118,6 +118,7 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -640,7 +641,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                         if (table != null && !isDisabled && isPhoenixTableTTLEnabled) {
                             internalScanner =
                                     new CompactionScanner(c.getEnvironment(), store, scanner,
-                                            BaseScannerRegionObserverConstants.getMaxLookbackInMillis(c.getEnvironment().getConfiguration()),
+                                            MetaDataUtil.getMaxLookbackAge(
+                                                    c.getEnvironment().getConfiguration(), table.getMaxLookbackAge()),
                                             SchemaUtil.getEmptyColumnFamily(table),
                                             table.getEncodingScheme()
                                                     == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 4c5ea62c7b..2492394494 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -131,6 +131,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
                         ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setScanAttributeForMaxLookbackAge(scan, pIndexTable.getMaxLookbackAge());
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             }
             return plan;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 30db799e23..365ff9c59f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -170,7 +170,8 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOGGER.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
             ttl = getTableTtl();
-            maxLookbackAgeMillis = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+            maxLookbackAgeMillis = MetaDataUtil.getMaxLookbackAge(configuration,
+                    PhoenixConfigurationUtil.getMaxLookbackAge(configuration));
         } catch (SQLException | NoSuchAlgorithmException e) {
             tryClosingResourceSilently(this.outputUpsertStmt);
             tryClosingResourceSilently(this.connection);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index 2dbbc2f997..cfd0adf880 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,6 +272,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
 
             // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));
+            PhoenixConfigurationUtil.setMaxLookbackAge(configuration, pdataTable.getMaxLookbackAge());
 
             // set the source table to either data or index table
             SourceTargetColumnNames columnNames =
@@ -420,8 +422,6 @@ public class IndexScrutinyTool extends Configured implements Tool {
                             ? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
                             : EnvironmentEdgeManager.currentTimeMillis() - 60000;
 
-            validateTimestamp(configuration, ts);
-
             if (indexTable != null) {
                 if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
                     throw new IllegalArgumentException(String
@@ -429,6 +429,9 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 }
             }
 
+            PTable pDataTable = connection.unwrap(PhoenixConnection.class).getTable(qDataTable);
+            validateTimestamp(configuration, ts, pDataTable.getMaxLookbackAge());
+
             String outputFormatOption = cmdLine.getOptionValue(OUTPUT_FORMAT_OPTION.getOpt());
             OutputFormat outputFormat =
                     outputFormatOption != null
@@ -445,13 +448,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 Configuration outputConfiguration = HBaseConfiguration.create(configuration);
                 outputConfiguration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
                 try (Connection outputConn = ConnectionUtil.getOutputConnection(outputConfiguration)) {
-                    outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
-                    outputConn.createStatement().
-                        execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
-                    outputConn.createStatement()
-                            .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
-                    outputConn.createStatement().
-                        execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
+                    createScrutinyToolTables(outputConn);
                 }
             }
 
@@ -517,16 +514,15 @@ public class IndexScrutinyTool extends Configured implements Tool {
         }
     }
 
-    private void validateTimestamp(Configuration configuration, long ts) {
-        long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+    private void validateTimestamp(Configuration configuration, long ts, Long dataTableMaxLookback) {
+        long maxLookBackAge = MetaDataUtil.getMaxLookbackAge(configuration, dataTableMaxLookback);
         if (maxLookBackAge != BaseScannerRegionObserverConstants.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L) {
             long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
             if (ts < minTimestamp){
                 throw new IllegalArgumentException("Index scrutiny can't look back past the configured" +
-                    "max lookback age: " + maxLookBackAge / 1000 + " seconds");
+                    " max lookback age: " + maxLookBackAge / 1000 + " seconds");
             }
         }
-
     }
 
     @VisibleForTesting
@@ -539,4 +535,13 @@ public class IndexScrutinyTool extends Configured implements Tool {
         System.exit(result);
     }
 
+    public static void createScrutinyToolTables(Connection conn) throws Exception {
+        conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+        conn.createStatement().
+                execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
+        conn.createStatement()
+                .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+        conn.createStatement().
+                execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
+    }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index eb5d2f51a7..8cfa5382db 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
@@ -920,4 +921,17 @@ public final class PhoenixConfigurationUtil {
         return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
             DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
     }
+
+    public static void setMaxLookbackAge(Configuration configuration, Long maxLookbackAge) {
+        Preconditions.checkNotNull(configuration);
+        if (maxLookbackAge != null) {
+            configuration.setLong(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE, maxLookbackAge);
+        }
+    }
+
+    public static Long getMaxLookbackAge(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        String maxLookbackAgeStr = configuration.get(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE);
+        return maxLookbackAgeStr != null ? Long.valueOf(maxLookbackAgeStr) : null;
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 9f6be13c1b..81dd83ed1a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -48,6 +49,7 @@ import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
+import java.time.Duration;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -74,7 +76,6 @@ import org.apache.phoenix.schema.export.DefaultSchemaWriter;
 import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -111,6 +112,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
     private final boolean columnEncoded;
     private static final Logger LOGGER = LoggerFactory.getLogger(AlterTableIT.class);
 
+    private final long oneDayInMillis = Duration.ofDays(1).toMillis();
+
     public AlterTableIT(boolean columnEncoded) {
         this.columnEncoded = columnEncoded;
         this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0";
@@ -1796,4 +1799,103 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
                     newLastDDLTimestamp > oldLastDDLTimestamp);
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testChangeTableLevelMaxLookbackAge() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Long maxLookbackAge = oneDayInMillis;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE  " + fullTableName +
+                    "  (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + maxLookbackAge;
+            conn.createStatement().execute(ddl);
+            assertMaxLookbackAge(fullTableName, maxLookbackAge);
+            maxLookbackAge = 3L * oneDayInMillis;
+            alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+            assertMaxLookbackAge(fullTableName, maxLookbackAge);
+            maxLookbackAge = 2L * oneDayInMillis;
+            alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+            assertMaxLookbackAge(fullTableName, maxLookbackAge);
+            maxLookbackAge = 0L;
+            alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+            assertMaxLookbackAge(fullTableName, maxLookbackAge);
+        }
+    }
+
+    @Test
+    public void testChangeTableLevelMaxLookbackAgeToInvalid() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE  " + fullTableName +
+                    "  (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + oneDayInMillis;
+            conn.createStatement().execute(ddl);
+        }
+        assertThrows(IllegalArgumentException.class, () -> alterTableLevelMaxLookbackAge(fullTableName, "2309.3"));
+        assertThrows(IllegalArgumentException.class, () -> alterTableLevelMaxLookbackAge(fullTableName, "forty"));
+    }
+
+    @Test
+    public void testMaxLookbackAgeOfChildViewsAndIndexesWithAlterTable() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Long maxLookbackAge = oneDayInMillis;
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            String ddl = "CREATE TABLE  " + fullDataTableName +
+                    "  (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + maxLookbackAge;
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullDataTableName, maxLookbackAge);
+            String indexName = generateUniqueName();
+            String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+            ddl = "CREATE INDEX " + indexName + " ON " + fullDataTableName + " (COL1)";
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullIndexName, maxLookbackAge);
+            String childViewName = generateUniqueName();
+            String fullChildViewName = SchemaUtil.getTableName(schemaName, childViewName);
+            ddl = "CREATE VIEW " + fullChildViewName + " AS SELECT * FROM " + fullDataTableName;
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullChildViewName, maxLookbackAge);
+            String childViewIndexName = generateUniqueName();
+            String fullChildViewIndexName = SchemaUtil.getTableName(schemaName, childViewIndexName);
+            ddl = "CREATE INDEX " + childViewIndexName + " ON " + fullChildViewName + " (COL1)";
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullChildViewIndexName, maxLookbackAge);
+            String grandChildViewName = generateUniqueName();
+            String fullGrandChildViewName = SchemaUtil.getTableName(schemaName, grandChildViewName);
+            ddl = "CREATE VIEW " + fullGrandChildViewName + " (col2 varchar) AS SELECT * FROM " + fullChildViewName;
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullGrandChildViewName, maxLookbackAge);
+            String grandChildViewIndexName = generateUniqueName();
+            String fullGrandChildViewIndexName = SchemaUtil.getTableName(schemaName, grandChildViewIndexName);
+            ddl = "CREATE INDEX " + grandChildViewIndexName + " ON " + fullGrandChildViewName + " (COL2)";
+            stmt.execute(ddl);
+            assertMaxLookbackAge(fullGrandChildViewIndexName, maxLookbackAge);
+            maxLookbackAge = 2 * oneDayInMillis;
+            alterTableLevelMaxLookbackAge(fullDataTableName, maxLookbackAge.toString());
+            assertMaxLookbackAge(fullDataTableName, maxLookbackAge);
+            assertMaxLookbackAge(fullIndexName, maxLookbackAge);
+            assertMaxLookbackAge(fullChildViewName, maxLookbackAge);
+            assertMaxLookbackAge(fullChildViewIndexName, maxLookbackAge);
+            assertMaxLookbackAge(fullGrandChildViewName, maxLookbackAge);
+            assertMaxLookbackAge(fullGrandChildViewIndexName, maxLookbackAge);
+        }
+    }
+
+    private void assertMaxLookbackAge(String fullTableName, Long expectedMaxLookbackAge) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            assertEquals(expectedMaxLookbackAge, pconn.getTableNoCache(fullTableName).getMaxLookbackAge());
+        }
+    }
+
+    private void alterTableLevelMaxLookbackAge(String fullTableName, String maxLookbackAge) throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl  = "ALTER TABLE " + fullTableName + " SET MAX_LOOKBACK_AGE = " + maxLookbackAge;
+            conn.createStatement().execute(ddl);
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 62b15cb9b6..d7473e5baf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -26,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -1280,6 +1282,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
                 .getColumnQualifierBytes()));
         assertEquals(14, encodingScheme.decode(table.getColumnForColumnName("INT3")
                 .getColumnQualifierBytes()));
+        assertNull(table.getMaxLookbackAge());
     }
 
     @Test
@@ -1699,6 +1702,62 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testCreateTableWithTableLevelMaxLookbackAge() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Long maxLookbackAge = 259200L;
+        createTableWithTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+        assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName();
+        fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        maxLookbackAge = 25920000000L;
+        createTableWithTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+        assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+        String indexTableName = generateUniqueName();
+        createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName);
+        assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(SchemaUtil.getTableName(schemaName, indexTableName)));
+    }
+
+    @Test
+    public void testCreateTableWithTableLevelMaxLookbackAgeAsNull() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        createTableWithTableLevelMaxLookbackAge(fullTableName, "NULL");
+        assertNull(queryTableLevelMaxLookbackAge(fullTableName));
+        String indexTableName = generateUniqueName();
+        createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName);
+        assertNull(queryTableLevelMaxLookbackAge(SchemaUtil.getTableName(schemaName, indexTableName)));
+    }
+
+    @Test
+    public void testCreateTableWithInvalidTableLevelMaxLookbackAge() {
+        String errMsg = "Table level MAX_LOOKBACK_AGE should be a BIGINT value in milli-seconds";
+        IllegalArgumentException err = assertThrows(IllegalArgumentException.class,
+                () -> createTableWithTableLevelMaxLookbackAge(
+                        SchemaUtil.getTableName(generateUniqueName(), generateUniqueName()), "2.3"));
+        assertEquals(errMsg, err.getMessage());
+        err = assertThrows(IllegalArgumentException.class, () -> createTableWithTableLevelMaxLookbackAge(
+                SchemaUtil.getTableName(generateUniqueName(), generateUniqueName()), "three"));
+        assertEquals(errMsg, err.getMessage());
+    }
+
+    @Test
+    public void testCreateIndexWithTableLevelMaxLookbackAge() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        createTableWithTableLevelMaxLookbackAge(fullTableName, "NULL");
+        String indexOptions = "MAX_LOOKBACK_AGE=300";
+        SQLException err = assertThrows(SQLException.class,
+                () -> createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName, indexOptions));
+        assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+    }
+
     public static long verifyLastDDLTimestamp(String tableFullName, long startTS, Connection conn) throws SQLException {
         long endTS = EnvironmentEdgeManager.currentTimeMillis();
         //Now try the PTable API
@@ -1727,4 +1786,23 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         }
     }
 
+    private void createTableWithTableLevelMaxLookbackAge(String fullTableName, String maxLookbackAge) throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String createDdl = "CREATE TABLE " + fullTableName +
+                    " (id char(1) NOT NULL PRIMARY KEY,  col1 integer) MAX_LOOKBACK_AGE="+maxLookbackAge;
+            conn.createStatement().execute(createDdl);
+        }
+    }
+
+    private void createIndexOnTableWithMaxLookbackAge(String indexTableName, String fullTableName, String indexOptions) throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String createIndexDdl = "CREATE INDEX " + indexTableName + " ON " + fullTableName + " (COL1) " +
+                    (indexOptions == null ? "" : indexOptions);
+            conn.createStatement().execute(createIndexDdl);
+        }
+    }
+
+    private void createIndexOnTableWithMaxLookbackAge(String indexTableName, String fullTableName) throws Exception {
+        createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName, null);
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
index 603e2c2cb0..1e13f3a123 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
@@ -22,8 +22,6 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -48,7 +46,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -69,7 +66,6 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
-import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -77,14 +73,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
 import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertArrayEquals;
@@ -99,14 +88,14 @@ import static org.junit.Assert.fail;
 public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScannerIT.class);
-    private final String tableDDLOptions;
+    private String tableDDLOptions;
     private final String indexDDLOptions;
     private boolean mutable;
+    StringBuilder optionBuilder = new StringBuilder();
     @Rule
     public ExpectedException exceptionRule = ExpectedException.none();
 
     public IndexRepairRegionScannerIT(boolean mutable, boolean singleCellIndex) {
-        StringBuilder optionBuilder = new StringBuilder();
         StringBuilder indexOptionBuilder = new StringBuilder();
         this.mutable = mutable;
         if (!mutable) {
@@ -296,12 +285,6 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
         }
     }
 
-    static private void resetIndexRegionObserverFailPoints() {
-        IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
-        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
-        IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
-    }
-
     static private void commitWithException(Connection conn) {
         try {
             conn.commit();
@@ -843,29 +826,73 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
         }
     }
 
-    public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
-            IOException, InterruptedException {
-        Scan scan = new Scan();
-        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().
-                getAdmin();
-        org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
-        Table table = hbaseConn.getTable(tableName);
-        boolean deletedRows = false;
-        try (ResultScanner scanner = table.getScanner(scan)) {
-            for (Result r : scanner) {
-                Delete del = new Delete(r.getRow());
-                table.delete(del);
-                deletedRows = true;
-            }
-        } catch (Exception e) {
-            //if the table doesn't exist, we have no rows to delete. Easier to catch
-            //than to pre-check for existence
+    @Test
+    public void testInvalidIndexRowWithTableLevelMaxLookback() throws Exception {
+        if (!mutable) {
+            return;
         }
-        //don't flush/compact if we didn't write anything, because we'll hang forever
-        if (deletedRows) {
-            getUtility().getAdmin().flush(tableName);
-            TestUtil.majorCompact(getUtility(), tableName);
+        final int NROWS = 4;
+        final int maxLookbackAge = 12000;
+        if ((optionBuilder.length() > 0 && optionBuilder.toString().trim().startsWith("SPLIT ON"))
+                || optionBuilder.length() == 0) {
+            optionBuilder.insert(0, "MAX_LOOKBACK_AGE=" + maxLookbackAge + " ");
         }
-    }
 
+        else {
+            optionBuilder.insert(0, "MAX_LOOKBACK_AGE=" + maxLookbackAge + ", ");
+        }
+        tableDDLOptions = optionBuilder.toString();
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+            PreparedStatement dataPreparedStatement =
+                    conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+            for (int i = 1; i <= NROWS; i++) {
+                dataPreparedStatement.setInt(1, i);
+                dataPreparedStatement.setInt(2, i + 1);
+                dataPreparedStatement.setInt(3, i * 2);
+                dataPreparedStatement.execute();
+            }
+            conn.commit();
+
+            ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+            customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(customEdge);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(3, 100, 200)");
+            conn.commit();
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            customEdge.incrementValue(5);
+            IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, dataTableName,
+                    indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+            CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+            assertEquals(2,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(0,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(2, 102, 202)");
+            conn.commit();
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            customEdge.incrementValue(maxLookbackAge + 1);
+            indexTool = IndexToolIT.runIndexTool(false, schemaName, dataTableName,
+                    indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+            mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+            assertEquals(0,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(2,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 7f652c5fef..6d028d287c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -66,11 +66,16 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
 
     protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
                                     String[] cmdArgs) throws Exception {
+        return runScrutiny(mapperClass, cmdArgs, 0);
+    }
+
+    protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
+                                    String[] cmdArgs, int expectedStatus) throws Exception {
         IndexScrutinyTool scrutiny = new IndexScrutinyTool(mapperClass);
         Configuration conf = new Configuration(getUtility().getConfiguration());
         scrutiny.setConf(conf);
         int status = scrutiny.run(cmdArgs);
-        assertEquals(0, status);
+        assertEquals(expectedStatus, status);
         for (Job job : scrutiny.getJobs()) {
             assertTrue(job.waitForCompletion(true));
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
index 114e6b68a7..71a2673fd5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.mapreduce.Counters;
@@ -26,7 +27,6 @@ import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -37,14 +37,21 @@ import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.After;
+import org.junit.experimental.categories.Category;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT;
@@ -55,6 +62,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
 
     private static PreparedStatement upsertDataStmt;
@@ -67,13 +76,24 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
     private static boolean isViewIndex;
     private static ManualEnvironmentEdge testClock;
     public static final String UPSERT_DATA = "UPSERT INTO %s VALUES (?, ?, ?)";
-    public static final String DELETE_SCRUTINY_METADATA = "DELETE FROM "
-        + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME;
-    public static final String DELETE_SCRUTINY_OUTPUT = "DELETE FROM " +
-        IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
-    public static final int MAX_LOOKBACK = 6;
+    public static final int MAX_LOOKBACK = 12;
+    public static final int TABLE_LEVEL_MAX_LOOKBACK = 8;
     private long scrutinyTs;
+    private final boolean hasTableLevelMaxLookback;
 
+    public IndexScrutinyWithMaxLookbackIT(boolean hasTableLevelMaxLookback) {
+        this.hasTableLevelMaxLookback = hasTableLevelMaxLookback;
+    }
+
+    @Parameterized.Parameters(name = "hasTableLevelMaxLookback={0}")
+    public static synchronized Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+        boolean[] Booleans = new boolean[]{true, false};
+        for (boolean mutable : Booleans) {
+            list.add(new Object[]{mutable});
+        }
+        return list;
+    }
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -85,18 +105,27 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
     }
 
     @Before
-    public void setupTest() throws SQLException {
+    public void setupTest() throws Exception {
         try(Connection conn = DriverManager.getConnection(getUrl(),
-            PropertiesUtil.deepCopy(TEST_PROPERTIES))){
-            conn.createStatement().execute(DELETE_SCRUTINY_METADATA);
-            conn.createStatement().execute(DELETE_SCRUTINY_OUTPUT);
-            conn.commit();
-        } catch (TableNotFoundException tnfe){
-            //This will happen the first time
+            PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            IndexScrutinyTool.createScrutinyToolTables(conn);
+        }
+    }
+
+    @After
+    public void cleanup () throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            deleteAllRows(conn,
+                    TableName.valueOf(IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME));
+            deleteAllRows(conn,
+                    TableName.valueOf(IndexScrutinyTableOutput.OUTPUT_TABLE_NAME));
         }
+        EnvironmentEdgeManager.reset();
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testScrutinyOnRowsBeyondMaxLookBack() throws Exception {
         setupTables();
         try {
@@ -109,6 +138,7 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testScrutinyOnRowsBeyondMaxLookback_viewIndex() throws Exception {
         schema = "S"+generateUniqueName();
         dataTableName = "T"+generateUniqueName();
@@ -160,11 +190,32 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
     @Test
     public void testScrutinyOnDeletedRowsBeyondMaxLookBack() throws Exception {
         setupTables();
-        try {
-            upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
-            assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
-        } finally {
-            EnvironmentEdgeManager.reset();
+        upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
+        assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+    }
+
+    @Test
+    public void testSCNBeyondMaxLookback() throws Exception {
+        setupTables();
+        testClock.setValue(EnvironmentEdgeManager.currentTimeMillis());
+        EnvironmentEdgeManager.injectEdge(testClock);
+        long beforeInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+        testClock.incrementValue(1);
+        try (Connection conn =
+                     DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            populateTable(dataTableFullName, conn);
+            testClock.incrementValue(1);
+            long maxLookbackAge = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK : MAX_LOOKBACK;
+            testClock.incrementValue(maxLookbackAge * 1000);
+            if (hasTableLevelMaxLookback) {
+                assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                        beforeInsertSCN + MAX_LOOKBACK * 1000);
+            }
+            runScrutiny(schema, dataTableName, indexTableName, beforeInsertSCN, -1);
+        }
+        catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains(
+                    "Index scrutiny can't look back past the configured max lookback age"));
         }
     }
 
@@ -177,6 +228,9 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
         isViewIndex = false;
         String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
             + "ZIP INTEGER) COLUMN_ENCODED_BYTES=0, VERSIONS=1";
+        if (hasTableLevelMaxLookback) {
+            dataTableDDL += ", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK * 1000;
+        }
         String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
         testClock = new ManualEnvironmentEdge();
 
@@ -234,17 +288,23 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
         throws Exception {
         try(Connection conn =
                 DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
-            populateTable(tableFullName, conn);
-            long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
-            testClock.setValue(afterInsertSCN);
+            testClock.setValue(EnvironmentEdgeManager.currentTimeMillis());
             EnvironmentEdgeManager.injectEdge(testClock);
+            long beforeInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            testClock.incrementValue(1);
+            populateTable(tableFullName, conn);
+            testClock.incrementValue(1);
             deleteIndexRows(conn);
             //move forward to the time we want to scrutinize, which is less than max lookback age
             //for the initial inserts
-            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+            long maxLookbackAge = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK : MAX_LOOKBACK;
+            testClock.incrementValue(maxLookbackAge / 2  * 1000);
             scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
             //now go past max lookback age for the initial 2 inserts
-            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+            testClock.incrementValue(maxLookbackAge / 2  * 1000);
+            if (hasTableLevelMaxLookback) {
+                assertTrue(EnvironmentEdgeManager.currentTimeMillis() < beforeInsertSCN + MAX_LOOKBACK * 1000);
+            }
 
             List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
             Job job = completedJobs.get(0);
@@ -273,18 +333,24 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
     private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
                                   Long scrutinyTs)
         throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs);
+        return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs, 0);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                                  Long scrutinyTs, int expectedStatus) throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, null, null,
+                scrutinyTs, expectedStatus);
     }
 
     private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
                                   Long batchSize, IndexScrutinyTool.SourceTable sourceTable,
-                                  Long scrutinyTs) throws Exception {
+                                  Long scrutinyTs, int expectedStatus) throws Exception {
         final String[]
             cmdArgs =
             getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
                 true, IndexScrutinyTool.OutputFormat.TABLE,
                 null, null, scrutinyTs);
-        return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs);
+        return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs, expectedStatus);
     }
 
     private static class MaxLookbackIndexScrutinyMapper extends IndexScrutinyMapper {
@@ -309,4 +375,4 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
 
         }
     }
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 47ad02419e..1620604c9f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
@@ -183,6 +184,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
         //In HBase 2.2.6 and HBase 2.3.0+, helps region server assignments when under an injected
         // edge clock. See HBASE-24511.
         serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+        // Need to set dispatcher delay to 0 to account for injected edge else queue might get stuck
+        serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", Integer.toString(0));
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4);
         clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
         clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
@@ -1478,6 +1481,64 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testMissingIndexRowsBeyondTableLevelMaxLookback() throws Exception {
+        if (! mutable) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        int maxLookbackAge = 12000;
+        int delta = 5;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullDataTableName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) MAX_LOOKBACK_AGE=" + maxLookbackAge);
+            ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+            customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(customEdge);
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
+            customEdge.incrementValue(delta);
+            // Insert a row
+            conn.createStatement()
+                    .execute("upsert into " + fullDataTableName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            customEdge.incrementValue(delta);
+            conn.createStatement()
+                    .execute("upsert into " + fullDataTableName + " values (2, 'Phoenix', 'B')");
+            conn.commit();
+            customEdge.incrementValue(delta);
+            String dataTableSql = "SELECT * FROM " + fullDataTableName;
+            ResultSet rs = conn.createStatement().executeQuery(dataTableSql);
+            Assert.assertTrue(rs.next());
+            Assert.assertTrue(rs.next());
+            customEdge.incrementValue(delta);
+            conn.createStatement()
+                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC " + this.indexDDLOptions,
+                            indexTableName, fullDataTableName));
+            customEdge.incrementValue(delta);
+            IndexTool indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+            assertEquals(2,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(0, mrJobCounters.findCounter(
+                    BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT.name()).getValue());
+            customEdge.incrementValue(maxLookbackAge + 1);
+            indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, dataTableName, indexTableName, null,
+                    0, IndexTool.IndexVerifyType.BEFORE);
+            mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+            assertEquals(2, mrJobCounters.findCounter(
+                    BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(0,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                    startTime + MAX_LOOKBACK_AGE * 1000);
+        }
+    }
+
     private Pair<Integer, Integer> countPutsAndDeletes(String tableName) throws Exception {
         int numPuts = 0;
         int numDeletes = 0;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 7c31b9c95a..f95918159a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -65,6 +65,7 @@ import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
 @RunWith(Parameterized.class)
 public class MaxLookbackExtendedIT extends BaseTest {
     private static final int MAX_LOOKBACK_AGE = 15;
+    private static final int TABLE_LEVEL_MAX_LOOKBACK_AGE = 10;
     private static final int ROWS_POPULATED = 2;
     public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
     private String tableDDLOptions;
@@ -72,9 +73,11 @@ public class MaxLookbackExtendedIT extends BaseTest {
     ManualEnvironmentEdge injectEdge;
     private int ttl;
     private boolean multiCF;
+    private boolean hasTableLevelMaxLookback;
 
-    public MaxLookbackExtendedIT(boolean multiCF) {
+    public MaxLookbackExtendedIT(boolean multiCF, boolean hasTableLevelMaxLookback) {
         this.multiCF = multiCF;
+        this.hasTableLevelMaxLookback = hasTableLevelMaxLookback;
     }
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -103,21 +106,30 @@ public class MaxLookbackExtendedIT extends BaseTest {
         Assert.assertFalse("refCount leaked", refCountLeaked);
     }
 
-    @Parameterized.Parameters(name = "MaxLookbackExtendedIT_multiCF={0}")
-    public static Collection<Boolean> data() {
-        return Arrays.asList(false, true);
+    @Parameterized.Parameters(name = "MaxLookbackExtendedIT_multiCF={0},hasTableLevelMaxLookback={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {false, true},
+                {false, false},
+                {true, false},
+                {true, true}
+        });
     }
     @Test
     public void testKeepDeletedCellsWithMaxLookbackAge() throws Exception {
         int versions = 2;
         optionBuilder.append(", VERSIONS=" + versions);
         optionBuilder.append(", KEEP_DELETED_CELLS=TRUE");
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+        }
         tableDDLOptions = optionBuilder.toString();
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
             createTable(dataTableName);
             injectEdge.setValue(System.currentTimeMillis());
             EnvironmentEdgeManager.injectEdge(injectEdge);
+            long firstUpsertSCN = EnvironmentEdgeManager.currentTimeMillis();
             conn.createStatement().execute("upsert into " + dataTableName
                     + " values ('a', 'ab1', 'abc1', 'abcd1')");
             conn.commit();
@@ -136,7 +148,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
             String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
             Assert.assertEquals(1, conn.createStatement().executeUpdate(dml));
             conn.commit();
-            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+            injectEdge.incrementValue(timeToAdvance * 1000);
+            if (hasTableLevelMaxLookback) {
+                Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() < firstUpsertSCN +
+                        MAX_LOOKBACK_AGE * 1000);
+            }
             dml = "DELETE from " + dataTableName + " WHERE id  = 'b'";
             Assert.assertEquals(1, conn.createStatement().executeUpdate(dml));
             conn.commit();
@@ -191,6 +208,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
     }
     @Test
     public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            tableDDLOptions = optionBuilder.toString();
+        }
         String dataTableName = generateUniqueName();
         createTable(dataTableName);
         injectEdge.setValue(System.currentTimeMillis());
@@ -199,7 +220,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
         injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
         populateTable(dataTableName);
         long populateTime = EnvironmentEdgeManager.currentTimeMillis();
-        injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+        long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+        injectEdge.incrementValue(timeToAdvance * 1000 + 1000);
+        if (hasTableLevelMaxLookback) {
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                    (populateTime + MAX_LOOKBACK_AGE * 1000));
+        }
         Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
             Long.toString(populateTime));
@@ -216,6 +242,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
 
     @Test(timeout=120000L)
     public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            tableDDLOptions = optionBuilder.toString();
+        }
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
             String indexName = generateUniqueName();
@@ -229,7 +259,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
             TableName indexTable = TableName.valueOf(indexName);
             injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
             long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
-            injectEdge.incrementValue(10); //make sure we delete at a different ts
+            injectEdge.incrementValue(5); //make sure we delete at a different ts
             Statement stmt = conn.createStatement();
             stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
             Assert.assertEquals(1, stmt.getUpdateCount());
@@ -252,7 +282,8 @@ public class MaxLookbackExtendedIT extends BaseTest {
             assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
             assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
             //wait for the lookback time. After this compactions should purge the deleted row
-            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+            injectEdge.incrementValue(timeToAdvance * 1000);
             long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
             String notDeletedRowSql =
                 String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
@@ -266,6 +297,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
                 " values ('c', 'cd', 'cde', 'cdef')");
             conn.commit();
             injectEdge.incrementValue(1L);
+            if (hasTableLevelMaxLookback) {
+                Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                        beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
+            }
             majorCompact(dataTable);
             majorCompact(indexTable);
             // Deleted row versions should be removed.
@@ -283,6 +318,9 @@ public class MaxLookbackExtendedIT extends BaseTest {
 
     @Test(timeout=60000L)
     public void testTTLAndMaxLookbackAge() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            return;
+        }
         Configuration conf = getUtility().getConfiguration();
         //disable automatic memstore flushes
         long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
@@ -365,6 +403,9 @@ public class MaxLookbackExtendedIT extends BaseTest {
     public void testRecentMaxVersionsNotCompactedAway() throws Exception {
         int versions = 2;
         optionBuilder.append(", VERSIONS=" + versions);
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+        }
         tableDDLOptions = optionBuilder.toString();
         String firstValue = "abc";
         String secondValue = "def";
@@ -420,18 +461,24 @@ public class MaxLookbackExtendedIT extends BaseTest {
             // at the appropriate times
             assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
             assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
-            // Make the first row versions outside the max lookback window
-            injectEdge.setValue(afterInsertSCN + MAX_LOOKBACK_AGE * 1000);
+            long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+            // Make the first two row versions of row a outside the max lookback window. Only the first version should
+            // be collected while the other more recent one should be retained.
+            injectEdge.setValue(afterFirstUpdateSCN + timeToAdvance * 1000);
+            if (hasTableLevelMaxLookback) {
+                Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                        afterFirstUpdateSCN + MAX_LOOKBACK_AGE * 1000);
+            }
             majorCompact(dataTable);
             majorCompact(indexTable);
             // At this moment, the data table has three row versions for row a within the max
             // lookback window.
             // These versions have the following cells {empty, ab, abc, abcd}, {empty, def} and
             // {empty, ghi}.
-            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 8);
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
             // The index table will have three full row versions for "a" and each will have 3 cells,
             // one for each of columns empty, val2, and val3.
-            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 9);
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
             //empty column + 1 version each of val1,2 and 3 = 4
             assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
             //1 version of empty column, 1 version of val2, 1 version of val3 = 3
@@ -441,6 +488,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
 
     @Test(timeout=60000)
     public void testOverrideMaxLookbackForCompaction() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            tableDDLOptions = optionBuilder.toString();
+        }
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableNameOne = generateUniqueName();
             createTable(tableNameOne);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
index e5e60a665a..4889e65bab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse;
 @Category(NeedsOwnMiniClusterTest.class)
 public class MaxLookbackIT extends BaseTest {
     private static final int MAX_LOOKBACK_AGE = 15;
+    private static final int TABLE_LEVEL_MAX_LOOKBACK_AGE = 10;
     private static final int ROWS_POPULATED = 2;
     public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
     private String tableDDLOptions;
@@ -119,6 +120,48 @@ public class MaxLookbackIT extends BaseTest {
         Assert.fail("We should have thrown an exception for the too-early SCN");
     }
 
+    @Test
+    public void testTooLowSCNWithTableLevelMaxLookback() throws Exception {
+        String dataTableName1 = generateUniqueName();
+        StringBuilder optionBuilderForDataTable1 = new StringBuilder(optionBuilder);
+        optionBuilderForDataTable1.append("MAX_LOOKBACK_AGE="+((TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000));
+        tableDDLOptions = optionBuilderForDataTable1.toString();
+        createTable(dataTableName1);
+        StringBuilder optionBuilderForDataTable2 = new StringBuilder(optionBuilder);
+        optionBuilderForDataTable2.append("MAX_LOOKBACK_AGE="+(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000));
+        String dataTableName2 = generateUniqueName();
+        tableDDLOptions = optionBuilderForDataTable2.toString();
+        createTable(dataTableName2);
+        Assert.assertEquals(new Long((TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000),
+                queryTableLevelMaxLookbackAge(dataTableName1));
+        Assert.assertEquals(new Long(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000),
+                queryTableLevelMaxLookbackAge(dataTableName2));
+        injectEdge.setValue(System.currentTimeMillis());
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        //increase long enough to make sure we can find the syscat row for the table
+        injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+        populateTable(dataTableName1);
+        populateTable(dataTableName2);
+        long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+        injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000 + 1000);
+        Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                (populateTime + MAX_LOOKBACK_AGE * 1000));
+        Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                populateTime + (TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000);
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(populateTime));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            connscn.createStatement().executeQuery("select * from " + dataTableName1 + " where " +
+                    "val3 in (select val3 from " + dataTableName2 + ")");
+        } catch (SQLException se) {
+            SQLExceptionCode code =
+                    SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+            TestUtil.assertSqlExceptionCode(code, se);
+            return;
+        }
+        Assert.fail("We should have thrown an exception for the too-early SCN");
+    }
+
     @Test(timeout=120000L)
     public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -186,6 +229,75 @@ public class MaxLookbackIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testRecentlyDeletedRowsNotCompactedAwayWithTableLevelMaxLookback() throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            optionBuilder.append("MAX_LOOKBACK_AGE="+(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000));
+            tableDDLOptions = optionBuilder.toString();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+
+            TableName dataTable = TableName.valueOf(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            TableName indexTable = TableName.valueOf(indexName);
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(5); //make sure we delete at a different ts
+            Statement stmt = conn.createStatement();
+            stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+            Assert.assertEquals(1, stmt.getUpdateCount());
+            conn.commit();
+            //select stmt to get row we deleted
+            String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+            int rowsPlusDeleteMarker = ROWS_POPULATED;
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            flush(dataTable);
+            flush(indexTable);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            injectEdge.incrementValue(1); //new ts for major compaction
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            //wait for the lookback time. After this compactions should purge the deleted row
+            injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+            long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            String notDeletedRowSql =
+                    String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
+            String notDeletedIndexRowSql =
+                    String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+            assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('c', 'cd', 'cde', 'cdef')");
+            conn.commit();
+            injectEdge.incrementValue(1L);
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                    beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //should still be ROWS_POPULATED because we added one and deleted one
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+            //deleted row should be gone, but not deleted row should still be there.
+            assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+        }
+    }
+
     @Test(timeout=60000L)
     public void testTTLAndMaxLookbackAge() throws Exception {
         ttl = 20;
@@ -258,6 +370,145 @@ public class MaxLookbackIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testTTLAndTableLevelMaxLookbackWithMaxLookbackMoreThanTTL() throws Exception {
+        ttl = 8;
+        long maxLookbackAge = 12;
+        optionBuilder.append("TTL=").append(ttl).append(", MAX_LOOKBACK_AGE=").
+                append(maxLookbackAge * 1000);
+        tableDDLOptions = optionBuilder.toString();
+        Configuration conf = getUtility().getConfiguration();
+        //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+                HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+        conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.incrementValue(1);
+            long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasTtl(conn, dataTable, ttl);
+            assertTableHasTtl(conn, indexTable, ttl);
+            //first make sure we inserted correctly
+            String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+            int originalRowCount = 2;
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //force a flush
+            flush(dataTable);
+            flush(indexTable);
+            //flush shouldn't have changed it
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            long timeToAdvance = (ttl * 1000) -
+                    (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            //make sure it's still on disk
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            injectEdge.incrementValue(1); //get a new timestamp for compaction
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //nothing should have been purged by this major compaction
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //now wait till max lookback as max lookback is greater than TTL
+            timeToAdvance = (maxLookbackAge * 1000) -
+                    (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() >
+                    afterFirstInsertSCN + ttl * 1000);
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                    afterFirstInsertSCN + MAX_LOOKBACK_AGE * 1000);
+            //make sure that we can compact away the now-expired rows
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            //note that before HBase 1.4, we don't have HBASE-17956
+            // and this will always return 0 whether it's still on-disk or not
+            assertRawRowCount(conn, dataTable, 0);
+            assertRawRowCount(conn, indexTable, 0);
+        }
+        finally {
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+        }
+    }
+
+    @Test
+    public void testExpiredRowsAreNotFlushedWithTableLevelMaxLookback() throws Exception {
+        ttl = 8;
+        long maxLookbackAge = 12;
+        optionBuilder.append("TTL=").append(ttl).append(", MAX_LOOKBACK_AGE=").
+                append(maxLookbackAge * 1000);
+        tableDDLOptions = optionBuilder.toString();
+        int delta = 1;
+        Configuration conf = getUtility().getConfiguration();
+        //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+                HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+        conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasTtl(conn, dataTable, ttl);
+            assertTableHasTtl(conn, indexTable, ttl);
+            injectEdge.setValue(System.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            injectEdge.incrementValue(delta);
+            populateTable(dataTableName);
+            long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(delta);
+            int expectedNumRows = 2;
+            assertRawRowCount(conn, dataTable, expectedNumRows);
+            assertRawRowCount(conn, indexTable, expectedNumRows);
+            injectEdge.incrementValue(ttl * 1000);
+            Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+                    afterFirstInsertSCN + maxLookbackAge * 1000);
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, dataTable, expectedNumRows);
+            assertRawRowCount(conn, indexTable, expectedNumRows);
+            injectEdge.incrementValue(delta);
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('c', 'cd', 'cde', 'cdef')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('d', 'de', 'def', 'defg')");
+            conn.commit();
+            expectedNumRows += 2;
+            injectEdge.incrementValue(delta);
+            assertRawRowCount(conn, dataTable, expectedNumRows);
+            assertRawRowCount(conn, indexTable, expectedNumRows);
+            injectEdge.incrementValue(maxLookbackAge * 1000);
+            flush(dataTable);
+            flush(indexTable);
+            // Recent two rows expired so, they won't be flushed
+            assertRawRowCount(conn, dataTable, expectedNumRows - 2);
+            assertRawRowCount(conn, indexTable, expectedNumRows - 2);
+        }
+        finally {
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+        }
+    }
+
     @Test(timeout=60000)
     public void testRecentMaxVersionsNotCompactedAway() throws Exception {
         int versions = 2;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index e65bee738c..852cf8f01f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -68,14 +68,16 @@ public class TableTTLIT extends BaseTest {
     private final boolean multiCF;
     private final boolean columnEncoded;
     private final KeepDeletedCells keepDeletedCells;
+    private final Integer tableLevelMaxLooback;
 
     public TableTTLIT(boolean multiCF, boolean columnEncoded,
-            KeepDeletedCells keepDeletedCells, int versions, int ttl) {
+            KeepDeletedCells keepDeletedCells, int versions, int ttl, Integer tableLevelMaxLooback) {
         this.multiCF = multiCF;
         this.columnEncoded = columnEncoded;
         this.keepDeletedCells = keepDeletedCells;
         this.versions = versions;
         this.ttl = ttl;
+        this.tableLevelMaxLooback = tableLevelMaxLooback;
     }
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -103,6 +105,9 @@ public class TableTTLIT extends BaseTest {
         } else {
             optionBuilder.append(", COLUMN_ENCODED_BYTES=0");
         }
+        if (tableLevelMaxLooback != null) {
+            optionBuilder.append(", MAX_LOOKBACK_AGE=").append(tableLevelMaxLooback * 1000);
+        }
         this.tableDDLOptions = optionBuilder.toString();
         injectEdge = new ManualEnvironmentEdge();
         injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
@@ -117,15 +122,18 @@ public class TableTTLIT extends BaseTest {
     }
 
     @Parameterized.Parameters(name = "TableTTLIT_multiCF={0}, columnEncoded={1}, "
-            + "keepDeletedCells={2}, versions={3}, ttl={4}")
+            + "keepDeletedCells={2}, versions={3}, ttl={4}, tableLevelMaxLookback={5}")
     public static synchronized Collection<Object[]> data() {
             return Arrays.asList(new Object[][] {
-                    { false, false, KeepDeletedCells.FALSE, 1, 100},
-                    { false, false, KeepDeletedCells.TRUE, 5, 50},
-                    { false, false, KeepDeletedCells.TTL, 1, 25},
-                    { true, false, KeepDeletedCells.FALSE, 5, 50},
-                    { true, false, KeepDeletedCells.TRUE, 1, 25},
-                    { true, false, KeepDeletedCells.TTL, 5, 100} });
+                    { false, false, KeepDeletedCells.FALSE, 1, 100, null},
+                    { false, false, KeepDeletedCells.TRUE, 5, 50, null},
+                    { false, false, KeepDeletedCells.TTL, 1, 25, null},
+                    { true, false, KeepDeletedCells.FALSE, 5, 50, null},
+                    { true, false, KeepDeletedCells.TRUE, 1, 25, null},
+                    { true, false, KeepDeletedCells.TTL, 5, 100, null},
+                    { false, false, KeepDeletedCells.FALSE, 1, 100, 15},
+                    { false, false, KeepDeletedCells.TRUE, 5, 50, 15},
+                    { false, false, KeepDeletedCells.TTL, 1, 25, 15}});
     }
 
     /**
@@ -146,7 +154,8 @@ public class TableTTLIT extends BaseTest {
 
     @Test
     public void testMaskingAndCompaction() throws Exception {
-        final int maxDeleteCounter = MAX_LOOKBACK_AGE;
+        final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
+        final int maxDeleteCounter = maxLookbackAge;
         final int maxCompactionCounter = ttl / 2;
         final int maxMaskingCounter = 2 * ttl;
         final byte[] rowKey = Bytes.toBytes("a");
@@ -174,7 +183,7 @@ public class TableTTLIT extends BaseTest {
                 }
                 if (maskingCounter-- == 0) {
                     updateRow(conn, tableName, noCompactTableName, "a");
-                    injectEdge.incrementValue((ttl + MAX_LOOKBACK_AGE + 1) * 1000);
+                    injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
                     LOG.debug("Masking " + i + " current time: " + injectEdge.currentTime());
                     ResultSet rs = conn.createStatement().executeQuery(
                             "SELECT count(*) FROM " + tableName);
@@ -205,7 +214,7 @@ public class TableTTLIT extends BaseTest {
                 }
                 afterCompaction = false;
                 compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);
-                long scn = injectEdge.currentTime() - MAX_LOOKBACK_AGE * 1000;
+                long scn = injectEdge.currentTime() - maxLookbackAge * 1000;
                 long scnEnd = injectEdge.currentTime();
                 long scnStart = Math.max(scn, startTime);
                 for (scn = scnEnd; scn >= scnStart; scn -= 1000) {
@@ -224,6 +233,9 @@ public class TableTTLIT extends BaseTest {
 
     @Test
     public void testRowSpansMultipleTTLWindows() throws Exception {
+        if (tableLevelMaxLooback != null) {
+            return;
+        }
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
             createTable(tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 60de875c40..eb39199380 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Lists
         .newArrayListWithExpectedSize;
 import static org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost
         .PHOENIX_META_DATA_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
 import static org.apache.phoenix.util.TestUtil.analyzeTable;
 import static org.apache.phoenix.util.TestUtil.getAllSplits;
 import static org.junit.Assert.assertArrayEquals;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
@@ -1296,4 +1299,86 @@ public class ViewIT extends SplitSystemCatalogIT {
         }
     }
 
+    @Test
+    public void testCreateViewWithTableLevelMaxLookbackAge() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Long maxLookbackAge = 300L;
+        String createDdl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL PRIMARY KEY,  col1 integer) MAX_LOOKBACK_AGE="+maxLookbackAge;
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute(createDdl);
+            assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+            String childViewName = generateUniqueName();
+            String fullChildViewName = SchemaUtil.getTableName(schemaName, childViewName);
+            createDdl = "CREATE VIEW " + fullChildViewName + " AS SELECT * FROM " + fullTableName;
+            stmt.execute(createDdl);
+            assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullChildViewName));
+            String grandChildViewName = generateUniqueName();
+            String fullGrandChildViewName = SchemaUtil.getTableName(schemaName, grandChildViewName);
+            createDdl = "CREATE VIEW " + fullGrandChildViewName + " (col2 varchar) AS SELECT * FROM " + fullChildViewName;
+            stmt.execute(createDdl);
+            assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullGrandChildViewName));
+            String childViewIndexName = generateUniqueName();
+            createDdl = "CREATE INDEX " + childViewIndexName + " ON " + fullChildViewName + " (COL1)";
+            stmt.execute(createDdl);
+            assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(
+                    SchemaUtil.getTableName(schemaName, childViewIndexName)));
+            String grandChildViewIndexName = generateUniqueName();
+            createDdl = "CREATE INDEX " + grandChildViewIndexName + " ON " + fullGrandChildViewName + " (COL2)";
+            stmt.execute(createDdl);
+            assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(
+                    SchemaUtil.getTableName(schemaName, grandChildViewIndexName)));
+        }
+    }
+
+    @Test
+    public void testCreateInvalidViewWithTableLevelMaxLookbackAge() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        Long maxLookbackAge = 300L;
+        String createDdl = "CREATE TABLE " + fullTableName +
+                " (id char(1) NOT NULL PRIMARY KEY,  col1 integer)";
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            stmt.execute(createDdl);
+            String viewName = generateUniqueName();
+            String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+            createDdl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName;
+            String viewOptions = " MAX_LOOKBACK_AGE=" + maxLookbackAge;
+            String createDdlWithViewOptions = createDdl + viewOptions;
+            SQLException err = assertThrows(SQLException.class, () -> stmt.execute(createDdlWithViewOptions));
+            assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+            stmt.execute(createDdl);
+            String viewIndexName = generateUniqueName();
+            createDdl = "CREATE INDEX " + viewIndexName + " ON " + fullViewName + " (COL1)";
+            String createIndexDdlWithViewOptions = createDdl + viewOptions;
+            err = assertThrows(SQLException.class, () -> stmt.execute(createIndexDdlWithViewOptions));
+            assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testMappedViewForNullMaxLookbackAge() throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            byte[] cfA = Bytes.toBytes(SchemaUtil.normalizeIdentifier("a"));
+            String dataTableName = generateUniqueName();
+            byte[] hbaseDataTableName = SchemaUtil.getTableNameAsBytes("", dataTableName);
+            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(hbaseDataTableName));
+            builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cfA));
+            admin.createTable(builder.build());
+            String createDdl = "CREATE VIEW " + dataTableName + " (id varchar primary key, a.col1 varchar)";
+            stmt.execute(createDdl);
+            assertNull(queryTableLevelMaxLookbackAge(dataTableName));
+            String viewIndexName = generateUniqueName();
+            createDdl = "CREATE INDEX " + viewIndexName + " ON " + dataTableName + " (a.col1)";
+            stmt.execute(createDdl);
+            assertNull(queryTableLevelMaxLookbackAge(viewIndexName));
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index f6b7b3a99e..cb07031ac5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -27,6 +27,8 @@ import static org.apache.phoenix.exception.SQLExceptionCode
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MUTATE_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode
         .NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData
         .SYSTEM_LINK_HBASE_TABLE_NAME;
@@ -49,6 +51,8 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -1371,6 +1375,36 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
         assertPKs(rs, new String[] {"K1", "K2", "K3", "K4"});
     }
 
+    @Test
+    public void testAlterViewAndViewIndexMaxLookbackAgeFails() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            String ddl = "CREATE TABLE " + fullDataTableName + " (ID VARCHAR NOT NULL PRIMARY KEY, COL1 INTEGER)";
+            stmt.execute(ddl);
+            assertNull(queryTableLevelMaxLookbackAge(fullDataTableName));
+            String viewName = generateUniqueName();
+            String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+            ddl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullDataTableName;
+            stmt.execute(ddl);
+            assertNull(queryTableLevelMaxLookbackAge(fullViewName));
+            String alterViewDdl = "ALTER VIEW " + fullViewName + " SET MAX_LOOKBACK_AGE = 300";
+            SQLException err = assertThrows(SQLException.class, () -> stmt.execute(alterViewDdl));
+            assertEquals(VIEW_WITH_PROPERTIES.getErrorCode(), err.getErrorCode());
+            String viewIndexName = generateUniqueName();
+            String fullViewIndexName = SchemaUtil.getTableName(schemaName, viewIndexName);
+            ddl = "CREATE INDEX " + viewIndexName + " ON " + fullViewName + " (COL1)";
+            stmt.execute(ddl);
+            assertNull(queryTableLevelMaxLookbackAge(fullViewIndexName));
+            String alterViewIndexDdl = "ALTER INDEX " + viewIndexName + " ON " + fullViewName +
+                    " ACTIVE SET MAX_LOOKBACK_AGE = 300";
+            err = assertThrows(SQLException.class, () -> stmt.execute(alterViewIndexDdl));
+            assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+        }
+    }
+
     private void assertPKs(ResultSet rs, String[] expectedPKs)
             throws SQLException {
         List<String> pkCols = newArrayListWithExpectedSize(expectedPKs.length);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index a75a61fc47..c306c4e886 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -21,12 +21,14 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_UPDATE_CACHE_FREQ_FOR_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -948,6 +950,28 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
         asssertIsWALDisabled(conn,indexName,false);
     }
 
+    @Test
+    public void testAlterIndexMaxLookbackAgeFails() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        try(Connection conn = DriverManager.getConnection(getUrl());
+            Statement stmt = conn.createStatement()) {
+            String ddl = "CREATE TABLE " + fullDataTableName + " (id varchar not null primary key, col1 integer)";
+            stmt.execute(ddl);
+            assertNull(queryTableLevelMaxLookbackAge(fullDataTableName));
+            String indexName = generateUniqueName();
+            String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+            ddl = "CREATE INDEX " + indexName + " ON " + fullDataTableName + " (COL1)";
+            stmt.execute(ddl);
+            assertNull(queryTableLevelMaxLookbackAge(fullIndexName));
+            String alterIndexDdl = "ALTER INDEX " + indexName + " ON " + fullDataTableName +
+                    " ACTIVE SET MAX_LOOKBACK_AGE = 300";
+            SQLException err = assertThrows(SQLException.class, () -> stmt.execute(alterIndexDdl));
+            assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+        }
+    }
+
     private static void asssertIsWALDisabled(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isWALDisabled());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index 894bf907a0..ba36ccc796 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -19,8 +19,13 @@ package org.apache.phoenix.end2end.transform;
 
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtilHelper;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -43,14 +48,21 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -66,17 +78,12 @@ import java.util.UUID;
 
 import static org.apache.phoenix.end2end.index.ImmutableIndexExtendedIT.getRowCountForEmptyColValue;
 import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
 import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.apache.phoenix.schema.PTable.TransformStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -84,10 +91,13 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class TransformToolIT extends ParallelStatsDisabledIT {
     private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
-    private final String tableDDLOptions;
+    private String tableDDLOptions;
+    private boolean mutable;
+    private StringBuilder optionBuilder = new StringBuilder();
 
     @Parameterized.Parameters(
             name = "mutable={0}")
@@ -101,8 +111,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     public TransformToolIT(boolean mutable) {
-        StringBuilder optionBuilder = new StringBuilder();
         optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
+        this.mutable = mutable;
         if (!mutable) {
             optionBuilder.append(", IMMUTABLE_ROWS=true ");
         }
@@ -127,10 +137,32 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @AfterClass
-    public static synchronized void cleanup() {
+    public static synchronized void tearDown() {
         TransformMonitorTask.disableTransformMonitorTask(false);
     }
 
+    @Before
+    public void createIndexToolTables() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            IndexTool.createIndexToolTables(conn);
+        }
+        resetIndexRegionObserverFailPoints();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            deleteAllRows(conn,
+                    TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES));
+            deleteAllRows(conn,
+                    TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME));
+        }
+        EnvironmentEdgeManager.reset();
+        resetIndexRegionObserverFailPoints();
+    }
+
     private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
         createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
     }
@@ -281,6 +313,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
      * Test presplitting an index table
      */
     @Test
+    @Ignore("Test is already broken")
     public void testSplitTable() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -353,6 +386,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testDataAfterTransformingMultiColFamilyTable() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -460,6 +494,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformMutationReadRepair() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -570,6 +605,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformMutationFailureRepair() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -684,6 +720,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformVerify() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -793,6 +830,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformVerify_shouldFixUnverified() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -959,6 +997,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformForGlobalViews() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -1033,6 +1072,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    @Ignore("Test is already broken")
     public void testTransformForTenantViews() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -1120,6 +1160,85 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testInvalidRowsWithTableLevelMaxLookback() throws Exception {
+        if (! mutable) {
+            return;
+        }
+        int maxLookbackAge = 12000;
+        optionBuilder.append(", MAX_LOOKBACK_AGE=").append(maxLookbackAge);
+        tableDDLOptions = optionBuilder.toString();
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        int delta = 5;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 2;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
+                    PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName + " SET COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName,
+                    null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertEquals(Long.valueOf(maxLookbackAge), queryTableLevelMaxLookbackAge(record.getNewPhysicalTableName()));
+            assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
+                    PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+            customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(customEdge);
+            customEdge.incrementValue(delta);
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt = conn.prepareStatement(upsertQuery);
+            IndexToolIT.upsertRow(stmt, ++numOfRows);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            customEdge.incrementValue(delta);
+            // 2 initial rows are missing in the new table as TransformTool hasn't run yet
+            assertEquals(numOfRows - 2, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+            List<String> args = getArgList(schemaName, dataTableName, null, null, null,
+                    null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+            TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            CounterGroup mrJobCounters = getMRJobCounters(transformTool);
+            assertEquals(1,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(0,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(numOfRows, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+            customEdge.incrementValue(delta);
+            record = Transform.getTransformRecord(schemaName, dataTableName,
+                    null, null, conn.unwrap(PhoenixConnection.class));
+            assertEquals(TransformStatus.PENDING_CUTOVER.name(), record.getTransformStatus());
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            stmt = conn.prepareStatement(upsertQuery);
+            IndexToolIT.upsertRow(stmt, ++numOfRows);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            customEdge.incrementValue(delta);
+            assertEquals(numOfRows, getRowCount(conn, record.getNewPhysicalTableName()));
+            assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+            customEdge.incrementValue(maxLookbackAge);
+            args = getArgList(schemaName, dataTableName, null, null, null,
+                    null, false, false, false, false, false);
+            args.add("-v");
+            args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+            transformTool = runTransformTool(args.toArray(new String[0]), 0);
+            mrJobCounters = getMRJobCounters(transformTool);
+            assertEquals(0,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(1,
+                    mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+            assertEquals(1, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+        }
+    }
 
     public static Connection getTenantConnection(String tenant) throws SQLException {
         Properties props = new Properties();
@@ -1219,4 +1338,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
         return tt;
     }
 
-}
\ No newline at end of file
+    public static CounterGroup getMRJobCounters(TransformTool transformTool) throws IOException {
+        return transformTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 219374b9bf..76175e4dc9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -127,6 +127,11 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -142,6 +147,7 @@ import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
 import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
@@ -2148,4 +2154,42 @@ public abstract class BaseTest {
         }
         return false;
     }
+
+    protected Long queryTableLevelMaxLookbackAge(String fullTableName) throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            return pconn.getTableNoCache(fullTableName).getMaxLookbackAge();
+        }
+    }
+
+    public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
+            IOException, InterruptedException {
+        Scan scan = new Scan();
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().
+                getAdmin();
+        org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
+        Table table = hbaseConn.getTable(tableName);
+        boolean deletedRows = false;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result r : scanner) {
+                Delete del = new Delete(r.getRow());
+                table.delete(del);
+                deletedRows = true;
+            }
+        } catch (Exception e) {
+            //if the table doesn't exist, we have no rows to delete. Easier to catch
+            //than to pre-check for existence
+        }
+        //don't flush/compact if we didn't write anything, because we'll hang forever
+        if (deletedRows) {
+            getUtility().getAdmin().flush(tableName);
+            TestUtil.majorCompact(getUtility(), tableName);
+        }
+    }
+
+    static public void resetIndexRegionObserverFailPoints() {
+        IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+        IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index cf076f08ef..79b0168f77 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -830,8 +830,8 @@ public class TestUtil {
             // In HBase 2.5 getLastMajorCompactionTimestamp doesn't seem to get updated when the
             // clock is stopped, so check for the state going to NONE instead
             if (state.equals(CompactionState.NONE) && (previousState != null
-                    && previousState.equals(CompactionState.MAJOR_AND_MINOR)
-                    || previousState.equals(CompactionState.MAJOR))) {
+                    && (previousState.equals(CompactionState.MAJOR_AND_MINOR)
+                    || previousState.equals(CompactionState.MAJOR)))) {
                 break;
             }
             previousState = state;