You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/04/23 05:30:13 UTC

[phoenix] branch 4.x updated: PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondry indexes (#758)

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new b8ba647  PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondry indexes (#758)
b8ba647 is described below

commit b8ba647664c50cc6f677f463f928d3f9bc214766
Author: Swaroopa Kadam <sw...@gmail.com>
AuthorDate: Wed Apr 22 22:30:00 2020 -0700

    PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondry indexes (#758)
    
    Co-authored-by: s.kadam <s....@apache.org>
---
 .../end2end/IndexToolForNonTxGlobalIndexIT.java    | 483 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 398 +----------------
 .../end2end/IndexVerificationOldDesignIT.java      | 161 +++++++
 .../coprocessor/GlobalIndexRegionScanner.java      | 203 +++++++++
 .../coprocessor/IndexRebuildRegionScanner.java     | 182 +-------
 .../phoenix/coprocessor/IndexerRegionScanner.java  | 354 +++++++++++++++
 .../UngroupedAggregateRegionObserver.java          |  23 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  10 +-
 .../index/PhoenixIndexToolJobCounters.java         |   2 +-
 .../phoenix/index/VerifySingleIndexRowTest.java    |   3 +-
 10 files changed, 1248 insertions(+), 571 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
new file mode 100644
index 0000000..e151040
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT {
+
+    private final String tableDDLOptions;
+    private boolean directApi = true;
+    private boolean useSnapshot = false;
+    private boolean mutable;
+
+    public IndexToolForNonTxGlobalIndexIT(boolean mutable) {
+        StringBuilder optionBuilder = new StringBuilder();
+        this.mutable = mutable;
+        if (!mutable) {
+            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        }
+        optionBuilder.append(" SPLIT ON(1,2)");
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(name = "mutable={0}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false} });
+    }
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    @Test
+    public void testWithSetNull() throws Exception {
+        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
+        // and after that the index table is still rebuilt correctly
+        if(!this.mutable) {
+            return;
+        }
+        final int NROWS = 2 * 3 * 5 * 7;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+                    + tableDDLOptions);
+            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 2, stmt);
+            conn.commit();
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 3, stmt);
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", indexTableName, dataTableFullName));
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool
+                    indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny
+                    .scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
+                    0, IndexTool.IndexVerifyType.ONLY, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement()
+                    .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement()
+                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
+                            indexTableName, dataTableFullName));
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                    IndexTool.IndexVerifyType.ONLY);
+            Cell cell =
+                    IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
+                            indexTableFullName);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
+                String actualErrorMsg = Bytes
+                        .toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex) {
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
+
+            // Run the index tool to populate the index while verifying rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                    IndexTool.IndexVerifyType.AFTER);
+
+            // Set ttl of index table ridiculously low so that all data is expired
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexTable = TableName.valueOf(indexTableFullName);
+            HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
+            desc.setTimeToLive(1);
+            admin.modifyColumn(indexTable, desc);
+            Thread.sleep(1000);
+            Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
+            int retry = 0;
+            while (retry < 20 && status.getFirst() != 0) {
+                Thread.sleep(2000);
+                status = admin.getAlterStatus(indexTable);
+            }
+            assertTrue(status.getFirst() == 0);
+
+            TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+            // Run the index tool using the only-verify option, verify it gives no mismatch
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                    IndexTool.IndexVerifyType.ONLY);
+            Scan scan = new Scan();
+            Table hIndexToolTable =
+                    conn.unwrap(PhoenixConnection.class).getQueryServices()
+                            .getTable(indexToolOutputTable.getName());
+            Result r = hIndexToolTable.getScanner(scan).next();
+            assertTrue(r == null);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testSecondaryGlobalIndexFailure() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert two rows
+            IndexToolIT.upsertRow(stmt1, 1);
+            IndexToolIT.upsertRow(stmt1, 2);
+            conn.commit();
+
+            String stmtString2 =
+                    String.format(
+                            "CREATE INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job.
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
+
+            String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
+
+            // Verify that the index table is in the ACTIVE state
+            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
+
+            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(qIndexTableName);
+            admin.disableTable(tableName);
+
+            // Run the index MR job and it should fail (return -1)
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, -1, new String[0]);
+
+            // Verify that the index table should be still in the ACTIVE state
+            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
+        }
+    }
+
+    @Test
+    public void testBuildSecondaryIndexAndScrutinize() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert NROWS rows
+            final int NROWS = 1000;
+            for (int i = 0; i < NROWS; i++) {
+                IndexToolIT.upsertRow(stmt1, i);
+            }
+            conn.commit();
+            String stmtString2 =
+                    String.format(
+                            "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+
+            // Add more rows and make sure that these rows will be visible to IndexTool
+            for (int i = NROWS; i < 2 * NROWS; i++) {
+                IndexToolIT.upsertRow(stmt1, i);
+            }
+            conn.commit();
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(2 * NROWS, actualRowCount);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, IndexToolIT.MutationCountingRegionObserver.class);
+            // Run the index MR job and verify that the index table rebuild succeeds
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            assertEquals(1, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            IndexToolIT.MutationCountingRegionObserver.setMutationCount(0);
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
+            // write any index rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BEFORE);
+            assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            // The "-v BOTH" option should not write any index rows either
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BOTH);
+            assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyAfterOption() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            // Configure IndexRegionObserver to fail the first write phase. This should not
+            // lead to any change on index and thus the index verify during index rebuild should fail
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            // Run the index MR job and verify that the index table rebuild fails
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, -1, IndexTool.IndexVerifyType.AFTER);
+            // The index tool output table should report that there is a missing index row
+            Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
+                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex){
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+            // Run the index MR job to only verify that each data table row has a corresponding index row
+            // IndexTool will go through each data table row and record the mismatches in the output table
+            // called PHOENIX_INDEX_TOOL
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
+                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex) {
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
+            // Delete the output table for the next test
+            IndexToolIT.dropIndexToolTables(conn);
+            // Run the index tool to populate the index while verifying rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 02b253f..e5db314 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -189,7 +189,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         return TestUtil.filterTxParamData(list,0);
     }
 
-    private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
+    protected static void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
         for (int i = 1; i <= nrows; i++) {
             stmt.setInt(1, i);
             stmt.setInt(2, i + 1);
@@ -203,66 +203,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
-    public void testWithSetNull() throws Exception {
-        // This test is for building non-transactional mutable global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId || !mutable) {
-            return;
-        }
-        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
-        // and after that the index table is still rebuilt correctly
-        final int NROWS = 2 * 3 * 5 * 7;
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
-                    + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
-                    + tableDDLOptions);
-            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
-            setEveryNthRowWithNull(NROWS, 2, stmt);
-            conn.commit();
-            setEveryNthRowWithNull(NROWS, 3, stmt);
-            conn.commit();
-            conn.createStatement().execute(String.format(
-                    "CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ",
-                    (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName));
-            // Run the index MR job and verify that the index table is built correctly
-            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-            setEveryNthRowWithNull(NROWS, 5, stmt);
-            conn.commit();
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-            setEveryNthRowWithNull(NROWS, 7, stmt);
-            conn.commit();
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
-                    0, IndexTool.IndexVerifyType.ONLY, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-            dropIndexToolTables(conn);
-        }
-    }
-
-    @Test
     public void testSecondaryIndex() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -370,7 +310,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void dropIndexToolTables(Connection conn) throws Exception {
+    protected static void dropIndexToolTables(Connection conn) throws Exception {
         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
         TableName indexToolOutputTable =
             TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
@@ -381,80 +321,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         admin.deleteTable(indexToolResultTable);
     }
 
-    @Test
-    public void testBuildSecondaryIndexAndScrutinize() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String stmString1 =
-                    "CREATE TABLE " + dataTableFullName
-                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
-                            + tableDDLOptions;
-            conn.createStatement().execute(stmString1);
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
-            // Insert NROWS rows
-            final int NROWS = 1000;
-            for (int i = 0; i < NROWS; i++) {
-                upsertRow(stmt1, i);
-            }
-            conn.commit();
-            String stmtString2 =
-                    String.format(
-                            "CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ",
-                            (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
-            conn.createStatement().execute(stmtString2);
-
-            // Run the index MR job and verify that the index table is built correctly
-            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-
-            // Add more rows and make sure that these rows will be visible to IndexTool
-            for (int i = NROWS; i < 2 * NROWS; i++) {
-                upsertRow(stmt1, i);
-            }
-            conn.commit();
-            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
-            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
-            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
-            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(2 * NROWS, actualRowCount);
-            dropIndexToolTables(conn);
-        }
-    }
-
     public static class MutationCountingRegionObserver extends SimpleRegionObserver {
         public static AtomicInteger mutationCount = new AtomicInteger(0);
 
@@ -473,7 +339,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
+    private static void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
         // The row key for the output table : timestamp | index table name | data row key
         // The row key for the result table : timestamp | index table name | datable table region name |
         //                                    scan start row | scan stop row
@@ -489,7 +355,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]);
     }
 
-    private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
+    public static Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
             throws Exception {
         byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
         byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
@@ -536,209 +402,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
-    public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String schemaName = generateUniqueName();
-            String dataTableName = generateUniqueName();
-            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-            String indexTableName = generateUniqueName();
-            String viewName = generateUniqueName();
-            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
-                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
-                    + tableDDLOptions);
-            conn.commit();
-            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
-            conn.commit();
-            // Insert a row
-            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
-            conn.commit();
-            conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
-            TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class);
-            // Run the index MR job and verify that the index table rebuild succeeds
-            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.AFTER);
-            assertEquals(1, MutationCountingRegionObserver.getMutationCount());
-            MutationCountingRegionObserver.setMutationCount(0);
-            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
-            // write any index rows
-            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.BEFORE);
-            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
-            // The "-v BOTH" option should not write any index rows either
-            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.BOTH);
-            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
-            dropIndexToolTables(conn);
-        }
-    }
-
-    @Test
-    public void testIndexToolVerifyAfterOption() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String schemaName = generateUniqueName();
-            String dataTableName = generateUniqueName();
-            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-            String indexTableName = generateUniqueName();
-            String viewName = generateUniqueName();
-            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
-                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
-                    + tableDDLOptions);
-            conn.commit();
-            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
-            conn.commit();
-            // Insert a row
-            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
-            conn.commit();
-            // Configure IndexRegionObserver to fail the first write phase. This should not
-            // lead to any change on index and thus the index verify during index rebuild should fail
-            IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
-            conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
-            // Run the index MR job and verify that the index table rebuild fails
-            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
-                    null, -1, IndexTool.IndexVerifyType.AFTER);
-            // The index tool output table should report that there is a missing index row
-            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
-            try {
-                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
-                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
-            } catch(Exception ex){
-                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
-            }
-            IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
-            dropIndexToolTables(conn);
-        }
-    }
-
-    @Test
-    public void testIndexToolOnlyVerifyOption() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
-                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
-            // Insert a row
-            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
-            conn.commit();
-            conn.createStatement().execute(String.format(
-                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
-            // Run the index MR job to only verify that each data table row has a corresponding index row
-            // IndexTool will go through each data table row and record the mismatches in the output table
-            // called PHOENIX_INDEX_TOOL
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.ONLY);
-            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            try {
-                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
-                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
-            } catch(Exception ex) {
-                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
-            }
-            // Delete the output table for the next test
-            dropIndexToolTables(conn);
-            // Run the index tool to populate the index while verifying rows
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.AFTER);
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.ONLY);
-            dropIndexToolTables(conn);
-        }
-    }
-
-    @Test
-    public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
-                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
-            // Insert a row
-            conn.createStatement()
-                    .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
-            conn.commit();
-            conn.createStatement()
-                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
-                        indexTableName, dataTableFullName));
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
-                IndexTool.IndexVerifyType.ONLY);
-            Cell cell =
-                    getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
-                        indexTableFullName);
-            try {
-                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
-                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
-            } catch(Exception ex) {
-                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
-            }
-
-            // Run the index tool to populate the index while verifying rows
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
-                IndexTool.IndexVerifyType.AFTER);
-
-            // Set ttl of index table ridiculously low so that all data is expired
-            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            TableName indexTable = TableName.valueOf(indexTableFullName);
-            HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
-            desc.setTimeToLive(1);
-            admin.modifyColumn(indexTable, desc);
-            Thread.sleep(1000);
-            Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
-            int retry = 0;
-            while (retry < 20 && status.getFirst() != 0) {
-                Thread.sleep(2000);
-                status = admin.getAlterStatus(indexTable);
-            }
-            assertTrue(status.getFirst() == 0);
-
-            TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
-            // Run the index tool using the only-verify option, verify it gives no mismatch
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
-                IndexTool.IndexVerifyType.ONLY);
-            Scan scan = new Scan();
-            Table hIndexToolTable =
-                    conn.unwrap(PhoenixConnection.class).getQueryServices()
-                            .getTable(indexToolOutputTable.getName());
-            Result r = hIndexToolTable.getScanner(scan).next();
-            assertTrue(r == null);
-            dropIndexToolTables(conn);
-        }
-    }
-
-    @Test
     public void testIndexToolWithTenantId() throws Exception {
         if (!useTenantId) { return;}
         String tenantId = generateUniqueName();
@@ -823,59 +486,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
-    public void testSecondaryGlobalIndexFailure() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
-            return;
-        }
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String stmString1 =
-                    "CREATE TABLE " + dataTableFullName
-                            + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
-                            + tableDDLOptions;
-            conn.createStatement().execute(stmString1);
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
-            // Insert two rows
-            upsertRow(stmt1, 1);
-            upsertRow(stmt1, 2);
-            conn.commit();
-
-            String stmtString2 =
-                    String.format(
-                            "CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
-                            (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
-            conn.createStatement().execute(stmtString2);
-
-            // Run the index MR job.
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
-
-            String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
-
-            // Verify that the index table is in the ACTIVE state
-            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
-
-            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
-            Admin admin = queryServices.getAdmin();
-            TableName tableName = TableName.valueOf(qIndexTableName);
-            admin.disableTable(tableName);
-
-            // Run the index MR job and it should fail (return -1)
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, -1, new String[0]);
-
-            // Verify that the index table should be still in the ACTIVE state
-            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
-        }
-    }
-    
-    @Test
     public void testSaltedVariableLengthPK() throws Exception {
         if (!mutable) return;
         if (transactional) return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java
new file mode 100644
index 0000000..477f74f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+public class IndexVerificationOldDesignIT extends  BaseUniqueNamesOwnClusterIT {
+
+    ManualEnvironmentEdge injectEdge;
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+                Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        long ttl=3600;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0, TTL="+ttl);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName));
+
+            upsertValidRows(conn, dataTableFullName);
+
+            IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(6, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')");
+            conn.commit();
+            indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(5, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+
+            injectEdge = new ManualEnvironmentEdge();
+            injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis() + ttl*1000);
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+
+            indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+        }
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption_viewIndex() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String viewName = generateUniqueName();
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            conn.createStatement().execute("CREATE VIEW " + fullViewName
+                    + " AS SELECT * FROM "+dataTableFullName);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, fullViewName));
+
+            upsertValidRows(conn, fullViewName);
+
+            IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')");
+            conn.createStatement().execute("delete from " + indexTableFullName + " where \"0:CODE\" = 'D'");
+            conn.commit();
+
+            IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(4, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+        }
+    }
+
+    private void upsertValidRows(Connection conn, String table) throws SQLException {
+        conn.createStatement().execute("upsert into " + table + " values (1, 'Phoenix', 'A')");
+        conn.createStatement().execute("upsert into " + table + " values (2, 'Phoenix1', 'B')");
+        conn.createStatement().execute("upsert into " + table + " values (3, 'Phoenix2', 'C')");
+        conn.createStatement().execute("upsert into " + table + " values (4, 'Phoenix3', 'D')");
+        conn.createStatement().execute("upsert into " + table + " values (5, 'Phoenix4', 'E')");
+        conn.createStatement().execute("upsert into " + table + " values (6, 'Phoenix5', 'F')");
+        conn.commit();
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
new file mode 100644
index 0000000..35a0a8a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
+
+    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
+    public static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
+    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
+    public static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+    public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
+    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
+    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row";
+
+    protected long pageSizeInRows = Long.MAX_VALUE;
+    protected int rowCountPerTask;
+    protected boolean hasMore;
+    protected int maxBatchSize;
+    protected byte[] indexMetaData;
+    protected Scan scan;
+    protected RegionScanner innerScanner;
+    protected Region region;
+    protected IndexMaintainer indexMaintainer;
+    protected Table indexHTable;
+    protected TaskRunner pool;
+    protected TaskBatch<Boolean> tasks;
+    protected String exceptionMessage;
+    protected HTableFactory hTableFactory;
+    protected int indexTableTTL;
+    protected long maxLookBackInMills;
+    protected IndexToolVerificationResult verificationResult;
+    protected IndexVerificationResultRepository verificationResultRepository;
+
+    public GlobalIndexRegionScanner(RegionScanner innerScanner, final Region region, final Scan scan,
+            final RegionCoprocessorEnvironment env) throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+            pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        }
+        maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        if (indexMetaData == null) {
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        }
+        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        indexMaintainer = maintainers.get(0);
+        this.scan = scan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        verificationResult = new IndexToolVerificationResult(scan);
+        // Create the following objects only for rebuilds by IndexTool
+        hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
+        indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
+        verificationResultRepository =
+                new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("IndexVerify",
+                        env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                        DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+        rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
+    }
+
+    public static class SimpleValueGetter implements ValueGetter {
+        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+        final Put put;
+        public SimpleValueGetter (final Put put) {
+            this.put = put;
+        }
+        @Override
+        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return valuePtr;
+        }
+
+        @Override
+        public byte[] getRowKey() {
+            return put.getRow();
+        }
+
+    }
+
+    public static byte[] getIndexRowKey(IndexMaintainer indexMaintainer, final Put dataRow) {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        return indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
+                null, null, HConstants.LATEST_TIMESTAMP);
+    }
+
+    public static long getMaxTimestamp(Mutation m) {
+        long ts = 0;
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell cell : cells) {
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
+            }
+        }
+        return ts;
+    }
+
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+    protected static boolean isTimestampBeforeTTL(long tableTTL, long currentTime, long tsToCheck) {
+        if (tableTTL == HConstants.FOREVER) {
+            return false;
+        }
+        return tsToCheck < (currentTime - tableTTL * 1000);
+    }
+
+    protected static boolean isTimestampBeyondMaxLookBack(long maxLookBackInMills,
+            long currentTime, long tsToCheck) {
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills)) {
+            return false;
+        }
+        return tsToCheck < (currentTime - maxLookBackInMills);
+    }
+
+    protected static long getMaxTimestamp(Pair<Put, Delete> pair) {
+        Put put = pair.getFirst();
+        long ts1 = 0;
+        if (put != null) {
+            ts1 = getMaxTimestamp(put);
+        }
+        Delete del = pair.getSecond();
+        long ts2 = 0;
+        if (del != null) {
+            ts1 = getMaxTimestamp(del);
+        }
+        return (ts1 > ts2) ? ts1 : ts2;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 8b7e3f2..e767a23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -24,8 +24,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 
 import java.io.IOException;
@@ -54,12 +52,10 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -67,22 +63,18 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.Task;
 import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.TaskRunner;
 import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
 import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
 import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
-import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.KeyRange;
@@ -98,86 +90,50 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
-public class IndexRebuildRegionScanner extends BaseRegionScanner {
-    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
+public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
-    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
-    private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
-    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
-    private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
-    public static final String NO_EXPECTED_MUTATION = "No expected mutation";
-    public static final String
-            ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
-    private long pageSizeInRows = Long.MAX_VALUE;
-    private int rowCountPerTask;
-    private boolean hasMore;
-    private final int maxBatchSize;
-    private UngroupedAggregateRegionObserver.MutationList mutations;
+
     private final long maxBatchSizeBytes;
     private final long blockingMemstoreSize;
     private final byte[] clientVersionBytes;
-    private byte[] indexMetaData;
     private boolean useProto = true;
-    private Scan scan;
-    private RegionScanner innerScanner;
-    private Region region;
-    private IndexMaintainer indexMaintainer;
-    private byte[] indexRowKey = null;
-    private Table indexHTable = null;
+    private byte[] indexRowKey;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
     private Map<byte[], List<Mutation>> indexKeyToMutationMap;
     private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
-    private TaskRunner pool;
-    private TaskBatch<Boolean> tasks;
-    private String exceptionMessage;
     private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
-    private HTableFactory hTableFactory;
-    private int indexTableTTL = 0;
-    private IndexToolVerificationResult verificationResult;
+    protected UngroupedAggregateRegionObserver.MutationList mutations;
     private boolean isBeforeRebuilt = true;
     private boolean partialRebuild = false;
-    private int  singleRowRebuildReturnCode;
+    private int singleRowRebuildReturnCode;
     private Map<byte[], NavigableSet<byte[]>> familyMap;
     private byte[][] viewConstants;
-    private IndexVerificationResultRepository verificationResultRepository;
     private IndexVerificationOutputRepository verificationOutputRepository;
-    private long maxLookBackInMills;
 
     @VisibleForTesting
     public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
                               final RegionCoprocessorEnvironment env,
                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
-        super(innerScanner);
+        super(innerScanner, region, scan, env);
         final Configuration config = env.getConfiguration();
-        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
-            pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
-                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
-        } else {
+        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) == null) {
             partialRebuild = true;
         }
-        maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-        mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
         maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
         blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
         clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
         if (indexMetaData == null) {
             useProto = false;
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-        indexMaintainer = maintainers.get(0);
-        this.scan = scan;
         familyMap = scan.getFamilyMap();
         if (familyMap.isEmpty()) {
             familyMap = null;
         }
-
-        this.innerScanner = innerScanner;
-        this.region = region;
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         if (indexRowKey != null) {
@@ -186,17 +142,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
-            verificationResult = new IndexToolVerificationResult(scan);
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
-                // Create the following objects only for rebuilds by IndexTool
-                hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
-                indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-                indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-                verificationResultRepository =
-                    new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
                 verificationOutputRepository =
                     new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory);
                 indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
@@ -206,12 +155,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
-                rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
-                        DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
             }
         }
-
-        maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
     }
 
     private void setReturnCodeForSingleRowRebuild() throws IOException {
@@ -310,41 +255,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return 0;
     }
 
-    public static class SimpleValueGetter implements ValueGetter {
-        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-        final Put put;
-
-        public SimpleValueGetter(final Put put) {
-            this.put = put;
-        }
-
-        @Override
-        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
-            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
-            if (cellList == null || cellList.isEmpty()) {
-                return null;
-            }
-            Cell cell = cellList.get(0);
-            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-            return valuePtr;
-        }
-
-        @Override
-        public byte[] getRowKey() {
-            return put.getRow();
-        }
-
-    }
-
-    public byte[] getIndexRowKey(final Put dataRow) throws IOException {
-        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
-        byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
-                null, null, HConstants.LATEST_TIMESTAMP);
-        return builtIndexRowKey;
-    }
 
     private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
-        byte[] builtIndexRowKey = getIndexRowKey(put);
+        byte[] builtIndexRowKey = getIndexRowKey(indexMaintainer, put);
         if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
                 indexRowKey, 0, indexRowKey.length) != 0) {
             return false;
@@ -366,21 +279,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             region.getRegionInfo().getTable().getName(), isBeforeRebuilt);
     }
 
-    private static long getMaxTimestamp(Mutation m) {
-        long ts = 0;
-        for (List<Cell> cells : m.getFamilyCellMap().values()) {
-            if (cells == null) {
-                continue;
-            }
-            for (Cell cell : cells) {
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-        }
-        return ts;
-    }
-
     private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
         List<Cell> cellList = m.getFamilyCellMap().get(family);
         if (cellList == null) {
@@ -529,17 +427,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return false;
     }
 
-    private boolean isDeleteFamilyVersion(Mutation mutation) {
-        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
-            for (Cell cell : cells) {
-                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
     @VisibleForTesting
     public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
         Put put = null;
@@ -724,7 +611,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             // get a value back from index if it has already expired between our rebuild and
             // verify
             // TODO: have a metric to update for these cases
-            if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+            if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(expected))) {
                 verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
                 return true;
             }
@@ -782,7 +669,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             return true;
         }
 
-        if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
+        if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
             if (expectedIndex > 0) {
                 // if current expected index mutation is beyond max look back window, we only need to make sure its latest
                 // mutation is a matching one, as an SCN query is required.
@@ -822,20 +709,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
-    private static long getMaxTimestamp(Pair<Put, Delete> pair) {
-        Put put = pair.getFirst();
-        long ts1 = 0;
-        if (put != null) {
-            ts1 = getMaxTimestamp(put);
-        }
-        Delete del = pair.getSecond();
-        long ts2 = 0;
-        if (del != null) {
-            ts1 = getMaxTimestamp(del);
-        }
-        return (ts1 > ts2) ? ts1 : ts2;
-    }
-
     private void verifyIndexRows(List<KeyRange> keys,
             IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
         List<KeyRange> invalidKeys = new ArrayList<>();
@@ -847,6 +720,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
         indexScan.setRaw(true);
         indexScan.setMaxVersions();
+        indexScan.setCacheBlocks(false);
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
                 KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
@@ -870,7 +744,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 KeyRange keyRange = itr.next();
                 byte[] key = keyRange.getLowerRange();
                 List<Mutation> mutationList = indexKeyToMutationMap.get(key);
-                if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
+                if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
                     itr.remove();
                     verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
                 }
@@ -886,13 +760,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 }
                 long currentTime = EnvironmentEdgeManager.currentTimeMillis();
                 String errorMsg;
-                if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
+                if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){
                     errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
                     verificationPhaseResult.
                         setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
                 }
                 else {
-                    errorMsg = "Missing index row";
+                    errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW;
                     verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
                 }
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
@@ -906,19 +780,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         keys.addAll(invalidKeys);
     }
 
-    private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
-        if (indexTableTTL == HConstants.FOREVER) {
-            return false;
-        }
-        return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
-    }
-
-    private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){
-        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills))
-            return true;
-        return tsToCheck < (currentTime - maxLookBackInMills);
-    }
-
     private void addVerifyTask(final List<KeyRange> keys,
                                final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
@@ -1081,15 +942,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return set.contains(qualifier);
     }
 
-    public static long getTimestamp(Mutation m) {
-        for (List<Cell> cells : m.getFamilyCellMap().values()) {
-            for (Cell cell : cells) {
-                return cell.getTimestamp();
-            }
-        }
-        throw new IllegalStateException("No cell found");
-    }
-
     /**
      * This is to reorder the mutations in ascending order by the tuple of timestamp and mutation type where
      * put comes before delete
@@ -1248,7 +1100,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 if (mutation.getFamilyCellMap().size() != 0) {
                     // Add this put on top of the current data row state to get the next data row state
                     Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
-                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                    ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRow);
                     Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
                     indexMutations.add(indexPut);
                     // Delete the current index row if the new index key is different than the current one
@@ -1288,7 +1140,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         currentDataRowState = null;
                         indexRowKeyForCurrentDataRow = null;
                     } else {
-                        ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                        ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRowState);
                         Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
                         indexMutations.add(indexPut);
                         // Delete the current index row if the new index key is different than the current one
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
new file mode 100644
index 0000000..88bac86
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+public class IndexerRegionScanner extends GlobalIndexRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexerRegionScanner.class);
+    protected Map<byte[], Put> indexKeyToDataPutMap;
+
+    IndexerRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+            final RegionCoprocessorEnvironment env) throws IOException {
+        super(innerScanner, region, scan, env);
+        indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() { return false; }
+
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        try {
+            verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                    IndexTool.IndexVerifyType.ONLY, region.getRegionInfo().getRegionName());
+        } finally {
+            this.pool.stop("IndexerRegionScanner is closing");
+            hTableFactory.shutdown();
+            indexHTable.close();
+            verificationResultRepository.close();
+        }
+    }
+
+    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow,
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        long ts = getMaxTimestamp(dataRow);
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
+
+        if (indexPut == null) {
+            // This means the data row does not have any covered column values
+            indexPut = new Put(indexRow.getRow());
+        }
+        // Add the empty column
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, EMPTY_COLUMN_VALUE_BYTES);
+
+        int cellCount = 0;
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell expectedCell : cells) {
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
+                if (actualCell == null) {
+                    // Check if cell expired as per the current server's time and data table ttl
+                    // Index table should have the same ttl as the data table, hence we might not
+                    // get a value back from index if it has already expired between our rebuild and
+                    // verify
+
+                    // or if cell timestamp is beyond maxlookback
+                    if (isTimestampBeforeTTL(indexTableTTL, currentTime, expectedCell.getTimestamp())) {
+                        continue;
+                    }
+
+                    return false;
+                }
+                if (actualCell.getTimestamp() < ts) {
+                    // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
+                    continue;
+                }
+                // Check all columns
+                if (!CellUtil.matchingValue(actualCell, expectedCell) || actualCell.getTimestamp() != ts) {
+                    if(isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, actualCell.getTimestamp())) {
+                        verificationPhaseResult
+                                .setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult
+                                        .getBeyondMaxLookBackInvalidIndexRowCount()+1);
+                        continue;
+                    }
+                    return false;
+                }
+                cellCount++;
+            }
+        }
+        return cellCount == indexRow.rawCells().length;
+    }
+
+    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(indexScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        indexScan.setFilter(skipScanFilter);
+        indexScan.setCacheBlocks(false);
+        try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
+                if (dataPut == null) {
+                    // This should never happen
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                if (verifySingleIndexRow(result, dataPut, verificationPhaseResult)) {
+                    verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount()+1);
+                } else {
+                    verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount()+1);
+                }
+                perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+        }
+        // Check if any expected rows from index(which we didn't get) are already expired due to TTL
+        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
+            Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+            long currentTime = EnvironmentEdgeManager.currentTime();
+            while(itr.hasNext()) {
+                Entry<byte[], Put> entry = itr.next();
+                long ts = getMaxTimestamp(entry.getValue());
+                if (isTimestampBeforeTTL(indexTableTTL, currentTime, ts)) {
+                    itr.remove();
+                    verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount()+1);
+                }
+            }
+        }
+        // Check if any expected rows from index(which we didn't get) are beyond max look back and have been compacted away
+        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
+            for (Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+                Put put = entry.getValue();
+                long ts = getMaxTimestamp(put);
+                long currentTime = EnvironmentEdgeManager.currentTime();
+                if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, ts)) {
+                    verificationPhaseResult.
+                            setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
+                } else {
+                    verificationPhaseResult.setMissingIndexRowCount(
+                            verificationPhaseResult.getMissingIndexRowCount() + 1);
+                }
+            }
+        }
+    }
+
+    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
+            final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                try {
+                    if (Thread.currentThread().isInterrupted()) {
+                        exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
+                        throw new IOException(exceptionMessage);
+                    }
+                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
+                } catch (Exception e) {
+                    throw e;
+                }
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+        tasks = new TaskBatch<>(taskCount);
+
+        List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
+        List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+        List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+
+        Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+
+        dataPutMapList.add(perTaskDataKeyToDataPutMap);
+
+        IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
+        verificationPhaseResultList.add(perTaskVerificationPhaseResult);
+
+        for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
+            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+            if (keys.size() == rowCountPerTask) {
+                addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+                keys = new ArrayList<>(rowCountPerTask);
+                perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataPutMapList.add(perTaskDataKeyToDataPutMap);
+                perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
+                verificationPhaseResultList.add(perTaskVerificationPhaseResult);
+            }
+        }
+        if (keys.size() > 0) {
+            addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+        }
+        List<Boolean> taskResultList = null;
+        try {
+            LOGGER.debug("Waiting on index verify tasks to complete...");
+            taskResultList = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        }
+        for (Boolean result : taskResultList) {
+            if (result == null) {
+                // there was a failure
+                throw new IOException(exceptionMessage);
+            }
+        }
+        for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
+            verificationPhaseResult.add(result);
+        }
+    }
+
+    private void verifyIndex() throws IOException {
+        IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan);
+        nextVerificationResult.setScannedDataRowCount(indexKeyToDataPutMap.size());
+        IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
+        // For these options we start with verifying index rows
+        parallelizeIndexVerify(verificationPhaseResult);
+        nextVerificationResult.getBefore().add(verificationPhaseResult);
+        indexKeyToDataPutMap.clear();
+        verificationResult.add(nextVerificationResult);
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        Cell lastCell = null;
+        int rowCount = 0;
+        region.startRegionOperation();
+        try {
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> row = new ArrayList<>();
+                    hasMore = innerScanner.nextRaw(row);
+                    if (!row.isEmpty()) {
+                        lastCell = row.get(0);
+                        Put put = null;
+                        for (Cell cell : row) {
+                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (put == null) {
+                                    put = new Put(CellUtil.cloneRow(cell));
+                                }
+                                put.add(cell);
+                            } else {
+                                throw new DoNotRetryIOException("Scan without raw found a deleted cell");
+                            }
+                        }
+                        rowCount++;
+                        indexKeyToDataPutMap
+                                .put(getIndexRowKey(indexMaintainer, put), put);
+                    }
+                } while (hasMore && rowCount < pageSizeInRows);
+                verifyIndex();
+            }
+        } catch (IOException e) {
+            LOGGER.error(String.format("IOException during rebuilding: %s", Throwables.getStackTraceAsString(e)));
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+            indexKeyToDataPutMap.clear();
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        final Cell aggKeyValue;
+        if (lastCell == null) {
+            aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes,0, rowCountBytes.length);
+        } else {
+            aggKeyValue = KeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+        }
+        results.add(aggKeyValue);
+        return hasMore;
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return scan.getMaxResultSize();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b9cdafd..2c93b3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
@@ -101,6 +102,7 @@ import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -120,7 +122,7 @@ import org.apache.phoenix.schema.stats.NoOpStatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
-import org.apache.phoenix.schema.stats.StatisticsScanner;
+
 import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -389,8 +391,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
-        RegionCoprocessorEnvironment env = c.getEnvironment();
-        Region region = env.getRegion();
+        final RegionCoprocessorEnvironment env = c.getEnvironment();
+        final Region region = env.getRegion();
         long ts = scan.getTimeRange().getMax();
         boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         if (ScanUtil.isAnalyzeTable(scan)) {
@@ -408,7 +410,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 return collectStats(s, statsCollector, region, scan, env.getConfiguration());
             }
         } else if (ScanUtil.isIndexRebuild(scan)) {
-            return rebuildIndices(s, region, scan, env);
+            return User.runAsLoginUser(new PrivilegedExceptionAction<RegionScanner>() {
+                @Override
+                public RegionScanner run() throws Exception {
+                    return rebuildIndices(s, region, scan, env);
+                }
+            });
         }
 
         PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -1062,12 +1069,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
+        boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName());
+        byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+        IndexTool.IndexVerifyType verifyType = (valueBytes != null) ?
+                IndexTool.IndexVerifyType.fromValue(valueBytes):IndexTool.IndexVerifyType.NONE;
+        if(oldCoproc  && verifyType == IndexTool.IndexVerifyType.ONLY) {
+            return new IndexerRegionScanner(innerScanner, region, scan, env);
+        }
         if (!scan.isRaw()) {
             Scan rawScan = new Scan(scan);
             rawScan.setRaw(true);
             rawScan.setMaxVersions();
             rawScan.getFamilyMap().clear();
             rawScan.setFilter(null);
+            rawScan.setCacheBlocks(false);
             for (byte[] family : scan.getFamilyMap().keySet()) {
                 rawScan.addFamily(family);
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 6246697..40fc883 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -66,6 +65,7 @@ import org.apache.htrace.TraceScope;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.LockManager.RowLock;
@@ -78,7 +78,6 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -88,7 +87,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -648,7 +646,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
                 IndexMaintainer indexMaintainer = pair.getFirst();
                 HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
                 if (nextDataRowState != null) {
-                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                    ValueGetter nextDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(nextDataRowState);
                     Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
                             nextDataRowVG, rowKeyPtr, ts, null, null);
                     if (indexPut == null) {
@@ -666,7 +664,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
                             new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
                     // Delete the current index row if the new index key is different than the current one
                     if (currentDataRowState != null) {
-                        ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                        ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
                         byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
                                 null, null, HConstants.LATEST_TIMESTAMP);
                         if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
@@ -677,7 +675,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
                         }
                     }
                 } else if (currentDataRowState != null) {
-                    ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                    ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
                     byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
                             null, null, HConstants.LATEST_TIMESTAMP);
                     Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
index b736787..4baad43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -35,5 +35,5 @@ public enum PhoenixIndexToolJobCounters {
     AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
     AFTER_REBUILD_INVALID_INDEX_ROW_COUNT,
     AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
-    AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT
+    AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 5df041d..868e6ad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
@@ -269,7 +270,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
 
     private void initializeGlobalMockitoSetup() throws IOException {
         //setup
-        when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+        when(GlobalIndexRegionScanner.getIndexRowKey(indexMaintainer, put)).thenCallRealMethod();
         when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
         when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
                 Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();