You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/28 04:28:11 UTC

[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5089 Add tenantId parameter to IndexScrunityTool

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
     new 33f2a3d  PHOENIX-5089 Add tenantId parameter to IndexScrunityTool
33f2a3d is described below

commit 33f2a3d69e0f82e12c8f286040c167313217604a
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed Feb 27 16:35:05 2019 -0800

    PHOENIX-5089 Add tenantId parameter to IndexScrunityTool
    
    Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
 .../phoenix/end2end/IndexScrutinyToolIT.java       | 1406 +++++++++++---------
 .../phoenix/mapreduce/index/IndexScrutinyTool.java |   58 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |    5 +-
 .../phoenix/mapreduce/util/IndexColumnNames.java   |    3 +
 4 files changed, 821 insertions(+), 651 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index cbce7b2..318a076 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -47,9 +47,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -59,6 +62,7 @@ import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
 import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -73,6 +77,7 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -80,680 +85,859 @@ import org.junit.runners.Parameterized;
  * Tests for the {@link IndexScrutinyTool}
  */
 @Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class IndexScrutinyToolIT extends BaseTest {
+@RunWith(Enclosed.class)
+public class IndexScrutinyToolIT {
+
+    abstract public static class SharedIndexToolIT extends BaseTest {
+        protected String outputDir;
+
+        @BeforeClass public static void doSetup() throws Exception {
+            Map<String, String> serverProps = Maps.newHashMap();
+            //disable major compactions
+            serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+            Map<String, String> clientProps = Maps.newHashMap();
+            setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                    new ReadOnlyProps(clientProps.entrySet().iterator()));
+        }
 
-    private String dataTableDdl;
-    private String indexTableDdl;
+        protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+            IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            scrutiny.setConf(conf);
+            int status = scrutiny.run(cmdArgs);
+            assertEquals(0, status);
+            for (Job job : scrutiny.getJobs()) {
+                assertTrue(job.waitForCompletion(true));
+            }
+            return scrutiny.getJobs();
+        }
 
-    private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
+        protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) {
+            final List<String> args = Lists.newArrayList();
+            if (schemaName != null) {
+                args.add("-s");
+                args.add(schemaName);
+            }
+            args.add("-dt");
+            args.add(dataTable);
+            args.add("-it");
+            args.add(indxTable);
+
+            // TODO test snapshot reads
+            // if(useSnapshot) {
+            // args.add("-snap");
+            // }
+
+            if (OutputFormat.FILE.equals(outputFormat)) {
+                args.add("-op");
+                outputDir = "/tmp/" + UUID.randomUUID().toString();
+                args.add(outputDir);
+            }
+            args.add("-t");
+            args.add(String.valueOf(scrutinyTs));
+            args.add("-run-foreground");
+            if (batchSize != null) {
+                args.add("-b");
+                args.add(String.valueOf(batchSize));
+            }
 
-    private static final String INDEX_UPSERT_SQL =
-        "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+            // default to using data table as the source table
+            args.add("-src");
+            if (sourceTable == null) {
+                args.add(SourceTable.DATA_TABLE_SOURCE.name());
+            } else {
+                args.add(sourceTable.name());
+            }
+            if (outputInvalidRows) {
+                args.add("-o");
+            }
+            if (outputFormat != null) {
+                args.add("-of");
+                args.add(outputFormat.name());
+            }
+            if (maxOutputRows != null) {
+                args.add("-om");
+                args.add(maxOutputRows.toString());
+            }
+            if (tenantId != null) {
+                args.add("-tenant");
+                args.add(tenantId);
+            }
+            return args.toArray(new String[0]);
+        }
 
-    private static final String DELETE_SQL = "DELETE FROM %s ";
+        protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
+            return counters.findCounter(counter).getValue();
+        }
+    }
 
-    private String schemaName;
-    private String dataTableName;
-    private String dataTableFullName;
-    private String indexTableName;
-    private String indexTableFullName;
-    private String outputDir;
+    @RunWith(Parameterized.class)
+    public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT {
 
-    private Connection conn;
+        private String dataTableDdl;
+        private String indexTableDdl;
 
-    private PreparedStatement dataTableUpsertStmt;
+        private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    private PreparedStatement indexTableUpsertStmt;
+        private static final String
+                INDEX_UPSERT_SQL =
+                "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
 
-    private long testTime;
-    private Properties props;
+        private static final String DELETE_SQL = "DELETE FROM %s ";
 
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }
-            });
-    }
+        private String schemaName;
+        private String dataTableName;
+        private String dataTableFullName;
+        private String indexTableName;
+        private String indexTableFullName;
 
-    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
-        this.dataTableDdl = dataTableDdl;
-        this.indexTableDdl = indexTableDdl;
-    }
+        private Connection conn;
 
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMap();
-        //disable major compactions
-        serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
-        Map<String, String> clientProps = Maps.newHashMap();
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
-    }
+        private PreparedStatement dataTableUpsertStmt;
 
-    /**
-     * Create the test data and index tables
-     */
-    @Before
-    public void setup() throws SQLException {
-        generateUniqueTableNames();
-        createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
-        createTestTable(getUrl(),
-            String.format(indexTableDdl, indexTableName, dataTableFullName));
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl(), props);
-        String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
-        dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
-        String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
-        indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
-        conn.setAutoCommit(false);
-        testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+        private PreparedStatement indexTableUpsertStmt;
 
-    }
+        private long testTime;
+        private Properties props;
 
-    @After
-    public void teardown() throws SQLException {
-        if (conn != null) {
-            conn.close();
+        @Parameterized.Parameters public static Collection<Object[]> data() {
+            return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)",
+                    "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+                    "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2",
+                    "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
         }
-    }
 
-    /**
-     * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
-     */
-    @Test
-    public void testValidIndex() throws Exception {
-        // insert two rows
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        int numDataRows = countRows(dataTableFullName);
-        int numIndexRows = countRows(indexTableFullName);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
-        // make sure row counts weren't modified by scrutiny
-        assertEquals(numDataRows, countRows(dataTableFullName));
-        assertEquals(numIndexRows, countRows(indexTableFullName));
-    }
+        public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) {
+            this.dataTableDdl = dataTableDdl;
+            this.indexTableDdl = indexTableDdl;
+        }
 
-    /**
-     * Tests running a scrutiny while updates and deletes are happening.
-     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
-     */
-    @Test
-    public void testScrutinyWhileTakingWrites() throws Exception {
-        int id = 0;
-        while (id < 1000) {
-            int index = 1;
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setString(index++, "name-" + id);
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
-            dataTableUpsertStmt.executeUpdate();
-            id++;
-        }
-        conn.commit();
-
-        //CURRENT_SCN for scrutiny
-        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
-
-        // launch background upserts and deletes
-        final Random random = new Random(0);
-        Runnable backgroundUpserts = new Runnable() {
-            @Override
-            public void run() {
-                int idToUpsert = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    PreparedStatement dataPS =
-                            conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
-                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        /**
+         * Create the test data and index tables
+         */
+        @Before public void setup() throws SQLException {
+            generateUniqueTableNames();
+            createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
+            createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName));
+            props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+            String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+            dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+            String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
+            indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+            conn.setAutoCommit(false);
+            testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
+        }
+
+        @After public void teardown() throws SQLException {
+            if (conn != null) {
+                conn.close();
             }
-        };
-        Runnable backgroundDeletes = new Runnable() {
-            @Override
-            public void run() {
-                int idToDelete = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    String deleteSql =
-                            String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
-                                    + idToDelete;
-                    conn.createStatement().executeUpdate(deleteSql);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        }
+
+        /**
+         * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
+         */
+        @Test public void testValidIndex() throws Exception {
+            // insert two rows
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            int numDataRows = countRows(dataTableFullName);
+            int numIndexRows = countRows(indexTableFullName);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // make sure row counts weren't modified by scrutiny
+            assertEquals(numDataRows, countRows(dataTableFullName));
+            assertEquals(numIndexRows, countRows(indexTableFullName));
+        }
+
+        /**
+         * Tests running a scrutiny while updates and deletes are happening.
+         * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+         */
+        @Test public void testScrutinyWhileTakingWrites() throws Exception {
+            int id = 0;
+            while (id < 1000) {
+                int index = 1;
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setString(index++, "name-" + id);
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+                dataTableUpsertStmt.executeUpdate();
+                id++;
             }
-        };
-        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
-            TimeUnit.MILLISECONDS);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
-            TimeUnit.MILLISECONDS);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs =
-                runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
-                    scrutinyTS);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-        scheduledThreadPool.shutdown();
-        scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-    }
+            conn.commit();
+
+            //CURRENT_SCN for scrutiny
+            long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+            // launch background upserts and deletes
+            final Random random = new Random(0);
+            Runnable backgroundUpserts = new Runnable() {
+                @Override public void run() {
+                    int idToUpsert = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        PreparedStatement
+                                dataPS =
+                                conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+                        upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Runnable backgroundDeletes = new Runnable() {
+                @Override public void run() {
+                    int idToDelete = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        String
+                                deleteSql =
+                                String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete;
+                        conn.createStatement().executeUpdate(deleteSql);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS);
+
+            // scrutiny should report everything as ok
+            List<Job>
+                    completedJobs =
+                    runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            scheduledThreadPool.shutdown();
+            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+        }
 
-    /**
-     * Tests an index with the same # of rows as the data table, but one of the index rows is
-     * incorrect Scrutiny should report the invalid rows.
-     */
-    @Test
-    public void testEqualRowCountIndexIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+        /**
+         * Tests an index with the same # of rows as the data table, but one of the index rows is
+         * incorrect Scrutiny should report the invalid rows.
+         */
+        @Test public void testEqualRowCountIndexIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
 
-    /**
-     * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
-     * a covered index value is incorrect. Scrutiny should report the invalid row
-     */
-    @Test
-    public void testCoveredValueIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable index and insert another data row
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad index row for the above data row
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
-    }
+        /**
+         * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
+         * a covered index value is incorrect. Scrutiny should report the invalid row
+         */
+        @Test public void testCoveredValueIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable index and insert another data row
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad index row for the above data row
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+        }
 
-    /**
-     * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
-     * scrutiny with batchsize of 10,
-     */
-    @Test
-    public void testBatching() throws Exception {
-        // insert 1001 data and index rows
-        int numTestRows = 1001;
-        for (int i = 0; i < numTestRows; i++) {
-            upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
-        }
-        conn.commit();
-
-        disableIndex();
-
-        // randomly delete some rows from the index
-        Random random = new Random();
-        for (int i = 0; i < 100; i++) {
-            int idToDelete = random.nextInt(numTestRows);
-            deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
-        }
-        conn.commit();
-        int numRows = countRows(indexTableFullName);
-        int numDeleted = numTestRows - numRows;
-
-        // run scrutiny with batch size of 10
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(numTestRows / 10 + numTestRows % 10,
-            getCounterValue(counters, BATCHES_PROCESSED_COUNT));
-    }
+        /**
+         * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
+         * scrutiny with batchsize of 10,
+         */
+        @Test public void testBatching() throws Exception {
+            // insert 1001 data and index rows
+            int numTestRows = 1001;
+            for (int i = 0; i < numTestRows; i++) {
+                upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+            }
+            conn.commit();
 
-    /**
-     * Tests when there are more data table rows than index table rows Scrutiny should report the
-     * number of incorrect rows
-     */
-    @Test
-    public void testMoreDataRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these rows won't have a corresponding index row
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
-        conn.commit();
-
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            disableIndex();
 
-    /**
-     * Tests when there are more index table rows than data table rows Scrutiny should report the
-     * number of incorrect rows when run with the index as the source table
-     */
-    @Test
-    public void testMoreIndexRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these index rows won't have a corresponding data row
-        upsertIndexRow("name-2", 2, 95124);
-        upsertIndexRow("name-3", 3, 95125);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.INDEX_TABLE_SOURCE);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            // randomly delete some rows from the index
+            Random random = new Random();
+            for (int i = 0; i < 100; i++) {
+                int idToDelete = random.nextInt(numTestRows);
+                deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+            }
+            conn.commit();
+            int numRows = countRows(indexTableFullName);
+            int numDeleted = numTestRows - numRows;
+
+            // run scrutiny with batch size of 10
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(numTestRows / 10 + numTestRows % 10,
+                    getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+        }
 
-    /**
-     * Tests running with both the index and data tables as the source table If we have an
-     * incorrectly indexed row, it should be reported in each direction
-     */
-    @Test
-    public void testBothDataAndIndexAsSource() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.BOTH);
-        assertEquals(2, completedJobs.size());
-        for (Job job : completedJobs) {
+        /**
+         * Tests when there are more data table rows than index table rows Scrutiny should report the
+         * number of incorrect rows
+         */
+        @Test public void testMoreDataRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these rows won't have a corresponding index row
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
             assertTrue(job.isSuccessful());
             Counters counters = job.getCounters();
             assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+
+        /**
+         * Tests when there are more index table rows than data table rows Scrutiny should report the
+         * number of incorrect rows when run with the index as the source table
+         */
+        @Test public void testMoreIndexRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these index rows won't have a corresponding data row
+            upsertIndexRow("name-2", 2, 95124);
+            upsertIndexRow("name-3", 3, 95125);
+            conn.commit();
+
+            List<Job>
+                    completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+
+        /**
+         * Tests running with both the index and data tables as the source table If we have an
+         * incorrectly indexed row, it should be reported in each direction
+         */
+        @Test public void testBothDataAndIndexAsSource() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            List<Job>
+                    completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH);
+            assertEquals(2, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
         }
-    }
 
-    /**
-     * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
-     */
-    @Test
-    public void testOutputInvalidRowsToFile() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
-        runScrutiny(argValues);
-
-        // check the output files
-        Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
-        DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
-        List<Path> paths = Lists.newArrayList();
-        Path firstPart = null;
-        for (FileStatus outputFile : fs.listStatus(outputPath)) {
-            if (outputFile.getPath().getName().startsWith("part")) {
-                if (firstPart == null) {
-                    firstPart = outputFile.getPath();
-                } else {
-                    paths.add(outputFile.getPath());
+        /**
+         * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
+         */
+        @Test public void testOutputInvalidRowsToFile() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+
+            String[]
+                    argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
+            runScrutiny(argValues);
+
+            // check the output files
+            Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
+            DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
+            List<Path> paths = Lists.newArrayList();
+            Path firstPart = null;
+            for (FileStatus outputFile : fs.listStatus(outputPath)) {
+                if (outputFile.getPath().getName().startsWith("part")) {
+                    if (firstPart == null) {
+                        firstPart = outputFile.getPath();
+                    } else {
+                        paths.add(outputFile.getPath());
+                    }
                 }
             }
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                fs.concat(firstPart, paths.toArray(new Path[0]));
+            }
+            Path outputFilePath = firstPart;
+            assertTrue(fs.exists(outputFilePath));
+            FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+            TreeSet<String> lines = Sets.newTreeSet();
+            try {
+                String line = null;
+                while ((line = reader.readLine()) != null) {
+                    lines.add(line);
+                }
+            } finally {
+                IOUtils.closeQuietly(reader);
+                IOUtils.closeQuietly(fsDataInputStream);
+            }
+            Iterator<String> lineIterator = lines.iterator();
+            assertEquals(
+                    "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next());
+            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+                    lineIterator.next());
+
         }
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            fs.concat(firstPart, paths.toArray(new Path[0]));
-        }
-        Path outputFilePath = firstPart;
-        assertTrue(fs.exists(outputFilePath));
-        FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
-        TreeSet<String> lines = Sets.newTreeSet();
-        try {
-            String line = null;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line);
+
+        /**
+         * Tests writing of results to the output table
+         */
+        @Test public void testOutputInvalidRowsToTable() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            String[]
+                    argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            // check that the output table contains the invalid rows
+            long
+                    scrutinyTimeMillis =
+                    PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String
+                    invalidRowsQuery =
+                    IndexScrutinyTableOutput
+                            .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(2, rs.getInt("ID"));
+            assertEquals(2, rs.getInt(":ID"));
+            assertEquals(95123, rs.getInt("ZIP"));
+            assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(3, rs.getInt("ID"));
+            assertEquals(null, rs.getObject(":ID")); // null for missing target row
+            assertFalse(rs.next());
+
+            // check that the job results were written correctly to the metadata table
+            assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
+        }
+
+        /**
+         * Tests that the config for max number of output rows is observed
+         */
+        @Test public void testMaxOutputRows() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            // set max to 1.  There are two bad rows, but only 1 should get written to output table
+            String[]
+                    argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
+            List<Job> completedJobs = runScrutiny(argValues);
+            long
+                    scrutinyTimeMillis =
+                    PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String
+                    invalidRowsQuery =
+                    IndexScrutinyTableOutput
+                            .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+            assertTrue(rs.next());
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                assertTrue(rs.next());
+                assertFalse(rs.next());
+            } else {
+                assertFalse(rs.next());
             }
-        } finally {
-            IOUtils.closeQuietly(reader);
-            IOUtils.closeQuietly(fsDataInputStream);
         }
-        Iterator<String> lineIterator = lines.iterator();
-        assertEquals(
-            "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
-                .toString() + ", 9999]", lineIterator.next());
-        assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
-            lineIterator.next());
 
-    }
+        private SourceTargetColumnNames getColNames() throws SQLException {
+            PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+            PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+            SourceTargetColumnNames
+                    columnNames =
+                    new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
+            return columnNames;
+        }
 
-    /**
-     * Tests writing of results to the output table
-     */
-    @Test
-    public void testOutputInvalidRowsToTable() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
-        List<Job> completedJobs = runScrutiny(argValues);
-
-        // check that the output table contains the invalid rows
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(2, rs.getInt("ID"));
-        assertEquals(2, rs.getInt(":ID"));
-        assertEquals(95123, rs.getInt("ZIP"));
-        assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(3, rs.getInt("ID"));
-        assertEquals(null, rs.getObject(":ID")); // null for missing target row
-        assertFalse(rs.next());
-
-        // check that the job results were written correctly to the metadata table
-        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
-    }
+        // inserts one valid data/index row, one data row with a missing index row,
+        // and one data row with an index row that has a bad covered col val
+        private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+            conn.commit();
+
+            // insert a bad index row for one of the above data rows
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+        }
 
-    /**
-     * Tests that the config for max number of output rows is observed
-     */
-    @Test
-    public void testMaxOutputRows() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        // set max to 1.  There are two bad rows, but only 1 should get written to output table
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
-        List<Job> completedJobs = runScrutiny(argValues);
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
-        assertTrue(rs.next());
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
+        private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
+            ResultSet rs;
+            ResultSet
+                    metadataRs =
+                    IndexScrutinyTableOutput
+                            .queryAllMetadata(conn, dataTableFullName, indexTableFullName,
+                                    scrutinyTimeMillis);
+            assertTrue(metadataRs.next());
+            List<? extends Object>
+                    expected =
+                    Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                            SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                            2L, 1L, 1L,
+                            "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                            "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+                            invalidRowsQuery);
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                expected =
+                        Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                                2L, 1L, 2L,
+                                "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
+                                invalidRowsQuery);
+            }
+
+            assertRsValues(metadataRs, expected);
+            String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+            rs = conn.createStatement().executeQuery(missingTargetQuery);
             assertTrue(rs.next());
+            assertEquals(3, rs.getInt("ID"));
             assertFalse(rs.next());
-        } else {
+            String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+            rs = conn.createStatement().executeQuery(badCoveredColQuery);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt("ID"));
             assertFalse(rs.next());
         }
-    }
 
-    private SourceTargetColumnNames getColNames() throws SQLException {
-        PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-        PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
-        SourceTargetColumnNames columnNames =
-                new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
-        return columnNames;
-    }
+        // assert the result set contains the expected values in the given order
+        private void assertRsValues(ResultSet rs, List<? extends Object> expected)
+                throws SQLException {
+            for (int i = 0; i < expected.size(); i++) {
+                assertEquals(expected.get(i), rs.getObject(i + 1));
+            }
+        }
 
-    // inserts one valid data/index row, one data row with a missing index row,
-    // and one data row with an index row that has a bad covered col val
-    private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
-        conn.commit();
-
-        // insert a bad index row for one of the above data rows
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-    }
+        private void generateUniqueTableNames() {
+            schemaName = generateUniqueName();
+            dataTableName = generateUniqueName();
+            dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            indexTableName = generateUniqueName();
+            indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        }
 
-    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
-            String invalidRowsQuery) throws SQLException {
-        ResultSet rs;
-        ResultSet metadataRs =
-                IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
-                    indexTableFullName, scrutinyTimeMillis);
-        assertTrue(metadataRs.next());
-        List<? extends Object> expected =
-            Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        }
-
-        assertRsValues(metadataRs, expected);
-        String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
-        rs = conn.createStatement().executeQuery(missingTargetQuery);
-        assertTrue(rs.next());
-        assertEquals(3, rs.getInt("ID"));
-        assertFalse(rs.next());
-        String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
-        rs = conn.createStatement().executeQuery(badCoveredColQuery);
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt("ID"));
-        assertFalse(rs.next());
-    }
+        private int countRows(String tableFullName) throws SQLException {
+            ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+            count.next();
+            int numRows = count.getInt(1);
+            return numRows;
+        }
 
-    // assert the result set contains the expected values in the given order
-    private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
-        for (int i = 0; i < expected.size(); i++) {
-            assertEquals(expected.get(i), rs.getObject(i + 1));
+        private void upsertIndexRow(String name, int id, int zip) throws SQLException {
+            indexTableUpsertStmt.setString(1, name);
+            indexTableUpsertStmt.setInt(2, id); // id
+            indexTableUpsertStmt.setInt(3, zip); // bad zip
+            indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
+            indexTableUpsertStmt.executeUpdate();
         }
-    }
 
-    private void generateUniqueTableNames() {
-        schemaName = generateUniqueName();
-        dataTableName = generateUniqueName();
-        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        indexTableName = generateUniqueName();
-        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-    }
+        private void disableIndex() throws SQLException {
+            conn.createStatement().execute(
+                    String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
+            conn.commit();
+        }
 
-    private int countRows(String tableFullName) throws SQLException {
-        ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
-        count.next();
-        int numRows = count.getInt(1);
-        return numRows;
-    }
+        private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) {
+            return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+                    outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
+        }
 
-    private void upsertIndexRow(String name, int id, int zip) throws SQLException {
-        indexTableUpsertStmt.setString(1, name);
-        indexTableUpsertStmt.setInt(2, id); // id
-        indexTableUpsertStmt.setInt(3, zip); // bad zip
-        indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
-        indexTableUpsertStmt.executeUpdate();
-    }
+        private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+            return runScrutiny(
+                    getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+                            false, null, null, null, scrutinyTS));
+        }
 
-    private void disableIndex() throws SQLException {
-        conn.createStatement().execute(
-            String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
-        conn.commit();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+        }
 
-    private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
-        return counters.findCounter(counter).getValue();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows) {
-        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
-            outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize, SourceTable sourceTable) throws Exception {
+            final String[]
+                    cmdArgs =
+                    getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+                            false, null, null, null, Long.MAX_VALUE);
+            return runScrutiny(cmdArgs);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows, Long scrutinyTs) {
-        final List<String> args = Lists.newArrayList();
-        if (schemaName != null) {
-            args.add("-s");
-            args.add(schemaName);
-        }
-        args.add("-dt");
-        args.add(dataTable);
-        args.add("-it");
-        args.add(indxTable);
-
-        // TODO test snapshot reads
-        // if(useSnapshot) {
-        // args.add("-snap");
-        // }
-
-        if (OutputFormat.FILE.equals(outputFormat)) {
-            args.add("-op");
-            outputDir = "/tmp/" + UUID.randomUUID().toString();
-            args.add(outputDir);
-        }
-        args.add("-t");
-        args.add(String.valueOf(scrutinyTs));
-        args.add("-run-foreground");
-        if (batchSize != null) {
-            args.add("-b");
-            args.add(String.valueOf(batchSize));
-        }
-
-        // default to using data table as the source table
-        args.add("-src");
-        if (sourceTable == null) {
-            args.add(SourceTable.DATA_TABLE_SOURCE.name());
-        } else {
-            args.add(sourceTable.name());
-        }
-        if (outputInvalidRows) {
-            args.add("-o");
-        }
-        if (outputFormat != null) {
-            args.add("-of");
-            args.add(outputFormat.name());
-        }
-        if (maxOutputRows != null) {
-            args.add("-om");
-            args.add(maxOutputRows.toString());
-        }
-        return args.toArray(new String[0]);
-    }
+        private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+                throws SQLException {
+            int index = 1;
+            // insert row
+            stmt.setInt(index++, id);
+            stmt.setString(index++, name);
+            stmt.setInt(index++, zip);
+            stmt.setTimestamp(index++, new Timestamp(testTime));
+            stmt.executeUpdate();
+        }
 
-    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
-        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
+        private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
+            String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
+            PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
+            return deleteStmt.executeUpdate();
+        }
     }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
-    }
+    public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT {
+        private Connection connGlobal = null;
+        private Connection connTenant = null;
+
+        private String tenantId;
+        private String tenantViewName;
+        private String indexNameTenant;
+        private String multiTenantTable;
+        private String viewIndexTableName;
+
+        private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+        private final String
+                upsertQueryStr =
+                "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')";
+        private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+        /**
+         * Create the test data
+         */
+        @Before public void setup() throws SQLException {
+            tenantId = generateUniqueName();
+            tenantViewName = generateUniqueName();
+            indexNameTenant = generateUniqueName();
+            multiTenantTable = generateUniqueName();
+            viewIndexTableName = "_IDX_" + multiTenantTable;
+
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            connGlobal = DriverManager.getConnection(getUrl(), props);
+
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            connTenant = DriverManager.getConnection(getUrl(), props);
+            String
+                    createTblStr =
+                    "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true";
+
+            createTestTable(getUrl(), String.format(createTblStr, multiTenantTable));
+
+            connTenant.createStatement().execute(
+                    String.format(createViewStr, tenantViewName, multiTenantTable));
+
+            String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
+            connTenant.createStatement().execute(idxStmtTenant);
+        }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
-    }
+        @After public void teardown() throws SQLException {
+            if (connGlobal != null) {
+                connGlobal.close();
+            }
+            if (connTenant != null) {
+                connTenant.close();
+            }
+        }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize, SourceTable sourceTable) throws Exception {
-        final String[] cmdArgs =
-                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
-                    null, null, Long.MAX_VALUE);
-        return runScrutiny(cmdArgs);
-    }
+        /**
+         * Tests that the config for max number of output rows is observed
+         */
+        @Test public void testTenantViewAndIndexEqual() throws Exception {
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.commit();
+
+            String[] argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L,
+                            SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
+                            EnvironmentEdgeManager.currentTimeMillis());
+
+            List<Job> completedJobs = runScrutiny(argValues);
+            // Sunny case, both index and view are equal. 1 row
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
 
-    private List<Job> runScrutiny(String[] cmdArgs) throws Exception {
-        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
-        Configuration conf = new Configuration(getUtility().getConfiguration());
-        scrutiny.setConf(conf);
-        int status = scrutiny.run(cmdArgs);
-        assertEquals(0, status);
-        for (Job job : scrutiny.getJobs()) {
-            assertTrue(job.waitForCompletion(true));
+        /**
+         * Tests global view on multi-tenant table should work too
+         **/
+        @Test public void testGlobalViewOnMultiTenantTable() throws Exception {
+            String globalViewName = generateUniqueName();
+            String indexNameGlobal = generateUniqueName();
+
+            connGlobal.createStatement().execute(
+                    String.format(createViewStr, globalViewName, multiTenantTable));
+
+            String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
+            connGlobal.createStatement().execute(idxStmtGlobal);
+            connGlobal.createStatement()
+                    .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
+            connGlobal.commit();
+            String[] argValues =
+                    getArgValues("", globalViewName, indexNameGlobal, 10L,
+                            SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
+                            EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+            // Sunny case, both index and view are equal. 1 row
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
         }
-        return scrutiny.getJobs();
-    }
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
-            throws SQLException {
-        int index = 1;
-        // insert row
-        stmt.setInt(index++, id);
-        stmt.setString(index++, name);
-        stmt.setInt(index++, zip);
-        stmt.setTimestamp(index++, new Timestamp(testTime));
-        stmt.executeUpdate();
-    }
+        /**
+         * Use Both as source. Add 1 row to tenant view and disable index.
+         * Add 1 more to view and add a wrong row to index.
+         * Both have 1 invalid row, 1 valid row.
+         **/
+        @Test
+        public void testOneValidOneInvalidUsingBothAsSource() throws Exception {
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.commit();
+            connTenant.createStatement().execute(
+                    String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName));
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+
+            connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+                    indexNameTenant, 5555, "wrongName"));
+            connTenant.commit();
+
+            String[]
+                    argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false,
+                            null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            assertEquals(2, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
 
-    private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
-        String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
-        PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
-        return deleteStmt.executeUpdate();
+        /**
+         * Add 3 rows to Tenant view.
+         * Empty index table and observe they are not equal.
+         * Use data table as source and output to file.
+         * Output to table doesn't work for tenantid connection because it can't create the scrutiny table as tenant.
+         **/
+        @Test public void testWithEmptyIndexTableOutputToFile() throws Exception{
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x"));
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2"));
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3"));
+            connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')",
+                    indexNameTenant, 5555, "wrongName"));
+            connTenant.commit();
+
+            ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(viewIndexTableName);
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, false);
+
+            String[]
+                    argValues =
+                    getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null,
+                            tenantId, EnvironmentEdgeManager.currentTimeMillis());
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            assertEquals(1, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
     }
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index d9a14bf..637ffb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -19,11 +19,10 @@ package org.apache.phoenix.mapreduce.index;
 
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -111,6 +110,8 @@ public class IndexScrutinyTool extends Configured implements Tool {
     private static final Option OUTPUT_PATH_OPTION =
             new Option("op", "output-path", true, "Output path where the files are written");
     private static final Option OUTPUT_MAX = new Option("om", "output-max", true, "Max number of invalid rows to output per mapper.  Defaults to 1M");
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+            "If specified, uses Tenant connection for tenant view index scrutiny (optional)");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
 
     /**
@@ -145,6 +146,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
         options.addOption(TIMESTAMP);
         options.addOption(BATCH_SIZE_OPTION);
         options.addOption(SOURCE_TABLE_OPTION);
+        options.addOption(TENANT_ID_OPTION);
         return options;
     }
 
@@ -202,10 +204,11 @@ public class IndexScrutinyTool extends Configured implements Tool {
         private String basePath;
         private long scrutinyExecuteTime;
         private long outputMaxRows; // per mapper
+        private String tenantId;
 
         public JobFactory(Connection connection, Configuration configuration, long batchSize,
                 boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
-                String basePath, long outputMaxRows) {
+                String basePath, long outputMaxRows, String tenantId) {
             this.outputInvalidRows = outputInvalidRows;
             this.outputFormat = outputFormat;
             this.basePath = basePath;
@@ -214,12 +217,16 @@ public class IndexScrutinyTool extends Configured implements Tool {
             this.connection = connection;
             this.configuration = configuration;
             this.useSnapshot = useSnapshot;
+            this.tenantId = tenantId;
             this.ts = ts; // CURRENT_SCN to set
             scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run.
                                                               // Same for
             // all jobs created from this factory
             PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
                 scrutinyExecuteTime);
+            if (!Strings.isNullOrEmpty(tenantId)) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
         }
 
         public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
@@ -363,10 +370,17 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 printHelpAndExit(e.getMessage(), getOptions());
             }
             final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+            boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
+            String tenantId = null;
+            if (useTenantId) {
+                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+                configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                LOG.info(String.format("IndexScrutinyTool uses a tenantId %s", tenantId));
+            }
             connection = ConnectionUtil.getInputConnection(configuration);
             final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -389,7 +403,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                             : EnvironmentEdgeManager.currentTimeMillis() - 60000;
 
             if (indexTable != null) {
-                if (!isValidIndexTable(connection, qDataTable, indexTable)) {
+                if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
                     throw new IllegalArgumentException(String
                             .format(" %s is not an index table for %s ", indexTable, qDataTable));
                 }
@@ -421,7 +435,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 outputFormat, outputMaxRows));
             JobFactory jobFactory =
                     new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
-                            outputInvalidRows, outputFormat, basePath, outputMaxRows);
+                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId);
             // If we are running the scrutiny with both tables as the source, run two separate jobs,
             // one for each direction
             if (SourceTable.BOTH.equals(sourceTable)) {
@@ -482,38 +496,6 @@ public class IndexScrutinyTool extends Configured implements Tool {
         return jobs;
     }
 
-    /**
-     * Checks for the validity of the index table passed to the job.
-     * @param connection
-     * @param masterTable
-     * @param indexTable
-     * @return
-     * @throws SQLException
-     */
-    private boolean isValidIndexTable(final Connection connection, final String masterTable,
-            final String indexTable) throws SQLException {
-        final DatabaseMetaData dbMetaData = connection.getMetaData();
-        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
-        final String tableName =
-                SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
-
-        ResultSet rs = null;
-        try {
-            rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false);
-            while (rs.next()) {
-                final String indexName = rs.getString(6);
-                if (indexTable.equalsIgnoreCase(indexName)) {
-                    return true;
-                }
-            }
-        } finally {
-            if (rs != null) {
-                rs.close();
-            }
-        }
-        return false;
-    }
-
     public static void main(final String[] args) throws Exception {
         int result = ToolRunner.run(new IndexScrutinyTool(), args);
         System.exit(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index d1d6ca2..a71bc27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -337,7 +338,7 @@ public class IndexTool extends Configured implements Tool {
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
             IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
-            if (tenantId != null) {
+            if (!Strings.isNullOrEmpty(tenantId)) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
             }
 
@@ -792,7 +793,7 @@ public class IndexTool extends Configured implements Tool {
      * @return
      * @throws SQLException
      */
-    private boolean isValidIndexTable(final Connection connection, final String masterTable,
+    public static boolean isValidIndexTable(final Connection connection, final String masterTable,
             final String indexTable, final String tenantId) throws SQLException {
         final DatabaseMetaData dbMetaData = connection.getMetaData();
         final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 6f2959f..df8c7ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -61,6 +61,9 @@ public class IndexColumnNames {
         if (pindexTable.getViewIndexId() != null) {
             offset++;
         }
+        if (pindexTable.isMultiTenant()) {
+            offset++;
+        }
         if (offset > 0) {
             pindexCols = pindexCols.subList(offset, pindexCols.size());
             pkColumns = pkColumns.subList(offset, pkColumns.size());