Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/03/26 18:04:00 UTC

[phoenix] branch 4.x updated: PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 5521fd9  PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age
5521fd9 is described below

commit 5521fd9d130629ac17ba431a423e58e17432e303
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Mar 5 09:57:56 2020 -0800

    PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age
---
 .../phoenix/end2end/IndexScrutinyToolBaseIT.java   |   7 +-
 .../end2end/IndexScrutinyToolForTenantIT.java      |   9 +-
 .../phoenix/end2end/IndexScrutinyToolIT.java       |  89 +++++-
 .../end2end/IndexScrutinyWithMaxLookbackIT.java    | 304 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  |  59 +---
 .../NonParameterizedIndexScrutinyToolIT.java       |   8 +-
 .../hadoop/hbase/regionserver/ScanInfoUtil.java    |   4 +-
 .../org/apache/phoenix/compile/QueryCompiler.java  |   1 +
 .../mapreduce/index/IndexScrutinyMapper.java       |  54 +++-
 .../mapreduce/index/IndexScrutinyTableOutput.java  |  48 +++-
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |  32 ++-
 .../index/PhoenixScrutinyJobCounters.java          |   8 +-
 .../ManualEnvironmentEdge.java}                    |  43 ++-
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  19 ++
 .../index/IndexScrutinyMapperForTest.java          |   1 +
 .../java/org/apache/phoenix/util/TestUtil.java     |  44 ++-
 16 files changed, 602 insertions(+), 128 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 54a5408..980c0fa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -16,7 +16,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -54,8 +54,9 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
                 new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
-        IndexScrutinyTool scrutiny = new IndexScrutinyTool(IndexScrutinyMapperForTest.class);
+    protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
+                                    String[] cmdArgs) throws Exception {
+        IndexScrutinyTool scrutiny = new IndexScrutinyTool(mapperClass);
         Configuration conf = new Configuration(getUtility().getConfiguration());
         scrutiny.setConf(conf);
         int status = scrutiny.run(cmdArgs);
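
A note on the refactor above: runScrutiny() now takes the mapper class as an
argument instead of hard-coding IndexScrutinyMapperForTest, so tests can plug
in their own mappers (the new max-lookback IT below relies on this). A minimal
usage sketch, assuming the getArgValues(...) helper from this base class and
hypothetical table names:

    String[] args = getArgValues("", "MY_VIEW", "MY_INDEX", 10L,
            SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
            EnvironmentEdgeManager.currentTimeMillis());
    List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, args);
    assertEquals(1, completedJobs.size());
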
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
index 351af9c..f4e1ce7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
@@ -16,6 +16,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -115,7 +116,7 @@ public  class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
                 getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
                         EnvironmentEdgeManager.currentTimeMillis());
 
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
         // Sunny case, both index and view are equal. 1 row
         assertEquals(1, completedJobs.size());
         for (Job job : completedJobs) {
@@ -145,7 +146,7 @@ public  class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
                 argValues =
                 getArgValues("", globalViewName, indexNameGlobal, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
                         EnvironmentEdgeManager.currentTimeMillis());
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
         // Sunny case, both index and view are equal. 1 row
         assertEquals(1, completedJobs.size());
         for (Job job : completedJobs) {
@@ -200,7 +201,7 @@ public  class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
         String[]
                 argValues =
                 getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
 
         assertEquals(2, completedJobs.size());
         for (Job job : completedJobs) {
@@ -246,7 +247,7 @@ public  class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
                 argValues =
                 getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null,
                         tenantId, EnvironmentEdgeManager.currentTimeMillis());
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
 
         assertEquals(1, completedJobs.size());
         for (Job job : completedJobs) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 76fce98..a56ebb3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
@@ -75,6 +76,28 @@ import com.google.common.collect.Lists;
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+    public static final String MISSING_ROWS_QUERY_TEMPLATE =
+        "SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , " +
+        "\"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , " +
+        "\"BEYOND_MAX_LOOKBACK\" , \"ID\" , \"NAME\" , \"EMPLOY_DATE\" , \"ZIP\" , \":ID\" , " +
+        "\"0:NAME\" , \"0:EMPLOY_DATE\" , \"0:ZIP\" " +
+        "FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"NAME\" VARCHAR," +
+        "\"EMPLOY_DATE\" TIMESTAMP,\"ZIP\" INTEGER,\":ID\" INTEGER,\"0:NAME\" VARCHAR," +
+        "\"0:EMPLOY_DATE\" DECIMAL,\"0:ZIP\" INTEGER) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\"," +
+        "\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('[TableName]','[IndexName]'," +
+        "[ScrutinyTs],[HasTargetRow]))";
+    public static final String BEYOND_MAX_LOOKBACK_QUERY_TEMPLATE =
+        "SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , " +
+            "\"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , " +
+            "\"BEYOND_MAX_LOOKBACK\" , \"ID\" , \"NAME\" , \"EMPLOY_DATE\" , \"ZIP\" , \":ID\" , " +
+            "\"0:NAME\" , \"0:EMPLOY_DATE\" , \"0:ZIP\" " +
+            "FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"NAME\" VARCHAR," +
+            "\"EMPLOY_DATE\" TIMESTAMP,\"ZIP\" INTEGER,\":ID\" INTEGER,\"0:NAME\" VARCHAR," +
+            "\"0:EMPLOY_DATE\" DECIMAL,\"0:ZIP\" INTEGER) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\"," +
+            "\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\", \"BEYOND_MAX_LOOKBACK\") " +
+            "IN (('[TableName]','[IndexName]'," +
+            "[ScrutinyTs],false,true))";
+
     private String dataTableDdl;
     private String indexTableDdl;
 
@@ -402,7 +425,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         String[]
                 argValues =
                 getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
-        runScrutiny(argValues);
+        runScrutiny(IndexScrutinyMapperForTest.class, argValues);
 
         // check the output files
         Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
@@ -450,7 +473,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         String[]
                 argValues =
                 getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
 
         // check that the output table contains the invalid rows
         long
@@ -460,6 +483,10 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
                 invalidRowsQuery =
                 IndexScrutinyTableOutput
                         .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+        String missingRowsQuery = getMissingRowsQuery(scrutinyTimeMillis);
+        String invalidColsQuery = getInvalidColsQuery(scrutinyTimeMillis);
+        String beyondMaxLookbackQuery = getBeyondMaxLookbackQuery(scrutinyTimeMillis);
+
         ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
         assertTrue(rs.next());
         assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
@@ -473,12 +500,37 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
         assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
         assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+        assertFalse(rs.getBoolean("BEYOND_MAX_LOOKBACK"));
         assertEquals(3, rs.getInt("ID"));
         assertEquals(null, rs.getObject(":ID")); // null for missing target row
         assertFalse(rs.next());
 
         // check that the job results were written correctly to the metadata table
-        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
+        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery, missingRowsQuery,
+            invalidColsQuery, beyondMaxLookbackQuery);
+    }
+
+    private String getMissingRowsQuery(long scrutinyTimeMillis) {
+        return MISSING_ROWS_QUERY_TEMPLATE.
+            replace("[TableName]", dataTableFullName).
+            replace("[IndexName]", indexTableFullName).
+            replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis)).
+            replace("[HasTargetRow]", "false");
+    }
+
+    private String getInvalidColsQuery(long scrutinyTimeMillis) {
+        return MISSING_ROWS_QUERY_TEMPLATE.
+            replace("[TableName]", dataTableFullName).
+            replace("[IndexName]", indexTableFullName).
+            replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis)).
+            replace("[HasTargetRow]", "true");
+    }
+
+    private String getBeyondMaxLookbackQuery(long scrutinyTimeMillis) {
+        return BEYOND_MAX_LOOKBACK_QUERY_TEMPLATE.
+            replace("[TableName]", dataTableFullName).
+            replace("[IndexName]", indexTableFullName).
+            replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis));
     }
 
     /**
@@ -490,7 +542,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         String[]
                 argValues =
                 getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
-        List<Job> completedJobs = runScrutiny(argValues);
+        List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
         long
                 scrutinyTimeMillis =
                 PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
@@ -535,7 +587,11 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         conn.commit();
     }
 
-    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
+    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
+                                           String invalidRowsQuery,
+                                           String missingRowsQuery,
+                                           String invalidColValuesQuery,
+                                           String beyondMaxLookbackQuery) throws SQLException {
         ResultSet rs;
         ResultSet
                 metadataRs =
@@ -545,11 +601,13 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         assertTrue(metadataRs.next());
         List<? extends Object>
                 expected =
-                Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
+                Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                    SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
                         1L, 2L, 1L, 1L,
                         "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
                         "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
-                        invalidRowsQuery);
+                        invalidRowsQuery, missingRowsQuery, invalidColValuesQuery,
+                    beyondMaxLookbackQuery, 0L);
         if (dataTableDdl.contains("SALT_BUCKETS")) {
             expected =
                     Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
@@ -557,9 +615,9 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
                             2L, 1L, 2L,
                             "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
                             "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
-                            invalidRowsQuery);
+                            invalidRowsQuery, missingRowsQuery,
+                        invalidColValuesQuery, beyondMaxLookbackQuery, 0L);
         }
-
         assertRsValues(metadataRs, expected);
         String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
         rs = conn.createStatement().executeQuery(missingTargetQuery);
@@ -571,13 +629,19 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         assertTrue(rs.next());
         assertEquals(2, rs.getInt("ID"));
         assertFalse(rs.next());
+        String lookbackQuery = metadataRs.getString("INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK");
+        rs = conn.createStatement().executeQuery(lookbackQuery);
+        assertFalse(rs.next());
     }
 
     // assert the result set contains the expected values in the given order
     private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
         for (int i = 0; i < expected.size(); i++) {
-            assertEquals(expected.get(i), rs.getObject(i + 1));
+            assertEquals("Failed comparing 1-based column " + (i + 1) +
+                    " out of " + expected.size(),
+                expected.get(i), rs.getObject(i + 1));
         }
+
     }
 
     private void generateUniqueTableNames() {
@@ -609,7 +673,8 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
     }
 
     private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
-        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+        return runScrutiny(IndexScrutinyMapperForTest.class,
+            getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
                 false, null, null, null, scrutinyTS));
     }
 
@@ -629,7 +694,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
                 cmdArgs =
                 getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
                         false, null, null, null, Long.MAX_VALUE);
-        return runScrutiny(cmdArgs);
+        return runScrutiny(IndexScrutinyMapperForTest.class, cmdArgs);
     }
 
     private void upsertRow(PreparedStatement stmt, int id, String name, int zip) throws SQLException {
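
The two stored-query templates near the top of this file are materialized by
plain token replacement, as the private helpers above show. A condensed sketch
of building and running one of them (table names and timestamp are
hypothetical; the token names come from the templates):

    String sql = MISSING_ROWS_QUERY_TEMPLATE
            .replace("[TableName]", "MY_SCHEMA.MY_TABLE")
            .replace("[IndexName]", "MY_SCHEMA.MY_INDEX")
            .replace("[ScrutinyTs]", Long.toString(1584000000000L))
            .replace("[HasTargetRow]", "false"); // false => source rows missing an index row
    ResultSet rs = conn.createStatement().executeQuery(sql);
    while (rs.next()) {
        // BEYOND_MAX_LOOKBACK separates "possibly compacted away" from "invalid"
        System.out.println(rs.getInt("ID") + " beyondLookback="
                + rs.getBoolean("BEYOND_MAX_LOOKBACK"));
    }
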
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
new file mode 100644
index 0000000..dea19ff
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
+
+    private static PreparedStatement upsertDataStmt;
+    private static String dataTableFullName;
+    private static String schema;
+    private static String dataTableName;
+    private static String indexTableName;
+    private static String indexTableFullName;
+    private static String viewName;
+    private static boolean isViewIndex;
+    private static ManualEnvironmentEdge testClock;
+    public static final String UPSERT_DATA = "UPSERT INTO %s VALUES (?, ?, ?)";
+    public static final String DELETE_SCRUTINY_METADATA = "DELETE FROM "
+        + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME;
+    public static final String DELETE_SCRUTINY_OUTPUT = "DELETE FROM " +
+        IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
+    public static final int MAX_LOOKBACK = 6;
+    private long scrutinyTs;
+
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            Integer.toString(MAX_LOOKBACK));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void setupTest() throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl(),
+            PropertiesUtil.deepCopy(TEST_PROPERTIES))){
+            conn.createStatement().execute(DELETE_SCRUTINY_METADATA);
+            conn.createStatement().execute(DELETE_SCRUTINY_OUTPUT);
+            conn.commit();
+        } catch (TableNotFoundException tnfe){
+            //This will happen the first time
+        }
+    }
+
+    @Test
+    public void testScrutinyOnRowsBeyondMaxLookBack() throws Exception {
+        setupTables();
+        upsertDataAndScrutinize(dataTableName, dataTableFullName, testClock);
+        assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+
+    }
+
+    @Test
+    public void testScrutinyOnRowsBeyondMaxLookback_viewIndex() throws Exception {
+        schema = "S"+generateUniqueName();
+        dataTableName = "T"+generateUniqueName();
+        dataTableFullName = SchemaUtil.getTableName(schema,dataTableName);
+        indexTableName = "VI"+generateUniqueName();
+        isViewIndex = true;
+        viewName = "V"+generateUniqueName();
+        String viewFullName = SchemaUtil.getTableName(schema,viewName);
+        String indexFullName = SchemaUtil.getTableName(schema, indexTableName);
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+            + "ZIP INTEGER) COLUMN_ENCODED_BYTES = 0, VERSIONS = 1 ";
+        String viewDDL = "CREATE VIEW %s AS SELECT * FROM %s";
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) VERSIONS = 1";
+        testClock = new ManualEnvironmentEdge();
+
+        try (Connection conn =
+                 DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableFullName));
+            conn.createStatement().execute(String.format(viewDDL, viewFullName, dataTableFullName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                viewFullName));
+            conn.commit();
+        }
+        upsertDataAndScrutinize(viewName, viewFullName, testClock);
+        assertBeyondMaxLookbackOutput(viewFullName, indexFullName);
+    }
+
+    private void assertBeyondMaxLookbackOutput(String dataTableName, String indexTableName)
+        throws Exception {
+        try (Connection conn =
+                 DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            ResultSet metadataRS = IndexScrutinyTableOutput.queryAllLatestMetadata(conn, dataTableName,
+                    indexTableName);
+            assertTrue("No results from scrutiny metadata query!", metadataRS.next());
+            assertEquals(1, metadataRS.getLong("BEYOND_MAX_LOOKBACK_COUNT"));
+            String beyondLookbackOutputSql =
+                metadataRS.getString("INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK");
+            assertNotNull(beyondLookbackOutputSql);
+            ResultSet outputRS = conn.createStatement().executeQuery(beyondLookbackOutputSql);
+            assertTrue("No results from scrutiny beyond max lookback output query!",
+                outputRS.next());
+            assertTrue("Beyond max lookback flag not set",
+                outputRS.getBoolean("BEYOND_MAX_LOOKBACK"));
+            assertFalse("Too many rows output from scrutiny beyond max lookback output query!",
+                outputRS.next());
+        }
+    }
+
+    @Test
+    public void testScrutinyOnDeletedRowsBeyondMaxLookBack() throws Exception {
+        setupTables();
+        upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
+        assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+    }
+
+    private void setupTables() throws SQLException {
+        schema = "S" + generateUniqueName();
+        dataTableName = "T" + generateUniqueName();
+        indexTableName = "I" + generateUniqueName();
+        dataTableFullName = SchemaUtil.getTableName(schema, dataTableName);
+        indexTableFullName = SchemaUtil.getTableName(schema, indexTableName);
+        isViewIndex = false;
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+            + "ZIP INTEGER) COLUMN_ENCODED_BYTES=0, VERSIONS=1";
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
+        testClock = new ManualEnvironmentEdge();
+
+        try (Connection conn =
+                 DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableFullName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                dataTableFullName));
+            conn.commit();
+        }
+    }
+
+    private void upsertDataAndScrutinize(String tableName, String tableFullName,
+                                         ManualEnvironmentEdge testClock)
+        throws Exception {
+        try(Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            // insert two rows
+            populateTable(tableFullName, conn);
+            long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
+            testClock.setValue(afterInsertSCN);
+            EnvironmentEdgeManager.injectEdge(testClock);
+            //move forward to the time we want to scrutinize, which is less than max lookback age
+            //for the initial inserts
+            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+            scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
+            updateIndexRows(conn);
+            //now go past max lookback age for the initial 2 inserts
+            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+            List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            assertCounters(job.getCounters());
+        }
+    }
+
+    private void populateTable(String tableFullName, Connection conn) throws SQLException {
+        upsertDataStmt = getUpsertDataStmt(tableFullName, conn);
+
+        NonParameterizedIndexScrutinyToolIT.upsertRow(upsertDataStmt, 1, "name-1", 98051);
+        NonParameterizedIndexScrutinyToolIT.upsertRow(upsertDataStmt, 2, "name-2", 98052);
+        conn.commit();
+    }
+
+    private void updateIndexRows(Connection conn) throws SQLException {
+        String tableName = isViewIndex ?
+            SchemaUtil.getTableName(schema, viewName) : dataTableFullName;
+        PreparedStatement stmt = getUpsertDataStmt(tableName, conn);
+        NonParameterizedIndexScrutinyToolIT.upsertRow(stmt, 1, "name-1", 38139);
+        conn.commit();
+    }
+
+    private void upsertDataThenDeleteAndScrutinize(String tableName, String tableFullName,
+                                                   ManualEnvironmentEdge testClock)
+        throws Exception {
+        try(Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            populateTable(tableFullName, conn);
+            long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
+            testClock.setValue(afterInsertSCN);
+            EnvironmentEdgeManager.injectEdge(testClock);
+            deleteIndexRows(conn);
+            //move forward to the time we want to scrutinize, which is less than max lookback age
+            //for the initial inserts
+            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+            scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
+            //now go past max lookback age for the initial 2 inserts
+            testClock.incrementValue(MAX_LOOKBACK /2  * 1000);
+
+            List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            assertCounters(job.getCounters());
+        }
+    }
+
+    private void deleteIndexRows(Connection conn) throws SQLException {
+        String deleteSql = "DELETE FROM " + SchemaUtil.getTableName(schema, indexTableName) + " " +
+            "LIMIT 1";
+        conn.createStatement().execute(deleteSql);
+        conn.commit();
+    }
+
+    private static PreparedStatement getUpsertDataStmt(String tableFullName, Connection conn) throws SQLException {
+        return conn.prepareStatement(String.format(UPSERT_DATA, tableFullName));
+    }
+
+    private void assertCounters(Counters counters) {
+        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(1, getCounterValue(counters, BEYOND_MAX_LOOKBACK_COUNT));
+        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                                  Long scrutinyTs)
+        throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                                  Long batchSize, IndexScrutinyTool.SourceTable sourceTable,
+                                  Long scrutinyTs) throws Exception {
+        final String[]
+            cmdArgs =
+            getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+                true, IndexScrutinyTool.OutputFormat.TABLE,
+                null, null, scrutinyTs);
+        return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs);
+    }
+
+    private static class MaxLookbackIndexScrutinyMapper extends IndexScrutinyMapper {
+        @Override
+        public void postSetup(){
+            try {
+                String tableToCompact;
+                if (isViewIndex){
+                    String physicalDataTableName =
+                        SchemaUtil.getPhysicalHBaseTableName(schema, dataTableName, false).getString();
+                    tableToCompact = MetaDataUtil.getViewIndexPhysicalName(physicalDataTableName);
+                } else {
+                    tableToCompact = SchemaUtil.getPhysicalHBaseTableName(schema, indexTableName, false).getString();
+                }
+                TableName indexTable = TableName.valueOf(tableToCompact);
+                testClock.incrementValue(1);
+                getUtility().getHBaseAdmin().flush(indexTable);
+                TestUtil.majorCompact(getUtility(), indexTable);
+            } catch (Exception e){
+                throw new RuntimeException(e);
+            }
+
+        }
+    }
+}
\ No newline at end of file
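
The IT above drives HBase's notion of time with an injected
ManualEnvironmentEdge, so it can age rows past the lookback window without
sleeping. The core pattern, condensed from upsertDataAndScrutinize() above
(MAX_LOOKBACK is the IT's 6-second constant):

    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    clock.setValue(EnvironmentEdgeManager.currentTimeMillis() + 1);
    EnvironmentEdgeManager.injectEdge(clock);      // Phoenix now reads this clock
    clock.incrementValue(MAX_LOOKBACK / 2 * 1000); // scrutinize inside the window
    long scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
    clock.incrementValue(MAX_LOOKBACK / 2 * 1000); // initial writes now predate it
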
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
index 37a3c81..15396cd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -67,24 +68,6 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
     ManualEnvironmentEdge injectEdge;
     private int ttl;
 
-    private class ManualEnvironmentEdge extends EnvironmentEdge {
-        // Sometimes 0 ts might have a special value, so lets start with 1
-        protected long value = 1L;
-
-        public void setValue(long newValue) {
-            value = newValue;
-        }
-
-        public void incrementValue(long addedValue) {
-            value += addedValue;
-        }
-
-        @Override
-        public long currentTime() {
-            return this.value;
-        }
-    }
-
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
@@ -159,8 +142,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
             assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
             long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
             injectEdge.incrementValue(1); //new ts for major compaction
-            majorCompact(dataTable, beforeFirstCompactSCN);
-            majorCompact(indexTable, beforeFirstCompactSCN);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
             assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
             //wait for the lookback time. After this compactions should purge the deleted row
@@ -177,8 +160,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("upsert into " + dataTableName +
                 " values ('c', 'cd', 'cde', 'cdef')");
             conn.commit();
-            majorCompact(dataTable, beforeSecondCompactSCN);
-            majorCompact(indexTable, beforeSecondCompactSCN);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             //should still be ROWS_POPULATED because we added one and deleted one
             assertRawRowCount(conn, dataTable, ROWS_POPULATED);
             assertRawRowCount(conn, indexTable, ROWS_POPULATED);
@@ -241,8 +224,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
             assertRawRowCount(conn, dataTable, originalRowCount);
             assertRawRowCount(conn, indexTable, originalRowCount);
             injectEdge.incrementValue(1); //get a new timestamp for compaction
-            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
-            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             //nothing should have been purged by this major compaction
             assertRawRowCount(conn, dataTable, originalRowCount);
             assertRawRowCount(conn, indexTable, originalRowCount);
@@ -253,8 +236,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
                 injectEdge.incrementValue(timeToAdvance);
             }
             //make sure that we can compact away the now-expired rows
-            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
-            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             //note that before HBase 1.4, we don't have HBASE-17956
             // and this will always return 0 whether it's still on-disk or not
             assertRawRowCount(conn, dataTable, 0);
@@ -317,16 +300,16 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
             //after flush, check to make sure we can see all three versions at the appropriate times
             assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
             assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
-            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
-            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             //after major compaction, check to make sure we can see all three versions
             // at the appropriate times
             assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
             assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
             injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
             long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
-            majorCompact(dataTable, afterLookbackAgeSCN);
-            majorCompact(indexTable, afterLookbackAgeSCN);
+            majorCompact(dataTable);
+            majorCompact(indexTable);
             //empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6
             assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
             //2 versions of empty column, 2 versions of val2,
@@ -344,20 +327,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
         admin.flush(table);
     }
 
-    private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
-        Admin admin = getUtility().getHBaseAdmin();
-        admin.majorCompact(table);
-        long lastCompactionTimestamp;
-        AdminProtos.GetRegionInfoResponse.CompactionState state = null;
-        while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN
-            || (state = admin.getCompactionState(table)).
-            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Last compaction time:" + lastCompactionTimestamp);
-                LOG.trace("CompactionState: " + state);
-            }
-            Thread.sleep(100);
-        }
+    private void majorCompact(TableName table) throws Exception {
+        TestUtil.majorCompact(getUtility(), table);
     }
 
     private void assertMultiVersionLookbacks(String dataTableSelectSql,
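
The per-test wait loop removed above moves into a shared TestUtil.majorCompact()
helper (the TestUtil.java change itself is not part of this excerpt). Judging
from the deleted code, the shared helper presumably looks roughly like this
hedged sketch:

    // Hypothetical reconstruction based on the loop deleted above; the real
    // helper lives in org.apache.phoenix.util.TestUtil.
    public static void majorCompact(HBaseTestingUtility utility, TableName table)
            throws Exception {
        long requestedTs = EnvironmentEdgeManager.currentTimeMillis();
        Admin admin = utility.getHBaseAdmin();
        admin.majorCompact(table);
        // block until a major compaction newer than the request has finished
        while (admin.getLastMajorCompactionTimestamp(table) < requestedTs
                || admin.getCompactionState(table).equals(
                    AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)) {
            Thread.sleep(100);
        }
    }
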
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
index 93a723d..2c99062 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
@@ -223,7 +223,7 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
         }
     }
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws
+    public static void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws
             SQLException {
         int index = 1;
         // insert row
@@ -233,13 +233,13 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
         stmt.executeUpdate();
     }
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+    public static void upsertRow(PreparedStatement stmt, int id, String name, int val)
             throws SQLException {
         int index = 1;
         // insert row
         stmt.setInt(index++, id);
         stmt.setString(index++, name);
-        stmt.setInt(index++, zip);
+        stmt.setInt(index++, val);
         stmt.executeUpdate();
     }
 
@@ -254,6 +254,6 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
                 cmdArgs =
                 getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
                         false, null, null, null, Long.MAX_VALUE);
-        return runScrutiny(cmdArgs);
+        return runScrutiny(IndexScrutinyMapperForTest.class, cmdArgs);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index 6854a75..e70ffc7 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -56,7 +56,7 @@ public class ScanInfoUtil {
     public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
                                                    ScanInfo scanInfo) {
         long ttl = scanInfo.getTtl();
-        long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
+        long maxLookbackTtl = getMaxLookbackInMillis(scanInfo.getConfiguration());
         if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
             if (ttl == Long.MAX_VALUE
                 && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
@@ -110,7 +110,7 @@ public class ScanInfoUtil {
             oldScanInfo.getComparator());
     }
 
-    private static long getMaxLookback(Configuration conf){
+    public static long getMaxLookbackInMillis(Configuration conf){
         //config param is in seconds, switch to millis
         return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
             DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
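
Making this helper public lets IndexScrutinyMapper reuse the same
seconds-to-milliseconds conversion as the compaction path. A small usage
sketch:

    Configuration conf = HBaseConfiguration.create();
    conf.setLong(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 6); // seconds
    long lookbackMillis = ScanInfoUtil.getMaxLookbackInMillis(conf);  // 6000
    // when the key is left at DEFAULT_PHOENIX_MAX_LOOKBACK_AGE, callers such as
    // isRowOlderThanMaxLookback() below treat the feature as disabled
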
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 2cf9b10..5f75e8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.phoenix.util.SchemaUtil;
 import com.google.common.base.Optional;
 
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 2359b4a..bf3c765 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -52,7 +53,6 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.ColumnInfo;
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 
-
 /**
  * Mapper that reads from the data table and checks the rows against the index table
  */
@@ -96,6 +95,12 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     private long outputMaxRows;
     private MessageDigest md5;
     private long ttl;
+    private long scnTimestamp;
+    private long maxLookbackAgeMillis;
+
+    protected long getScrutinyTs(){
+        return scnTimestamp;
+    }
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -107,6 +112,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             final Properties overrideProps = new Properties();
             String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
             overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            scnTimestamp = new Long(scn);
             connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
             connection.setAutoCommit(false);
             batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
@@ -161,12 +167,18 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOGGER.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
             ttl = getTableTtl();
+            maxLookbackAgeMillis = ScanInfoUtil.getMaxLookbackInMillis(configuration);
         } catch (SQLException | NoSuchAlgorithmException e) {
             tryClosingResourceSilently(this.outputUpsertStmt);
             tryClosingResourceSilently(this.connection);
             tryClosingResourceSilently(this.outputConn);
             throw new RuntimeException(e);
         }
+        postSetup();
+    }
+
+    protected void postSetup() {
+
     }
 
     private static void tryClosingResourceSilently(AutoCloseable res) {
@@ -244,14 +256,10 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             // fetch results from the target table and output invalid rows
             queryTargetTable(context, targetStatement, targetPkToSourceValues);
 
-            //check if there are any invalid rows that have been expired, report them
-            //with EXPIRED_ROW_COUNT
-            checkIfInvalidRowsExpired(context, targetPkToSourceValues);
+            //check if any invalid rows are just temporary side effects of ttl or compaction,
+            //and if so remove them from the list and count them as separate metrics
+            categorizeInvalidRows(context, targetPkToSourceValues);
 
-            // any source values we have left over are invalid (e.g. data table rows without
-            // corresponding index row)
-            context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
-                    .increment(targetPkToSourceValues.size());
             if (outputInvalidRows) {
                 for (Pair<Long, List<Object>> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
                     List<Object> valuesWithoutTarget = sourceRowWithoutTargetRow.getSecond();
@@ -274,8 +282,8 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
 
     protected void preQueryTargetTable() { }
 
-    protected void checkIfInvalidRowsExpired(Context context,
-            Map<String, Pair<Long,
+    protected void categorizeInvalidRows(Context context,
+                                         Map<String, Pair<Long,
             List<Object>>> targetPkToSourceValues) {
         Set<Map.Entry<String, Pair<Long, List<Object>>>>
                 entrySet = targetPkToSourceValues.entrySet();
@@ -288,8 +296,15 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             Pair<Long, List<Object>> sourceValues = entry.getValue();
             Long sourceTS = sourceValues.getFirst();
             if (hasRowExpiredOnSource(sourceTS, ttl)) {
-                context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1);
-                itr.remove();
+                context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1L);
+                itr.remove(); //don't output to the scrutiny table
+            } else if (isRowOlderThanMaxLookback(sourceTS)) {
+                context.getCounter(PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT).increment(1L);
+                //still output to the scrutiny table just in case it's useful
+            } else {
+                // otherwise it's invalid (e.g. data table rows without corresponding index row)
+                context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
+                    .increment(1L);
             }
         }
     }
@@ -299,7 +314,16 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         return ttl != Integer.MAX_VALUE && sourceTS + ttl*1000 < currentTS;
     }
 
-    private long getTableTtl() throws SQLException, IOException {
+    protected boolean isRowOlderThanMaxLookback(Long sourceTS){
+        if (maxLookbackAgeMillis == ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000){
+            return false;
+        }
+        long now =  EnvironmentEdgeManager.currentTimeMillis();
+        long maxLookBackTimeMillis = now - maxLookbackAgeMillis;
+        return sourceTS <= maxLookBackTimeMillis;
+    }
+
+    private int getTableTtl() throws SQLException, IOException {
         PTable pSourceTable = PhoenixRuntime.getTable(connection, qSourceTable);
         if (pSourceTable.getType() == PTableType.INDEX
                 && pSourceTable.getIndexType() == PTable.IndexType.LOCAL) {
@@ -417,6 +441,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         outputUpsertStmt.setLong(index++, sourceTS); // SOURCE_TS
         outputUpsertStmt.setLong(index++, targetTS); // TARGET_TS
         outputUpsertStmt.setBoolean(index++, targetValues != null); // HAS_TARGET_ROW
+        outputUpsertStmt.setBoolean(index++, isRowOlderThanMaxLookback(sourceTS));
         index = setStatementObjects(sourceValues, index, sourceTblColumnMetadata);
         if (targetValues != null) {
             index = setStatementObjects(targetValues, index, targetTblColumnMetadata);
@@ -498,4 +523,5 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             md5.reset();
         }
     }
+
 }
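
A worked example of the boundary arithmetic in isRowOlderThanMaxLookback()
above, with hypothetical numbers (lookback age 6s, injected clock at
1,000,000 ms):

    long maxLookbackAgeMillis = 6_000L;            // configured lookback age
    long now = 1_000_000L;                         // EnvironmentEdgeManager time
    long windowStart = now - maxLookbackAgeMillis; // 994_000
    // a leftover source row written at ts=990_000 counts toward
    // BEYOND_MAX_LOOKBACK_COUNT (990_000 <= 994_000) and is still written to
    // the output table; one written at ts=999_000 stays INVALID_ROW_COUNT
    boolean beyond = 990_000L <= windowStart;      // true
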
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
index 411214e..986b435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
@@ -68,6 +68,7 @@ public class IndexScrutinyTableOutput {
             "    SOURCE_TS BIGINT,\n" +
             "    TARGET_TS BIGINT,\n" +
             "    HAS_TARGET_ROW BOOLEAN,\n" +
+            "    BEYOND_MAX_LOOKBACK BOOLEAN,\n" +
             "    CONSTRAINT PK PRIMARY KEY\n" +
             "    (\n" +
             "        " + SOURCE_TABLE_COL_NAME + ",\n" +
@@ -75,7 +76,10 @@ public class IndexScrutinyTableOutput {
             "        " + SCRUTINY_EXECUTE_TIME_COL_NAME + ",\n" + // time at which the scrutiny ran
             "        SOURCE_ROW_PK_HASH\n" + //  this hash makes the PK unique
             "    )\n" + // dynamic columns consisting of the source and target columns will follow
-            ")";
+            ")  COLUMN_ENCODED_BYTES = 0 "; //column encoding not supported with dyn columns (PHOENIX-5107)
+    public static final String OUTPUT_TABLE_BEYOND_LOOKBACK_DDL = "" +
+        "ALTER TABLE " + OUTPUT_TABLE_NAME + "\n" +
+        " ADD IF NOT EXISTS BEYOND_MAX_LOOKBACK BOOLEAN";
 
     /**
      * This table holds metadata about a scrutiny job - result counters and queries to fetch invalid
@@ -102,6 +106,8 @@ public class IndexScrutinyTableOutput {
             "    INVALID_ROWS_QUERY_ALL VARCHAR,\n" + // stored sql query to fetch all the invalid rows from the output table
             "    INVALID_ROWS_QUERY_MISSING_TARGET VARCHAR,\n" +  // stored sql query to fetch all the invalid rows which are missing a target row
             "    INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL VARCHAR,\n" + // stored sql query to fetch all the invalid rows which have bad covered column values
+            "    INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK VARCHAR,\n" + // stored sql query to fetch all the potentially invalid rows which are before max lookback age
+            "    BEYOND_MAX_LOOKBACK_COUNT BIGINT,\n" +
             "    CONSTRAINT PK PRIMARY KEY\n" +
             "    (\n" +
             "        " + SOURCE_TABLE_COL_NAME + ",\n" +
@@ -109,8 +115,12 @@ public class IndexScrutinyTableOutput {
             "        " + SCRUTINY_EXECUTE_TIME_COL_NAME + "\n" +
             "    )\n" +
             ")\n";
+    public static final String OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL = "" +
+        "ALTER TABLE " + OUTPUT_METADATA_TABLE_NAME + "\n" +
+        " ADD IF NOT EXISTS INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK VARCHAR, \n" +
+        " BEYOND_MAX_LOOKBACK_COUNT BIGINT";
 
-    public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+    public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
 
     /**
      * Gets the parameterized upsert sql to the output table Used by the scrutiny MR job to write
@@ -177,6 +187,21 @@ public class IndexScrutinyTableOutput {
         return paramQuery.replaceFirst("\\?", "true");
     }
 
+    public static String getSqlQueryBeyondMaxLookback(Connection conn,
+    SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException {
+        String whereQuery =
+            constructOutputTableQuery(conn, columnNames,
+                getPksCsv() + ", " + SchemaUtil.getEscapedFullColumnName("HAS_TARGET_ROW")
+                    + ", " + SchemaUtil.getEscapedFullColumnName("BEYOND_MAX_LOOKBACK"));
+        String inClause =
+            " IN " + QueryUtil.constructParameterizedInClause(getPkCols().size() + 2, 1);
+        String paramQuery = whereQuery + inClause;
+        paramQuery = bindPkCols(columnNames, scrutinyTimeMillis, paramQuery);
+        paramQuery = paramQuery.replaceFirst("\\?", "false"); //has_target_row false
+        paramQuery = paramQuery.replaceFirst("\\?", "true"); //beyond_max_lookback true
+        return paramQuery;
+    }
+
     /**
      * Query the metadata table for the given columns
      * @param conn connection to use
@@ -197,6 +222,22 @@ public class IndexScrutinyTableOutput {
         return ps.executeQuery();
     }
 
+    public static ResultSet queryAllLatestMetadata(Connection conn, String qSourceTableName,
+                                                   String qTargetTableName) throws SQLException {
+        String sql = "SELECT MAX(" + SCRUTINY_EXECUTE_TIME_COL_NAME + ") " +
+            "FROM " + OUTPUT_METADATA_TABLE_NAME +
+            " WHERE " + SOURCE_TABLE_COL_NAME + " = ?" + " AND " + TARGET_TABLE_COL_NAME + "= ?";
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        stmt.setString(1, qSourceTableName);
+        stmt.setString(2, qTargetTableName);
+        ResultSet rs = stmt.executeQuery();
+        long scrutinyTimeMillis = 0L;
+        if (rs.next()){
+            scrutinyTimeMillis = rs.getLong(1);
+        } // even if we didn't find one, we still need to run a query to return the right columns
+        return queryAllMetadata(conn, qSourceTableName, qTargetTableName, scrutinyTimeMillis);
+    }
+
     /**
      * Query the metadata table for all columns
      * @param conn connection to use
@@ -258,6 +299,9 @@ public class IndexScrutinyTableOutput {
             pStmt.setString(index++, getSqlQueryAllInvalidRows(conn, columnNames, scrutinyExecuteTime));
             pStmt.setString(index++, getSqlQueryMissingTargetRows(conn, columnNames, scrutinyExecuteTime));
             pStmt.setString(index++, getSqlQueryBadCoveredColVal(conn, columnNames, scrutinyExecuteTime));
+            pStmt.setString(index++, getSqlQueryBeyondMaxLookback(conn, columnNames, scrutinyExecuteTime));
+            pStmt.setLong(index++,
+                counters.findCounter(PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT).getValue());
             pStmt.addBatch();
         }
         pStmt.executeBatch();
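
A minimal sketch (not part of the patch) of how the stored beyond-max-lookback
query and counter added above are meant to be consumed. The metadata column
names come from the DDL in this file; the JDBC URL and the source/target table
names are illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;

    public class BeyondLookbackReportSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn =
                     DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // most recent scrutiny run for this source/target pair
                ResultSet metadata = IndexScrutinyTableOutput.queryAllLatestMetadata(
                    conn, "MY_SCHEMA.DATA_TABLE", "MY_SCHEMA.INDEX_TABLE");
                if (metadata.next()) {
                    long count = metadata.getLong("BEYOND_MAX_LOOKBACK_COUNT");
                    String rowsQuery =
                        metadata.getString("INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK");
                    System.out.println(count + " row(s) beyond max lookback age");
                    // the stored query selects the flagged rows from the output table
                    try (ResultSet rows =
                             conn.createStatement().executeQuery(rowsQuery)) {
                        while (rows.next()) {
                            System.out.println(rows.getString(1));
                        }
                    }
                }
            }
        }
    }
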
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index dda537f..b0d4d2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -116,9 +117,9 @@ public class IndexScrutinyTool extends Configured implements Tool {
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
 
     @Inject
-    Class<IndexScrutinyMapperForTest> mapperClass = null;
+    Class<? extends IndexScrutinyMapper> mapperClass = null;
 
-    public IndexScrutinyTool(Class<IndexScrutinyMapperForTest> indexScrutinyMapperForTestClass) {
+    public IndexScrutinyTool(Class<? extends IndexScrutinyMapper> indexScrutinyMapperForTestClass) {
         this.mapperClass = indexScrutinyMapperForTestClass;
     }
 
@@ -215,11 +216,12 @@ public class IndexScrutinyTool extends Configured implements Tool {
         private long scrutinyExecuteTime;
         private long outputMaxRows; // per mapper
         private String tenantId;
-        Class<IndexScrutinyMapperForTest> mapperClass;
+        Class<? extends IndexScrutinyMapper> mapperClass;
 
         public JobFactory(Connection connection, Configuration configuration, long batchSize,
                 boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
-                String basePath, long outputMaxRows, String tenantId, Class<IndexScrutinyMapperForTest> mapperClass) {
+                String basePath, long outputMaxRows, String tenantId,
+                          Class<? extends IndexScrutinyMapper> mapperClass) {
             this.outputInvalidRows = outputInvalidRows;
             this.outputFormat = outputFormat;
             this.basePath = basePath;
@@ -242,7 +244,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
         }
 
         public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
-                SourceTable sourceTable, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
+                SourceTable sourceTable, Class<? extends IndexScrutinyMapper> mapperClass) throws Exception {
             Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
                     || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));
 
@@ -336,7 +338,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
             return configureSubmittableJob(job, outputPath, mapperClass);
         }
 
-        private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
+        private Job configureSubmittableJob(Job job, Path outputPath, Class<? extends IndexScrutinyMapper> mapperClass) throws Exception {
             Configuration conf = job.getConfiguration();
             conf.setBoolean("mapreduce.job.user.classpath.first", true);
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -414,6 +416,8 @@ public class IndexScrutinyTool extends Configured implements Tool {
                             ? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
                             : EnvironmentEdgeManager.currentTimeMillis() - 60000;
 
+            validateTimestamp(configuration, ts);
+
             if (indexTable != null) {
                 if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
                     throw new IllegalArgumentException(String
@@ -438,8 +442,12 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 outputConfiguration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
                 try (Connection outputConn = ConnectionUtil.getOutputConnection(outputConfiguration)) {
                     outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+                    outputConn.createStatement().
+                        execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
                     outputConn.createStatement()
                             .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+                    outputConn.createStatement().
+                        execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
                 }
             }
 
@@ -505,6 +513,18 @@ public class IndexScrutinyTool extends Configured implements Tool {
         }
     }
 
+    private void validateTimestamp(Configuration configuration, long ts) {
+        long maxLookBackAge = ScanInfoUtil.getMaxLookbackInMillis(configuration);
+        if (maxLookBackAge != ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L) {
+            long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
+            if (ts < minTimestamp){
+                throw new IllegalArgumentException("Index scrutiny can't look back past the configured" +
+                    "max lookback age: " + maxLookBackAge / 1000 + " seconds");
+            }
+        }
+
+    }
+
     @VisibleForTesting
     public List<Job> getJobs() {
         return jobs;
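
The new validateTimestamp check rejects scrutiny timestamps older than a
non-default max lookback age, failing fast instead of reporting rows that
compaction may already have rewritten. A minimal sketch (not part of the patch)
of choosing a timestamp that passes, assuming the local HBase configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    public class ScrutinyTimestampSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            long maxLookbackMillis = ScanInfoUtil.getMaxLookbackInMillis(conf);
            // when a non-default lookback age is configured, timestamps at or
            // after (now - maxLookbackMillis) pass; older ones now throw
            long ts = EnvironmentEdgeManager.currentTimeMillis()
                - maxLookbackMillis + 1;
            System.out.println("safe scrutiny timestamp: " + ts);
        }
    }
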
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
index 43965b7..54e8a94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
@@ -41,5 +41,11 @@ public enum PhoenixScrutinyJobCounters {
     /**
      * Number of batches processed
      */
-    BATCHES_PROCESSED_COUNT;
+    BATCHES_PROCESSED_COUNT,
+    /**
+     * Number of source rows that aged past the max lookback age while scrutiny was
+     * comparing them with the target, and did not match. Counted separately because
+     * the mismatch may just be older versions compacted away, and thus harmless.
+     */
+    BEYOND_MAX_LOOKBACK_COUNT;
 }
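
A minimal sketch (not part of the patch) of how a mapper bumps the new counter;
the row-comparison logic that decides when to call it lives in
IndexScrutinyMapper and is elided here, so the helper and its call site are
assumptions:

    import org.apache.hadoop.mapreduce.TaskInputOutputContext;
    import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;

    final class BeyondLookbackCounterSketch {
        // called when a mismatched source row turns out to be older than the
        // max lookback age, so the mismatch may only reflect compacted versions
        static void markBeyondLookback(TaskInputOutputContext<?, ?, ?, ?> ctx) {
            ctx.getCounter(PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT)
               .increment(1);
        }
    }
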
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
similarity index 53%
copy from phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
copy to phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
index 43965b7..0ba7210 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
@@ -1,4 +1,3 @@
-/**
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,30 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.mapreduce.index;
+package org.apache.phoenix.util;
 
-/**
- * Counters used for Index Scrutiny MR job
- */
-public enum PhoenixScrutinyJobCounters {
-    /**
-     * number of rows in data table with a valid index row (or vice-versa)
-     */
-    VALID_ROW_COUNT,
-    /**
-     * number of rows in data table with an invalid index row (or vice-versa)
-     */
-    INVALID_ROW_COUNT,
-    /**
-     * Number of rows in the index table with an incorrect covered column value
-     */
-    BAD_COVERED_COL_VAL_COUNT,
-    /**
-     * Number of rows in source that have expired while scrutiny was comparing them with target
-     */
-    EXPIRED_ROW_COUNT,
-    /**
-     * Number of batches processed
-     */
-    BATCHES_PROCESSED_COUNT;
+public class ManualEnvironmentEdge extends EnvironmentEdge {
+    // Sometimes a timestamp of 0 has special meaning, so let's start with 1
+    protected long value = 1L;
+
+    public void setValue(long newValue) {
+        value = newValue;
+    }
+
+    public void incrementValue(long addedValue) {
+        value += addedValue;
+    }
+
+    @Override
+    public long currentTime() {
+        return this.value;
+    }
 }
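
ManualEnvironmentEdge lets a test pin Phoenix's clock and advance it
deterministically. A minimal usage sketch (not part of the patch), assuming a
test body where maxLookbackMillis holds the configured max lookback age:

    import org.apache.phoenix.util.EnvironmentEdgeManager;
    import org.apache.phoenix.util.ManualEnvironmentEdge;

    // inside a test method
    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    clock.setValue(EnvironmentEdgeManager.currentTimeMillis());
    EnvironmentEdgeManager.injectEdge(clock);
    try {
        clock.incrementValue(maxLookbackMillis + 1); // age existing rows out
        // ... upsert, compact, or scrutinize under the advanced clock ...
    } finally {
        EnvironmentEdgeManager.reset(); // restore the real clock for later tests
    }
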
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 9a23886..732e4cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -49,6 +49,9 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
@@ -1183,6 +1186,22 @@ public class SchemaUtil {
 		return isNullable ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
 	}
 
+	public static int getTimeToLive(PhoenixConnection conn, String physicalName) throws SQLException {
+        byte[] tableQualifier = Bytes.toBytes(physicalName);
+        return getTimeToLive(conn, tableQualifier);
+    }
+
+    public static int getTimeToLive(PhoenixConnection conn, byte[] tableQualifier)
+     throws SQLException {
+        HTableDescriptor td = conn.getQueryServices().getTableDescriptor(tableQualifier);
+        HColumnDescriptor[] cds = td.getColumnFamilies();
+        if (cds.length > 0){
+            return cds[0].getTimeToLive();
+        } else {
+            return HConstants.FOREVER;
+        }
+    }
+
 	public static boolean hasGlobalIndex(PTable table) {
         for (PTable index : table.getIndexes()) {
             if (index.getIndexType() == IndexType.GLOBAL) {
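
getTimeToLive reads the TTL, in seconds, from the first column family of the
physical table. A minimal sketch (not part of the patch) of the expiry check it
enables; conn (a PhoenixConnection), physicalTableName and rowTimestampMillis
are assumed to be in scope:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.util.EnvironmentEdgeManager;
    import org.apache.phoenix.util.SchemaUtil;

    // HConstants.FOREVER means the column family never expires cells
    int ttlSeconds = SchemaUtil.getTimeToLive(conn, physicalTableName);
    boolean maybeExpired = ttlSeconds != HConstants.FOREVER
        && rowTimestampMillis + ttlSeconds * 1000L
            < EnvironmentEdgeManager.currentTimeMillis();
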
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
similarity index 99%
rename from phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
rename to phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
index 99d50ee..bad2c1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 public class IndexScrutinyMapperForTest extends IndexScrutinyMapper {
 
     public static final int TEST_TABLE_TTL = 3600;
+
     public static class ScrutinyTestClock extends EnvironmentEdge {
         long initialTime;
         long delta;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 290ba8d..98104b4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -58,6 +58,7 @@ import java.util.Properties;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.AggregationManager;
 import org.apache.phoenix.compile.ColumnResolver;
@@ -801,6 +803,21 @@ public class TestUtil {
             conn.createStatement().execute(ddl);
     }
 
+    public static void majorCompact(HBaseTestingUtility utility, TableName table)
+        throws IOException, InterruptedException {
+        long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();
+        Admin admin = utility.getHBaseAdmin();
+        admin.majorCompact(table);
+        long lastCompactionTimestamp;
+        AdminProtos.GetRegionInfoResponse.CompactionState state = null;
+        while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table))
+            < compactionRequestedSCN
+            || (state = admin.getCompactionState(table)).
+            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
+            Thread.sleep(100);
+        }
+    }
+
     /**
      * Runs a major compaction, and then waits until the compaction is complete before returning.
      *
@@ -895,7 +912,7 @@ public class TestUtil {
                     current = cellScanner.current();
                     System.out.println(current + "column= " +
                         Bytes.toString(CellUtil.cloneQualifier(current)) +
-                        " val=" + Bytes.toString(CellUtil.cloneValue(current)));
+                        " val=" + Bytes.toStringBinary(CellUtil.cloneValue(current)));
                 }
             }
         }
@@ -993,18 +1010,25 @@ public class TestUtil {
 
     public static void printResultSet(ResultSet rs) throws SQLException {
         while(rs.next()){
-            StringBuilder builder = new StringBuilder();
-            int columnCount = rs.getMetaData().getColumnCount();
-            for(int i = 0; i < columnCount; i++) {
-                Object value = rs.getObject(i+1);
-                String output = value == null ? "null" : value.toString();
-                builder.append(output);
-                if(i + 1 < columnCount){
-                    builder.append(",");
+            printResult(rs, false);
+        }
+    }
+
+    public static void printResult(ResultSet rs, boolean multiLine) throws SQLException {
+        StringBuilder builder = new StringBuilder();
+        int columnCount = rs.getMetaData().getColumnCount();
+        for(int i = 0; i < columnCount; i++) {
+            Object value = rs.getObject(i+1);
+            String output = value == null ? "null" : value.toString();
+            builder.append(output);
+            if(i + 1 < columnCount){
+                builder.append(",");
+                if (multiLine){
+                    builder.append("\n");
                 }
             }
-            System.out.println(builder.toString());
         }
+        System.out.println(builder.toString());
     }
 
     public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState indexState) throws InterruptedException, SQLException {