Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/01/13 02:32:47 UTC

[phoenix] branch 4.15-HBase-1.5 updated: PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.15-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.15-HBase-1.5 by this push:
     new 57a1e1e  PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
57a1e1e is described below

commit 57a1e1e9cbc01b12fd7422725227ee2ff985f914
Author: s.kadam <s....@salesforce.com>
AuthorDate: Fri Jan 10 10:27:09 2020 -0800

    PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
---
 .../phoenix/end2end/IndexScrutinyToolBaseIT.java   |   3 +-
 .../end2end/IndexScrutinyToolForTenantIT.java      |   2 +
 .../phoenix/end2end/IndexScrutinyToolIT.java       |  56 -------
 .../NonParameterizedIndexScrutinyToolIT.java       | 164 +++++++++++++++++++++
 .../mapreduce/index/IndexScrutinyMapper.java       |  99 +++++++++++--
 .../index/IndexScrutinyMapperForTest.java          |  30 ++++
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |  36 +++--
 .../index/PhoenixScrutinyJobCounters.java          |   4 +
 8 files changed, 310 insertions(+), 84 deletions(-)
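
Overview of the change: IndexScrutinyMapper previously counted source rows
whose counterpart had expired via the table's TTL as corrupt, inflating
INVALID_ROW_COUNT. The mapper now reads the TTL of the source table's empty
column family during setup and, after querying the target table, reclassifies
leftover source rows whose timestamps have passed that TTL under a new
EXPIRED_ROW_COUNT job counter. A test-only mapper subclass and an injectable
clock make the expiry path deterministic in the new integration test; short
illustrative sketches follow the relevant file diffs below.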

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 9f368a8..54a5408 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -16,6 +16,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -54,7 +55,7 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
     }
 
     protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
-        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+        IndexScrutinyTool scrutiny = new IndexScrutinyTool(IndexScrutinyMapperForTest.class);
         Configuration conf = new Configuration(getUtility().getConfiguration());
         scrutiny.setConf(conf);
         int status = scrutiny.run(cmdArgs);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
index 206e793..351af9c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
@@ -89,6 +89,8 @@ public  class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
 
         String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
         connTenant.createStatement().execute(idxStmtTenant);
+        connTenant.commit();
+        connGlobal.commit();
     }
 
     @After public void teardown() throws SQLException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 13df56d..76fce98 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -69,7 +68,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * Tests for the {@link IndexScrutinyTool}
@@ -164,51 +162,6 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         assertEquals(numIndexRows, countRows(conn, indexTableFullName));
     }
 
-    @Test public void testScrutinyOnArrayTypes() throws Exception {
-        String dataTableName = generateUniqueName();
-        String indexTableName = generateUniqueName();
-        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, VB VARBINARY)";
-        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
-        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
-        String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
-
-        try (Connection conn =
-                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
-            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
-            conn.createStatement().execute(String.format(indexTableDDL, indexTableName, dataTableName));
-            // insert two rows
-            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData, dataTableName));
-            upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
-            upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
-            conn.commit();
-
-            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
-            Job job = completedJobs.get(0);
-            assertTrue(job.isSuccessful());
-            Counters counters = job.getCounters();
-            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
-            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
-            // Now insert a different varbinary row
-            upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
-            conn.commit();
-
-            PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex, indexTableName));
-            upsertIndexStmt.setString(1, "name-3");
-            upsertIndexStmt.setInt(2, 3);
-            upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
-            upsertIndexStmt.executeUpdate();
-            conn.commit();
-
-            completedJobs = runScrutiny(null, dataTableName, indexTableName);
-            job = completedJobs.get(0);
-            assertTrue(job.isSuccessful());
-            counters = job.getCounters();
-            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
-            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-        }
-    }
-
     /**
      * Tests running a scrutiny while updates and deletes are happening.
      * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
@@ -689,15 +642,6 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
         stmt.executeUpdate();
     }
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws SQLException {
-        int index = 1;
-        // insert row
-        stmt.setInt(index++, id);
-        stmt.setString(index++, name);
-        stmt.setBytes(index++, val);
-        stmt.executeUpdate();
-    }
-
     private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
         String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
         PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
new file mode 100644
index 0000000..f70e6a2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest.TEST_TABLE_TTL;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+
+    @Test
+    public void testScrutinyOnArrayTypes() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexTableName = generateUniqueName();
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+                + "VB VARBINARY)";
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
+        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
+        String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
+
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                    dataTableName));
+            // insert two rows
+            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData,
+                    dataTableName));
+            upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
+            upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // Now insert a different varbinary row
+            upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
+            conn.commit();
+
+            PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex,
+                    indexTableName));
+            upsertIndexStmt.setString(1, "name-3");
+            upsertIndexStmt.setInt(2, 3);
+            upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
+            upsertIndexStmt.executeUpdate();
+            conn.commit();
+
+            completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
+    @Test
+    public void testScrutinyOnRowsNearExpiry() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexTableName = generateUniqueName();
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+                + "ZIP INTEGER) TTL="+TEST_TABLE_TTL;
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
+        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
+        int initialDelta = -5000; // insert rows 5 seconds before the current timestamp
+        IndexScrutinyMapperForTest.ScrutinyTestClock
+                testClock = new IndexScrutinyMapperForTest.ScrutinyTestClock(initialDelta);
+
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                    dataTableName));
+            // insert two rows
+            PreparedStatement
+                    upsertDataStmt = conn.prepareStatement(String.format(upsertData,
+                    dataTableName));
+
+            EnvironmentEdgeManager.injectEdge(testClock);
+            upsertRow(upsertDataStmt, 1, "name-1", 98051);
+            upsertRow(upsertDataStmt, 2, "name-2", 98052);
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, EXPIRED_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
+    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws
+            SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setBytes(index++, val);
+        stmt.executeUpdate();
+    }
+
+    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+            throws SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setInt(index++, zip);
+        stmt.executeUpdate();
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName)
+            throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+            Long batchSize, IndexScrutinyTool.SourceTable sourceTable) throws Exception {
+        final String[]
+                cmdArgs =
+                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+                        false, null, null, null, Long.MAX_VALUE);
+        return runScrutiny(cmdArgs);
+    }
+}
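
A note on the timing in testScrutinyOnRowsNearExpiry above: the rows are
upserted through an injected clock running 5 seconds in the past
(initialDelta = -5000) against a table created with TTL = TEST_TABLE_TTL
(3600 seconds). The scrutiny job runs with IndexScrutinyMapperForTest, whose
preQueryTargetTable() hook (see that class further below) injects a clock
TEST_TABLE_TTL seconds into the future, so both rows are past their TTL by
the time the target table is queried and land in EXPIRED_ROW_COUNT rather
than VALID_ROW_COUNT or INVALID_ROW_COUNT.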
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index eceb658..b7a8cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -27,16 +27,22 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.PhoenixJobCounters;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
@@ -45,7 +51,10 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -54,22 +63,23 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 
+
 /**
  * Mapper that reads from the data table and checks the rows against the index table
  */
 public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexScrutinyMapper.class);
-    private Connection connection;
+    protected Connection connection;
     private List<ColumnInfo> targetTblColumnMetadata;
     private long batchSize;
     // holds a batch of rows from the table the mapper is iterating over
     // Each row is a pair - the row TS, and the row values
-    private List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>();
-    private String targetTableQuery;
-    private int numTargetPkCols;
-    private boolean outputInvalidRows;
-    private OutputFormat outputFormat = OutputFormat.FILE;
+    protected List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>();
+    protected String targetTableQuery;
+    protected int numTargetPkCols;
+    protected boolean outputInvalidRows;
+    protected OutputFormat outputFormat = OutputFormat.FILE;
     private String qSourceTable;
     private String qTargetTable;
     private long executeTimestamp;
@@ -78,10 +88,11 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     private List<ColumnInfo> sourceTblColumnMetadata;
 
     // used to write results to the output table
-    private Connection outputConn;
-    private PreparedStatement outputUpsertStmt;
+    protected Connection outputConn;
+    protected PreparedStatement outputUpsertStmt;
     private long outputMaxRows;
     private MessageDigest md5;
+    private long ttl;
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -100,14 +111,12 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
                     PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
             outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
             executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
-
             // get the index table and column names
             String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
             final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
             final String qIndexTable =
                     PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
             final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
-
             // set the target table based on whether we're running the MR over the data or index
             // table
             SourceTable sourceTable =
@@ -148,6 +157,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
                     PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
             LOGGER.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
+            ttl = getTableTtl();
         } catch (SQLException | NoSuchAlgorithmException e) {
             tryClosingResourceSilently(this.outputUpsertStmt);
             tryClosingResourceSilently(this.connection);
@@ -210,7 +220,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         }
     }
 
-    private void processBatch(Context context)
+    protected void processBatch(Context context)
             throws SQLException, IOException, InterruptedException {
         if (currentBatchValues.size() == 0) return;
         context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
@@ -227,9 +237,14 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
                     buildTargetStatement(targetStatement);
 
+            preQueryTargetTable();
             // fetch results from the target table and output invalid rows
             queryTargetTable(context, targetStatement, targetPkToSourceValues);
 
+            // check whether any rows flagged invalid have actually expired, and report
+            // them under EXPIRED_ROW_COUNT
+            checkIfInvalidRowsExpired(context, targetPkToSourceValues);
+
             // any source values we have left over are invalid (e.g. data table rows without
             // corresponding index row)
             context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
@@ -254,7 +269,62 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         }
     }
 
-    private Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement)
+    protected void preQueryTargetTable() { }
+
+    protected void checkIfInvalidRowsExpired(Context context,
+            Map<String, Pair<Long,
+            List<Object>>> targetPkToSourceValues) {
+        Set<Map.Entry<String, Pair<Long, List<Object>>>>
+                entrySet = targetPkToSourceValues.entrySet();
+
+        Iterator<Map.Entry<String, Pair<Long, List<Object>>>> itr = entrySet.iterator();
+
+        // iterate and remove items simultaneously
+        while(itr.hasNext()) {
+            Map.Entry<String, Pair<Long, List<Object>>> entry = itr.next();
+            Pair<Long, List<Object>> sourceValues = entry.getValue();
+            Long sourceTS = sourceValues.getFirst();
+            if (hasRowExpiredOnSource(sourceTS, ttl)) {
+                context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1);
+                itr.remove();
+            }
+        }
+    }
+
+    protected boolean hasRowExpiredOnSource(Long sourceTS, Long ttl) {
+        long currentTS = EnvironmentEdgeManager.currentTimeMillis();
+        return ttl != Integer.MAX_VALUE && sourceTS + ttl*1000 < currentTS;
+    }
+
+    private long getTableTtl() throws SQLException, IOException {
+        PTable psourceTable = PhoenixRuntime.getTable(connection, qSourceTable);
+        if (psourceTable.getType() == PTableType.INDEX
+                && psourceTable.getIndexType() == PTable.IndexType.LOCAL) {
+            return Integer.MAX_VALUE;
+        }
+        String schema = psourceTable.getSchemaName().toString();
+        String table = getSourceTableName(psourceTable);
+        Admin admin = connection.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        String fullTableName = SchemaUtil.getQualifiedTableName(schema, table);
+        HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(fullTableName));
+        return tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(psourceTable)).getTimeToLive();
+    }
+
+    private String getSourceTableName(PTable psourceTable) {
+        String sourcePhysicalName = psourceTable.getPhysicalName().getString();
+        String table;
+        if (psourceTable.getType() == PTableType.VIEW) {
+            table = sourcePhysicalName;
+        } else if (MetaDataUtil.isViewIndex(sourcePhysicalName)) {
+            table = SchemaUtil.getParentTableNameFromIndexTable(sourcePhysicalName,
+                            MetaDataUtil.VIEW_INDEX_TABLE_PREFIX);
+        } else {
+            table = psourceTable.getTableName().toString();
+        }
+        return table;
+    }
+
+    protected Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement)
             throws SQLException {
         Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
                 new HashMap<>(currentBatchValues.size());
@@ -278,11 +348,10 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         return targetPkToSourceValues;
     }
 
-    private void queryTargetTable(Context context, PreparedStatement targetStatement,
+    protected void queryTargetTable(Context context, PreparedStatement targetStatement,
             Map<String, Pair<Long, List<Object>>> targetPkToSourceValues)
             throws SQLException, IOException, InterruptedException {
         ResultSet targetResultSet = targetStatement.executeQuery();
-
         while (targetResultSet.next()) {
             indxWritable.readFields(targetResultSet);
             List<Object> targetValues = indxWritable.getValues();
@@ -327,7 +396,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     }
 
     // pass in null targetValues if the target row wasn't found
-    private void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
+    protected void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
             throws SQLException {
         if (context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue() > outputMaxRows) {
             return;
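
The heart of the fix is the predicate in hasRowExpiredOnSource above: HBase
column-family TTLs are expressed in seconds, row timestamps in milliseconds,
and HColumnDescriptor reports Integer.MAX_VALUE (HConstants.FOREVER) for a
family with no expiry. A minimal self-contained sketch of that arithmetic
(illustrative names, not part of the patch):

    public class TtlExpirySketch {
        // ttlSeconds follows HBase semantics: Integer.MAX_VALUE means "never expires".
        static boolean hasRowExpired(long sourceTsMillis, long ttlSeconds, long nowMillis) {
            return ttlSeconds != Integer.MAX_VALUE
                    && sourceTsMillis + ttlSeconds * 1000L < nowMillis;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            long twoHoursAgo = now - 2 * 3600 * 1000L;
            System.out.println(hasRowExpired(twoHoursAgo, 3600, now));              // true: past a 1-hour TTL
            System.out.println(hasRowExpired(twoHoursAgo, Integer.MAX_VALUE, now)); // false: TTL is "forever"
        }
    }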
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
new file mode 100644
index 0000000..bdd5e45
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
@@ -0,0 +1,30 @@
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+public class IndexScrutinyMapperForTest extends IndexScrutinyMapper {
+
+    public static final int TEST_TABLE_TTL = 3600;
+    public static class ScrutinyTestClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public ScrutinyTestClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+    }
+
+    @Override
+    public void preQueryTargetTable() {
+        // change the current time past ttl
+        ScrutinyTestClock clock = new ScrutinyTestClock(TEST_TABLE_TTL*1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+    }
+}
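
IndexScrutinyMapperForTest works by swapping the clock Phoenix consults:
EnvironmentEdgeManager delegates currentTimeMillis() to whatever
EnvironmentEdge has been injected, so overriding preQueryTargetTable() to
inject a clock TEST_TABLE_TTL seconds ahead makes every freshly written row
look expired to the mapper. A hedged sketch of the pattern in isolation
(class name is illustrative):

    import org.apache.phoenix.util.EnvironmentEdge;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    public class ClockInjectionSketch {
        public static void main(String[] args) {
            // Shift the process-wide Phoenix clock one hour into the future.
            EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() {
                @Override
                public long currentTime() {
                    return System.currentTimeMillis() + 3600 * 1000L;
                }
            });
            // Any Phoenix code that reads time through EnvironmentEdgeManager
            // now sees t + 1h, pushing TTL'd rows past expiry.
            System.out.println(EnvironmentEdgeManager.currentTimeMillis());
        }
    }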
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index c201f02..dda537f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import com.google.common.base.Strings;
+import com.google.inject.Inject;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -114,10 +115,19 @@ public class IndexScrutinyTool extends Configured implements Tool {
             "If specified, uses Tenant connection for tenant view index scrutiny (optional)");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
 
+    @Inject
+    Class<IndexScrutinyMapperForTest> mapperClass = null;
+
+    public IndexScrutinyTool(Class<IndexScrutinyMapperForTest> indexScrutinyMapperForTestClass) {
+        this.mapperClass = indexScrutinyMapperForTestClass;
+    }
+
+    public IndexScrutinyTool() { }
+
     /**
      * Which table to use as the source table
      */
-    public static enum SourceTable {
+    public enum SourceTable {
         DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE,
         /**
          * Runs two separate jobs to iterate over both tables
@@ -125,7 +135,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
         BOTH;
     }
 
-    public static enum OutputFormat {
+    public enum OutputFormat {
         FILE, TABLE
     }
 
@@ -205,10 +215,11 @@ public class IndexScrutinyTool extends Configured implements Tool {
         private long scrutinyExecuteTime;
         private long outputMaxRows; // per mapper
         private String tenantId;
+        Class<IndexScrutinyMapperForTest> mapperClass;
 
         public JobFactory(Connection connection, Configuration configuration, long batchSize,
                 boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
-                String basePath, long outputMaxRows, String tenantId) {
+                String basePath, long outputMaxRows, String tenantId, Class<IndexScrutinyMapperForTest> mapperClass) {
             this.outputInvalidRows = outputInvalidRows;
             this.outputFormat = outputFormat;
             this.basePath = basePath;
@@ -220,17 +231,18 @@ public class IndexScrutinyTool extends Configured implements Tool {
             this.tenantId = tenantId;
             this.ts = ts; // CURRENT_SCN to set
             scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run.
-                                                              // Same for
+            // Same for
             // all jobs created from this factory
             PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
                 scrutinyExecuteTime);
             if (!Strings.isNullOrEmpty(tenantId)) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
             }
+            this.mapperClass = mapperClass;
         }
 
         public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
-                SourceTable sourceTable) throws Exception {
+                SourceTable sourceTable, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
             Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
                     || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));
 
@@ -321,10 +333,10 @@ public class IndexScrutinyTool extends Configured implements Tool {
                         SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) ? pdataTable
                                 : pindexTable);
 
-            return configureSubmittableJob(job, outputPath);
+            return configureSubmittableJob(job, outputPath, mapperClass);
         }
 
-        private Job configureSubmittableJob(Job job, Path outputPath) throws Exception {
+        private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
             Configuration conf = job.getConfiguration();
             conf.setBoolean("mapreduce.job.user.classpath.first", true);
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -334,7 +346,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 job.setOutputFormatClass(TextOutputFormat.class);
                 FileOutputFormat.setOutputPath(job, outputPath);
             }
-            job.setMapperClass(IndexScrutinyMapper.class);
+            job.setMapperClass((mapperClass == null ? IndexScrutinyMapper.class : mapperClass));
             job.setNumReduceTasks(0);
             // Set the Output classes
             job.setMapOutputKeyClass(Text.class);
@@ -437,17 +449,17 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 outputFormat, outputMaxRows));
             JobFactory jobFactory =
                     new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
-                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId);
+                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId, mapperClass);
             // If we are running the scrutiny with both tables as the source, run two separate jobs,
             // one for each direction
             if (SourceTable.BOTH.equals(sourceTable)) {
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                    SourceTable.DATA_TABLE_SOURCE));
+                    SourceTable.DATA_TABLE_SOURCE, mapperClass));
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                    SourceTable.INDEX_TABLE_SOURCE));
+                    SourceTable.INDEX_TABLE_SOURCE, mapperClass));
             } else {
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                    sourceTable));
+                    sourceTable, mapperClass));
             }
 
             if (!isForeground) {
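
The new constructor is what lets the integration tests swap in the test
mapper while production callers keep the no-arg path: when mapperClass is
null, configureSubmittableJob falls back to IndexScrutinyMapper. Usage as
exercised in IndexScrutinyToolBaseIT above:

    IndexScrutinyTool scrutiny = new IndexScrutinyTool(IndexScrutinyMapperForTest.class);
    scrutiny.setConf(new Configuration(getUtility().getConfiguration()));
    int status = scrutiny.run(cmdArgs); // new IndexScrutinyTool() keeps the old behavior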
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
index 3cf73fd..43965b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
@@ -35,6 +35,10 @@ public enum PhoenixScrutinyJobCounters {
      */
     BAD_COVERED_COL_VAL_COUNT,
     /**
+     * Number of source rows that expired while the scrutiny was comparing them with the target
+     */
+    EXPIRED_ROW_COUNT,
+    /**
      * Number of batches processed
      */
     BATCHES_PROCESSED_COUNT;