You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/13 02:01:30 UTC

[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5658 IndexTool to verify index rows inline

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new c47a145  PHOENIX-5658 IndexTool to verify index rows inline
c47a145 is described below

commit c47a14531699f9a113687f6836a92d9a73357d2f
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Jan 12 17:55:11 2020 -0800

    PHOENIX-5658 IndexTool to verify index rows inline
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 168 +++++-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |   5 -
 .../coprocessor/BaseScannerRegionObserver.java     |   3 +
 .../coprocessor/IndexRebuildRegionScanner.java     | 629 +++++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java          | 236 +-------
 .../phoenix/hbase/index/IndexRegionObserver.java   |  17 +-
 .../PhoenixServerBuildIndexInputFormat.java        |   3 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  98 +++-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  14 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +-
 10 files changed, 926 insertions(+), 249 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index ff1b354..0edc3c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,18 +38,26 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -369,6 +377,139 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
+            throws Exception {
+        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+        byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
+        Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+        Scan scan = new Scan();
+        ResultScanner scanner = hIndexTable.getScanner(scan);
+        boolean dataTableNameCheck = false;
+        boolean indexTableNameCheck = false;
+        Cell errorMessageCell = null;
+        Result result = scanner.next();
+        if (result != null) {
+            for (Cell cell : result.rawCells()) {
+                assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
+                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
+                if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) {
+                    dataTableNameCheck = true;
+                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
+                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) {
+                    indexTableNameCheck = true;
+                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
+                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) {
+                    errorMessageCell = cell;
+                }
+            }
+        }
+        assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+        return errorMessageCell;
+    }
+
+    @Test
+    public void testIndexToolVerifyAfterOption() throws Exception {
+        // This test is for building non-transactional global indexes with direct api
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            // Configure IndexRegionObserver to fail the first write phase. This should not
+            // lead to any change on index and thus the index verify during index rebuild should fail
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            // Run the index MR job and verify that the index table rebuild fails
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, -1, IndexTool.IndexVerifyType.AFTER);
+            // The index tool output table should report that there is a missing index row
+            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        // This test is for building non-transactional global indexes with direct api
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+            // Run the index MR job to only verify that each data table row has a corresponding index row
+            // IndexTool will go through each data table row and record the mismatches in the output table
+            // called PHOENIX_INDEX_TOOL
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            // Delete the output table for the next test
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+            // Run the index tool to populate the index while verifying rows
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            // Corrupt one cell by writing directly into the index table
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
+            conn.commit();
+            // Run the index tool using the only-verify option to detect this mismatch between the data and index table
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
     @Test
     public void testIndexToolWithTenantId() throws Exception {
         if (!useTenantId) { return;}
@@ -642,9 +783,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             actualExplainPlan.contains(expectedExplainPlan));
     }
 
-    public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
-            String dataTable, String indxTable, String tenantId) {
-        final List<String> args = Lists.newArrayList();
+    private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
+                                            String dataTable, String indxTable, String tenantId,
+                                            IndexTool.IndexVerifyType verifyType) {
+        List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
             args.add(schemaName);
@@ -656,6 +798,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         if (directApi) {
             args.add("-direct");
         }
+        args.add("-v" + verifyType.getValue()); // verify index rows inline
+
         // Need to run this job in foreground for the test to be deterministic
         args.add("-runfg");
         if (useSnapshot) {
@@ -669,6 +813,12 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
+        return args;
+    }
+
+    public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
+        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
         return args.toArray(new String[0]);
     }
 
@@ -714,12 +864,20 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String tenantId, int expectedStatus,
             String... additionalArgs) throws Exception {
+        return runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId, expectedStatus,
+                IndexTool.IndexVerifyType.NONE, additionalArgs);
+    }
+
+    public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+                                         String dataTableName, String indexTableName, String tenantId,
+                                         int expectedStatus, IndexTool.IndexVerifyType verifyType,
+                                         String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
         Configuration conf = new Configuration(getUtility().getConfiguration());
         conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         indexingTool.setConf(conf);
-        final String[] cmdArgs =
-                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+        final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName,
+                indexTableName, tenantId, verifyType);
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..1b482aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,11 +35,7 @@ import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.StringUtil;
-
-import com.google.common.collect.Lists;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 71e7546..6e0a1e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -86,6 +86,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
+    // Index verification type done by the index tool
+    public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
+
     /* 
     * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
     * Needed for backward compatibility purposes. TODO: get rid of this in next major release.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
new file mode 100644
index 0000000..142871b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
+public class IndexRebuildRegionScanner extends BaseRegionScanner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
+    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
+    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
+    private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    private long pageSizeInRows = Long.MAX_VALUE;
+    private int rowCountPerTask;
+    private boolean hasMore;
+    private final int maxBatchSize;
+    private UngroupedAggregateRegionObserver.MutationList mutations;
+    private final long maxBatchSizeBytes;
+    private final long blockingMemstoreSize;
+    private final byte[] clientVersionBytes;
+    private byte[] indexMetaData;
+    private boolean useProto = true;
+    private Scan scan;
+    private RegionScanner innerScanner;
+    private Region region;
+    private IndexMaintainer indexMaintainer;
+    private byte[] indexRowKey = null;
+    private Table indexHTable = null;
+    private Table outputHTable = null;
+    private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
+    private boolean verify = false;
+    private boolean onlyVerify = false;
+    private Map<byte[], Put> indexKeyToDataPutMap;
+    private TaskRunner pool;
+    private TaskBatch<Boolean> tasks;
+    private String exceptionMessage;
+    private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    private RegionCoprocessorEnvironment env;
+    private HTableFactory hTableFactory;
+
+    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+                               final RegionCoprocessorEnvironment env,
+                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+            pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        }
+        maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
+        clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        if (indexMetaData == null) {
+            useProto = false;
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        }
+        if (!scan.isRaw()) {
+            List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+            indexMaintainer = maintainers.get(0);
+        }
+        this.scan = scan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+        indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+        if (valueBytes != null) {
+            verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
+            if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) {
+                verify = true;
+                if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+                    onlyVerify = true;
+                }
+                hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
+                indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+                indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder("IndexVerify",
+                                env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+                tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
+                rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+                        DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
+            }
+        }
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() { return hasMore; }
+
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (verify) {
+            this.pool.stop("IndexRebuildRegionScanner is closing");
+            indexHTable.close();
+            outputHTable.close();
+        }
+    }
+
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the wal
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private Delete generateDeleteMarkers(List<Cell> row) {
+        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+        if (row.size() == allColumns.size() + 1) {
+            // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+            return null;
+        }
+        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+        long ts = 0;
+        for (Cell cell : row) {
+            includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+            if (ts < cell.getTimestamp()) {
+                ts = cell.getTimestamp();
+            }
+        }
+        byte[] rowKey;
+        Delete del = null;
+        for (ColumnReference column : allColumns) {
+            if (!includedColumns.contains(column)) {
+                if (del == null) {
+                    Cell cell = row.get(0);
+                    rowKey = CellUtil.cloneRow(cell);
+                    del = new Delete(rowKey);
+                }
+                del.addColumns(column.getFamily(), column.getQualifier(), ts);
+            }
+        }
+        return del;
+    }
+
+    private void addToBeVerifiedIndexRows() throws IOException {
+        for (Mutation mutation : mutations) {
+            if (mutation instanceof Put) {
+                indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
+            }
+        }
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue) throws IOException {
+        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+            uuidValue = ServerCacheClient.generateId();
+            if (verify) {
+                addToBeVerifiedIndexRows();
+            }
+            mutations.clear();
+        }
+        return uuidValue;
+    }
+
+    private class SimpleValueGetter implements ValueGetter {
+        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+        final Put put;
+        SimpleValueGetter (final Put put) {
+            this.put = put;
+        }
+        @Override
+        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return valuePtr;
+        }
+
+        @Override
+        public byte[] getRowKey() {
+            return put.getRow();
+        }
+
+    }
+
+    private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
+                null, null, HConstants.LATEST_TIMESTAMP);
+        return builtIndexRowKey;
+    }
+
+    private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+        byte[] builtIndexRowKey = getIndexRowKey(put);
+        if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+                indexRowKey, 0, indexRowKey.length) != 0) {
+            return false;
+        }
+        return true;
+    }
+
+    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                           String errorMsg) {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+                errorMsg, null, null);
+
+    }
+
+    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) {
+        final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+        final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+        final int PREFIX_LENGTH = 3;
+        final int TOTAL_PREFIX_LENGTH = 6;
+
+        try {
+            int longLength = Long.SIZE / Byte.SIZE;
+            byte[] rowKey;
+            if (dataRowKey != null) {
+                rowKey = new byte[longLength + dataRowKey.length];
+                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+                Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
+            } else {
+                rowKey = new byte[longLength];
+                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+            }
+            Put put = new Put(rowKey);
+            long scanMaxTs = scan.getTimeRange().getMax();
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
+                    scanMaxTs, region.getRegionInfo().getTable().getName());
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
+                    scanMaxTs, indexMaintainer.getIndexTableName());
+            if (dataRowKey != null) {
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+                        scanMaxTs, Bytes.toBytes(dataRowTs));
+            }
+            if (indexRowKey != null) {
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+                        scanMaxTs, indexRowKey);
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+                        scanMaxTs, Bytes.toBytes(indexRowTs));
+            }
+            byte[] errorMessageBytes;
+            if (expectedValue != null) {
+                errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+                        TOTAL_PREFIX_LENGTH];
+                Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+                int length = errorMsg.length();
+                Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+                length += PREFIX_LENGTH;
+                Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+                length += expectedValue.length;
+                Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+                length += PREFIX_LENGTH;
+                Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+
+            }
+            else {
+                errorMessageBytes = Bytes.toBytes(errorMsg);
+            }
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+            outputHTable.put(put);
+        } catch (IOException e) {
+            exceptionMessage = "LogToIndexToolOutputTable failed " + e;
+        }
+    }
+
+    private long getMaxTimestamp(Result result) {
+        long ts = 0;
+        for (Cell cell : result.rawCells()) {
+            if (ts < cell.getTimestamp()) {
+                ts = cell.getTimestamp();
+            }
+        }
+        return ts;
+    }
+
+    private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        long ts = 0;
+        for (List<Cell> cells : dataRow.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell cell : cells) {
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
+            }
+        }
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
+        if (indexPut == null) {
+            // This means the index row does not have any covered columns. We just need to check if the index row
+            // has only one cell (which is the empty column cell)
+            if (indexRow.rawCells().length == 1) {
+                return;
+            }
+            String errorMsg = "Expected to find only empty column cell but got "
+                    + indexRow.rawCells().length;
+            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+            if (onlyVerify) {
+                return;
+            }
+            exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
+            throw new IOException(exceptionMessage);
+        }
+        else {
+            // Remove the empty column prepared by Index codec as we need to change its value
+            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    indexMaintainer.getEmptyKeyValueQualifier());
+        }
+        // Add the empty column
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+        int cellCount = 0;
+        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell expectedCell : cells) {
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
+                if (actualCell == null) {
+                    String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
+                            Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+                    if (onlyVerify) {
+                        return;
+                    }
+                    exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                // Check all columns
+                if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+                    String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
+                            Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                            errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
+                    if (onlyVerify) {
+                        return;
+                    }
+                    exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                cellCount++;
+            }
+        }
+        if (cellCount != indexRow.rawCells().length) {
+            String errorMsg = "Expected to find " + cellCount + " cells but got "
+                    + indexRow.rawCells().length + " cells";
+            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+            if (!onlyVerify) {
+                exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException {
+        int expectedRowCount = keys.size();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(indexScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        indexScan.setFilter(skipScanFilter);
+        int rowCount = 0;
+        try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
+                if (dataPut == null) {
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    String errorMsg = "Missing data row";
+                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
+                    if (!onlyVerify) {
+                        throw new IOException(exceptionMessage);
+                    }
+                }
+                verifySingleIndexRow(result, dataPut);
+                rowCount++;
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+        }
+        if (rowCount != expectedRowCount) {
+            String errorMsg = "Missing index rows - Expected: " + expectedRowCount +
+                    " Actual: " + rowCount;
+                    exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+            logToIndexToolOutputTable(null, null, 0, 0, errorMsg);
+            if (!onlyVerify) {
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    private void addVerifyTask(final ArrayList<KeyRange> keys) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                try {
+                    if (Thread.currentThread().isInterrupted()) {
+                        exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
+                        throw new IOException(exceptionMessage);
+                    }
+                    verifyIndexRows(keys);
+                } catch (Exception e) {
+                    throw e;
+                }
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        int rowCount = 0;
+        region.startRegionOperation();
+        try {
+            byte[] uuidValue = ServerCacheClient.generateId();
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> row = new ArrayList<Cell>();
+                    hasMore = innerScanner.nextRaw(row);
+                    if (!row.isEmpty()) {
+                        Put put = null;
+                        Delete del = null;
+                        for (Cell cell : row) {
+                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (put == null) {
+                                    put = new Put(CellUtil.cloneRow(cell));
+                                    setMutationAttributes(put, uuidValue);
+                                    mutations.add(put);
+                                }
+                                put.add(cell);
+                            } else {
+                                if (del == null) {
+                                    del = new Delete(CellUtil.cloneRow(cell));
+                                    setMutationAttributes(del, uuidValue);
+                                    mutations.add(del);
+                                }
+                                del.addDeleteMarker(cell);
+                            }
+                        }
+                        if (onlyVerify) {
+                            rowCount++;
+                            continue;
+                        }
+                        uuidValue = commitIfReady(uuidValue);
+                        if (!scan.isRaw()) {
+                            Delete deleteMarkers = generateDeleteMarkers(row);
+                            if (deleteMarkers != null) {
+                                setMutationAttributes(deleteMarkers, uuidValue);
+                                mutations.add(deleteMarkers);
+                                uuidValue = commitIfReady(uuidValue);
+                            }
+                        }
+                        if (indexRowKey != null) {
+                            // GlobalIndexChecker passed the index row key. This is to build a single index row.
+                            // Check if the data table row we have just scanned matches with the index row key.
+                            // If not, there is no need to build the index row from this data table row,
+                            // and just return zero row count.
+                            if (checkIndexRow(indexRowKey, put)) {
+                                rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                            }
+                            else {
+                                rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                            }
+                            break;
+                        }
+                        rowCount++;
+                    }
+
+                } while (hasMore && rowCount < pageSizeInRows);
+                if (!mutations.isEmpty() && !onlyVerify) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                    if (verify) {
+                        addToBeVerifiedIndexRows();
+                    }
+                }
+            }
+        } catch (IOException e) {
+            hasMore = false;
+            LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+        }
+        if (verify) {
+            if (onlyVerify) {
+                addToBeVerifiedIndexRows();
+            }
+            ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+            for (byte[] key : indexKeyToDataPutMap.keySet()) {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(key));
+                if (keys.size() == rowCountPerTask) {
+                    addVerifyTask(keys);
+                    keys = new ArrayList<>(rowCountPerTask);
+                }
+            }
+            if (keys.size() > 0) {
+                addVerifyTask(keys);
+            }
+            List<Boolean> taskResultList = null;
+            try {
+                LOGGER.debug("Waiting on index verify tasks to complete...");
+                taskResultList = this.pool.submitUninterruptible(tasks);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+            } catch (EarlyExitFailure e) {
+                throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+            }
+            finally {
+                indexKeyToDataPutMap.clear();
+                tasks.getTasks().clear();
+            }
+            for (Boolean result : taskResultList) {
+                if (result == null) {
+                    // there was a failure
+                    throw new IOException(exceptionMessage);
+                }
+            }
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+        results.add(aggKeyValue);
+        return hasMore;
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return scan.getMaxResultSize();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 347dd01..b173045 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -38,7 +37,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -46,7 +44,6 @@ import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -81,7 +77,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -98,7 +93,6 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -247,7 +241,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
     }
 
-    private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+    public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
         try {
             commitBatch(region, localRegionMutations, blockingMemstoreSize);
         } catch (IOException e) {
@@ -275,7 +269,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       // flush happen which decrease the memstore size and then writes allowed on the region.
       for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
           try {
-              checkForRegionClosingOrSplitting();
+              checkForRegionClosing();
               Thread.sleep(100);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
@@ -324,7 +318,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      * a high chance that flush might not proceed and memstore won't be freed up.
      * @throws IOException
      */
-    private void checkForRegionClosingOrSplitting() throws IOException {
+    public void checkForRegionClosing() throws IOException {
         synchronized (lock) {
             if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
@@ -382,7 +376,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-   private long getBlockingMemstoreSize(Region region, Configuration conf) {
+    public static long getBlockingMemstoreSize(Region region, Configuration conf) {
        long flushSize = region.getTableDesc().getMemStoreFlushSize();
 
        if (flushSize <= 0) {
@@ -414,7 +408,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 return collectStats(s, statsCollector, region, scan, env.getConfiguration());
             }
         } else if (ScanUtil.isIndexRebuild(scan)) {
-            return rebuildIndices(s, region, scan, env.getConfiguration());
+            return rebuildIndices(s, region, scan, env);
         }
 
         PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -1061,226 +1055,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private class IndexRebuildRegionScanner extends BaseRegionScanner {
-        private long pageSizeInRows = Long.MAX_VALUE;
-        private boolean hasMore;
-        private final int maxBatchSize;
-        private MutationList mutations;
-        private final long maxBatchSizeBytes;
-        private final long blockingMemstoreSize;
-        private final byte[] clientVersionBytes;
-        private List<Cell> results = new ArrayList<Cell>();
-        private byte[] indexMetaData;
-        private boolean useProto = true;
-        private Scan scan;
-        private RegionScanner innerScanner;
-        private Region region;
-        private IndexMaintainer indexMaintainer;
-        private byte[] indexRowKey = null;
-
-        IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
-                                   final Configuration config) {
-            super(innerScanner);
-            if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
-                pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
-                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
-            }
-
-            maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            mutations = new MutationList(maxBatchSize);
-            maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-            blockingMemstoreSize = getBlockingMemstoreSize(region, config);
-            clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-            if (indexMetaData == null) {
-                useProto = false;
-                indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-            }
-            if (!scan.isRaw()) {
-                List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-                indexMaintainer = maintainers.get(0);
-            }
-            this.scan = scan;
-            this.innerScanner = innerScanner;
-            this.region = region;
-            indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
-        }
-
-        @Override
-        public HRegionInfo getRegionInfo() {
-            return region.getRegionInfo();
-        }
-
-        @Override
-        public boolean isFilterDone() { return hasMore; }
-
-        @Override
-        public void close() throws IOException { innerScanner.close(); }
-
-        private void setMutationAttributes(Mutation m, byte[] uuidValue) {
-            m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-            m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                    BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-            m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-            // Since we're replaying existing mutations, it makes no sense to write them to the wal
-            m.setDurability(Durability.SKIP_WAL);
-        }
-
-        private Delete generateDeleteMarkers(List<Cell> row) {
-            Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-            if (row.size() == allColumns.size() + 1) {
-                // We have all the columns for the index table plus the empty column. So, no delete marker is needed
-                return null;
-            }
-            Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
-            long ts = 0;
-            for (Cell cell : row) {
-                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-            byte[] rowKey;
-            Delete del = null;
-            for (ColumnReference column : allColumns) {
-                if (!includedColumns.contains(column)) {
-                    if (del == null) {
-                        Cell cell = row.get(0);
-                        rowKey = CellUtil.cloneRow(cell);
-                        del = new Delete(rowKey);
-                    }
-                    del.addColumns(column.getFamily(), column.getQualifier(), ts);
-                }
-            }
-            return del;
-        }
-
-        private byte[] commitIfReady(byte[] uuidValue) throws IOException {
-            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                checkForRegionClosingOrSplitting();
-                commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                uuidValue = ServerCacheClient.generateId();
-                mutations.clear();
-            }
-            return uuidValue;
-        }
-
-        private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
-            ValueGetter getter = new ValueGetter() {
-                final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
-                @Override
-                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
-                    List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
-                    if (cellList == null || cellList.isEmpty()) {
-                        return null;
-                    }
-                    Cell cell = cellList.get(0);
-                    valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                    return valuePtr;
-                }
-
-                @Override
-                public byte[] getRowKey() {
-                    return put.getRow();
-                }
-            };
-            byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(put.getRow()),
-                    null, null, HConstants.LATEST_TIMESTAMP);
-            if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
-                    indexRowKey, 0, indexRowKey.length) != 0) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public boolean next(List<Cell> results) throws IOException {
-            int rowCount = 0;
-            region.startRegionOperation();
-            try {
-                byte[] uuidValue = ServerCacheClient.generateId();
-                synchronized (innerScanner) {
-                    do {
-                        List<Cell> row = new ArrayList<Cell>();
-                        hasMore = innerScanner.nextRaw(row);
-                        if (!row.isEmpty()) {
-                            Put put = null;
-                            Delete del = null;
-                            for (Cell cell : row) {
-                                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                    if (put == null) {
-                                        put = new Put(CellUtil.cloneRow(cell));
-                                        setMutationAttributes(put, uuidValue);
-                                        mutations.add(put);
-                                    }
-                                    put.add(cell);
-                                } else {
-                                    if (del == null) {
-                                        del = new Delete(CellUtil.cloneRow(cell));
-                                        setMutationAttributes(del, uuidValue);
-                                        mutations.add(del);
-                                    }
-                                    del.addDeleteMarker(cell);
-                                }
-                            }
-                            uuidValue = commitIfReady(uuidValue);
-                            if (!scan.isRaw()) {
-                                Delete deleteMarkers = generateDeleteMarkers(row);
-                                if (deleteMarkers != null) {
-                                    setMutationAttributes(deleteMarkers, uuidValue);
-                                    mutations.add(deleteMarkers);
-                                    uuidValue = commitIfReady(uuidValue);
-                                }
-                            }
-                            if (indexRowKey != null) {
-                                // GlobalIndexChecker passed the index row key. This is to build a single index row.
-                                // Check if the data table row we have just scanned matches with the index row key.
-                                // If not, there is no need to build the index row from this data table row,
-                                // and just return zero row count.
-                                if (checkIndexRow(indexRowKey, put)) {
-                                    rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                                }
-                                else {
-                                    rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
-                                }
-                                break;
-                            }
-                            rowCount++;
-                        }
-
-                    } while (hasMore && rowCount < pageSizeInRows);
-                    if (!mutations.isEmpty()) {
-                        checkForRegionClosingOrSplitting();
-                        commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                    }
-                }
-            } catch (IOException e) {
-                hasMore = false;
-                LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
-                throw e;
-            } finally {
-                region.closeRegionOperation();
-            }
-            byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
-            final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
-                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
-            results.add(aggKeyValue);
-            return hasMore;
-        }
-
-        @Override
-        public long getMaxResultSize() {
-            return scan.getMaxResultSize();
-        }
-    }
-
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
-                                         final Configuration config) throws IOException {
+                                         final RegionCoprocessorEnvironment env) throws IOException {
 
-        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
+        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
         return scanner;
     }
     
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1e8649e..6e12950 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -125,13 +125,14 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-    private static boolean failPreIndexUpdatesForTesting = false;
-    private static boolean failPostIndexUpdatesForTesting = false;
-    private static boolean failDataTableUpdatesForTesting = false;
+  private static boolean ignoreIndexRebuildForTesting  = false;
+  private static boolean failPreIndexUpdatesForTesting = false;
+  private static boolean failPostIndexUpdatesForTesting = false;
+  private static boolean failDataTableUpdatesForTesting = false;
 
-  public static void setFailPreIndexUpdatesForTesting(boolean fail) {
-        failPreIndexUpdatesForTesting = fail;
-    }
+  public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; }
+
+  public static void setFailPreIndexUpdatesForTesting(boolean fail) { failPreIndexUpdatesForTesting = fail; }
 
   public static void setFailPostIndexUpdatesForTesting(boolean fail) { failPostIndexUpdatesForTesting = fail; }
 
@@ -796,8 +797,10 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (ignoreIndexRebuildForTesting && context.rebuild) {
+          return;
+      }
       long start = EnvironmentEdgeManager.currentTimeMillis();
-
       try {
           if (failPreIndexUpdatesForTesting) {
               throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 7e61b2d..36af59e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.*;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 /**
@@ -92,6 +92,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
             try {
                 scan.setTimeRange(0, scn);
                 scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, getIndexVerifyType(configuration).toBytes());
             } catch (IOException e) {
                 throw new SQLException(e);
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9a589b9..dcabdfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -71,6 +72,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -86,12 +88,14 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.EquiDepthStreamHistogram;
@@ -112,6 +116,59 @@ import com.google.common.collect.Lists;
  *
  */
 public class IndexTool extends Configured implements Tool {
+    public enum IndexVerifyType {
+        BEFORE("BEFORE"),
+        AFTER("AFTER"),
+        BOTH("BOTH"),
+        ONLY("ONLY"),
+        NONE("NONE");
+        private String value;
+        private byte[] valueBytes;
+
+        private IndexVerifyType(String value) {
+            this.value = value;
+            this.valueBytes = PVarchar.INSTANCE.toBytes(value);
+        }
+
+        public String getValue() {
+            return this.value;
+        }
+
+        public byte[] toBytes() {
+            return this.valueBytes;
+        }
+
+        public static IndexVerifyType fromValue(String value) {
+            for (IndexVerifyType verifyType: IndexVerifyType.values()) {
+                if (value.equals(verifyType.getValue())) {
+                    return verifyType;
+                }
+            }
+            throw new IllegalStateException("Invalid value: "+ value + " for " + IndexVerifyType.class);
+        }
+
+        public static IndexVerifyType fromValue(byte[] value) {
+            return fromValue(Bytes.toString(value));
+        }
+    }
+
+    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+    public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+    public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public final static String DATA_TABLE_NAME = "DTName";
+    public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+    public static String INDEX_TABLE_NAME = "ITName";
+    public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+    public static String DATA_TABLE_ROW_KEY = "DTRowKey";
+    public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+    public static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+    public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+    public static String DATA_TABLE_TS = "DTTS";
+    public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+    public static String INDEX_TABLE_TS = "ITTS";
+    public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+    public static String ERROR_MESSAGE = "Error";
+    public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
 
@@ -119,6 +176,7 @@ public class IndexTool extends Configured implements Tool {
     private String dataTable;
     private String indexTable;
     private boolean isPartialBuild;
+    private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
     private String qDataTable;
     private String qIndexTable;
     private boolean useSnapshot;
@@ -144,6 +202,17 @@ public class IndexTool extends Configured implements Tool {
     private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
             "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
 
+    private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+            "To verify every data row has a corresponding row. The accepted values are NONE, ONLY, BEFORE," +
+                    " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " +
+                    "ONLY is for verifying without rebuilding index rows. The rest for verifying before, after, and " +
+                    "both before and after rebuilding row. If the verification is done before rebuilding rows and " +
+                    "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER ");
+
+    private static final Option ONLY_VERIFY_OPTION = new Option("ov", "only-verify", false,
+            "To verify every data table row has the corresponding index row with the correct content " +
+            "(without building the index table). If the verify option is set then the only-verify option will be ignored");
+
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
     private static final Option SPLIT_INDEX_OPTION =
@@ -189,6 +258,8 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(INDEX_TABLE_OPTION);
         options.addOption(PARTIAL_REBUILD_OPTION);
         options.addOption(DIRECT_API_OPTION);
+        options.addOption(VERIFY_OPTION);
+        options.addOption(ONLY_VERIFY_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);
@@ -510,6 +581,7 @@ public class IndexTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
             PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
 
+            PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
             String physicalIndexTable = pIndexTable.getPhysicalName().getString();
 
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
@@ -569,6 +641,21 @@ public class IndexTool extends Configured implements Tool {
         return job;
     }
 
+    private void createIndexToolOutputTable(Connection connection) throws Exception {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        Admin admin = queryServices.getAdmin();
+        if (admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
+            return;
+        }
+        HTableDescriptor tableDescriptor = new
+                HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+        tableDescriptor.setValue("DISABLE_TABLE_SOR", "true");
+        tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+        HColumnDescriptor columnDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY);
+        tableDescriptor.addFamily(columnDescriptor);
+        admin.createTable(tableDescriptor);
+    }
+
     @Override
     public int run(String[] args) throws Exception {
         Connection connection = null;
@@ -592,6 +679,14 @@ public class IndexTool extends Configured implements Tool {
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+            if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+                String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+                indexVerifyType = IndexVerifyType.fromValue(value);
+                if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
+                        indexVerifyType == IndexVerifyType.ONLY)) {
+                    throw new IllegalStateException("Unsupported value for the verify option");
+                }
+            }
             qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
                 pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
@@ -606,7 +701,8 @@ public class IndexTool extends Configured implements Tool {
             pIndexTable = null;
 
             connection = ConnectionUtil.getInputConnection(configuration);
-
+            createIndexToolOutputTable(connection);
+            
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 39ce183..4c50e0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
@@ -146,6 +147,8 @@ public final class PhoenixConfigurationUtil {
 
     public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
 
+    public static final String INDEX_VERIFY_TYPE = "phoenix.mr.index.IndexVerifyType";
+
     // Generate splits based on scans from stats, or just from region splits
     public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
 
@@ -561,6 +564,11 @@ public final class PhoenixConfigurationUtil {
         configuration.set(DISABLED_INDEXES, indexName);
     }
 
+    public static void setIndexVerifyType(Configuration configuration, IndexTool.IndexVerifyType verifyType) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(INDEX_VERIFY_TYPE, verifyType.getValue());
+    }
+
     public static String getScrutinyDataTableName(Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         return configuration.get(SCRUTINY_DATA_TABLE_NAME);
@@ -686,6 +694,12 @@ public final class PhoenixConfigurationUtil {
         return configuration.get(DISABLED_INDEXES);
     }
 
+    public static IndexTool.IndexVerifyType getIndexVerifyType(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        String value = configuration.get(INDEX_VERIFY_TYPE, IndexTool.IndexVerifyType.NONE.getValue());
+        return IndexTool.IndexVerifyType.fromValue(value);
+    }
+
     public static boolean getSplitByStats(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3dab605..e048467 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -358,7 +358,7 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
-    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 16*1024;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;