You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/27 08:53:30 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5694 Add MR job counters for IndexTool inline verification

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 5ad8b29  PHOENIX-5694 Add MR job counters for IndexTool inline verification
5ad8b29 is described below

commit 5ad8b29029b093355e5b8ee33214b8f312d5be2c
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Jan 23 18:50:39 2020 -0800

    PHOENIX-5694 Add MR job counters for IndexTool inline verification
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 137 +++--
 .../phoenix/compile/ServerBuildIndexCompiler.java  |  24 +-
 .../coprocessor/IndexRebuildRegionScanner.java     | 608 +++++++++++++++------
 .../PhoenixServerBuildIndexInputFormat.java        |   2 +
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  59 +-
 .../index/PhoenixIndexImportDirectReducer.java     |  66 ++-
 .../index/PhoenixIndexToolJobCounters.java         |  35 ++
 .../java/org/apache/phoenix/util/IndexUtil.java    |   2 +-
 8 files changed, 699 insertions(+), 234 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index d79689d..d09b3b8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,28 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -87,8 +67,39 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
@@ -187,8 +198,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
     @Test
     public void testWithSetNull() throws Exception {
-        // This test is for building non-transactional mutable global indexes with direct api
-        if (localIndex || transactional || !mutable || !directApi || useSnapshot) {
+        // This test is for building non-transactional global indexes with direct api
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
@@ -330,10 +341,20 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void dropIndexToolTables(Connection conn) throws Exception {
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+        admin.disableTable(indexToolOutputTable);
+        admin.deleteTable(indexToolOutputTable);
+        TableName indexToolResultTable = TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES);
+        admin.disableTable(indexToolResultTable);
+        admin.deleteTable(indexToolResultTable);
+    }
+
     @Test
     public void testBuildSecondaryIndexAndScrutinize() throws Exception {
         // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         String schemaName = generateUniqueName();
@@ -364,8 +385,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute(stmtString2);
 
             // Run the index MR job and verify that the index table is built correctly
-            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
             assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
 
@@ -374,10 +401,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                 upsertRow(stmt1, i);
             }
             conn.commit();
-            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
             assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(2 * NROWS, actualRowCount);
+            dropIndexToolTables(conn);
         }
     }
 
@@ -410,8 +448,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         boolean dataTableNameCheck = false;
         boolean indexTableNameCheck = false;
         Cell errorMessageCell = null;
-        Result result = scanner.next();
-        if (result != null) {
+        for (Result result = scanner.next(); result != null; result = scanner.next()) {
             for (Cell cell : result.rawCells()) {
                 assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
                         IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
@@ -439,7 +476,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     @Test
     public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
         // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -476,17 +513,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.BOTH);
             assertEquals(0, MutationCountingRegionObserver.getMutationCount());
-            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
+            dropIndexToolTables(conn);
         }
     }
 
     @Test
     public void testIndexToolVerifyAfterOption() throws Exception {
         // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -520,17 +554,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                     expectedValueBytes, 0, expectedValueBytes.length) == 0);
             IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
-            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
+            dropIndexToolTables(conn);
         }
     }
 
     @Test
     public void testIndexToolOnlyVerifyOption() throws Exception {
         // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         String schemaName = generateUniqueName();
@@ -551,16 +582,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             // IndexTool will go through each data table row and record the mismatches in the output table
             // called PHOENIX_INDEX_TOOL
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.ONLY);
+                    null, -1, IndexTool.IndexVerifyType.ONLY);
             Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
             byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
             assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                     expectedValueBytes, 0, expectedValueBytes.length) == 0);
             // Delete the output table for the next test
-            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
+            dropIndexToolTables(conn);
             // Run the index tool to populate the index while verifying rows
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.AFTER);
@@ -569,19 +597,19 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             // Run the index tool using the only-verify option to detect this mismatch between the data and index table
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, 0, IndexTool.IndexVerifyType.ONLY);
+                    null, -1, IndexTool.IndexVerifyType.ONLY);
             cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
             expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
             assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                     expectedValueBytes, 0, expectedValueBytes.length) == 0);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
+            dropIndexToolTables(conn);
         }
     }
 
     @Test
     public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        // This test is for building non-transactional global indexes with direct api
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         String schemaName = generateUniqueName();
@@ -600,7 +628,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement()
                     .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
                         indexTableName, dataTableFullName));
-            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, -1,
                 IndexTool.IndexVerifyType.ONLY);
             Cell cell =
                     getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
@@ -640,8 +668,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                             .getTable(indexToolOutputTable.getName());
             Result r = hIndexToolTable.getScanner(scan).next();
             assertTrue(r == null);
-            admin.disableTable(indexToolOutputTable);
-            admin.deleteTable(indexToolOutputTable);
+            dropIndexToolTables(conn);
         }
     }
 
@@ -732,7 +759,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     @Test
     public void testSecondaryGlobalIndexFailure() throws Exception {
         // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional || !directApi || useSnapshot) {
+        if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
             return;
         }
         String schemaName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 1b482aa..4392e23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -35,9 +35,11 @@ import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
 
 
 /**
@@ -94,6 +96,16 @@ public class ServerBuildIndexCompiler {
                 throw new IllegalArgumentException(
                         "ServerBuildIndexCompiler does not support global indexes on transactional tables");
             }
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    scan.addFamily(columnRef.getFamily());
+                } else {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
+            }
             IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
             // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
             // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
@@ -109,18 +121,8 @@ public class ServerBuildIndexCompiler {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+                addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
             }
-            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
-            // However, in this case, we need to project all of the data columns that contribute to the index.
-            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
-            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                    scan.addFamily(columnRef.getFamily());
-                } else {
-                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
-                }
-            }
-
             if (dataTable.isTransactional()) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 527d78c..318440f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,6 +20,17 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -30,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
@@ -90,6 +103,238 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
+
+    public static class VerificationResult {
+        public static class PhaseResult {
+            private long validIndexRowCount = 0;
+            private long expiredIndexRowCount = 0;
+            private long missingIndexRowCount = 0;
+            private long invalidIndexRowCount = 0;
+
+            public void add(PhaseResult phaseResult) {
+                validIndexRowCount += phaseResult.validIndexRowCount;
+                expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+                missingIndexRowCount += phaseResult.missingIndexRowCount;
+                invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+            }
+
+            public long getTotalCount() {
+                return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+            }
+
+            @Override
+            public String toString() {
+                return "PhaseResult{" +
+                        "validIndexRowCount=" + validIndexRowCount +
+                        ", expiredIndexRowCount=" + expiredIndexRowCount +
+                        ", missingIndexRowCount=" + missingIndexRowCount +
+                        ", invalidIndexRowCount=" + invalidIndexRowCount +
+                        '}';
+            }
+        }
+
+        private long scannedDataRowCount = 0;
+        private long rebuiltIndexRowCount = 0;
+        private PhaseResult before = new PhaseResult();
+        private PhaseResult after = new PhaseResult();
+
+        @Override
+        public String toString() {
+            return "VerificationResult{" +
+                    "scannedDataRowCount=" + scannedDataRowCount +
+                    ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+                    ", before=" + before +
+                    ", after=" + after +
+                    '}';
+        }
+
+        public long getScannedDataRowCount() {
+            return scannedDataRowCount;
+        }
+
+        public long getRebuiltIndexRowCount() {
+            return rebuiltIndexRowCount;
+        }
+
+        public long getBeforeRebuildValidIndexRowCount() {
+            return before.validIndexRowCount;
+        }
+
+        public long getBeforeRebuildExpiredIndexRowCount() {
+            return before.expiredIndexRowCount;
+        }
+
+        public long getBeforeRebuildInvalidIndexRowCount() {
+            return before.invalidIndexRowCount;
+        }
+
+        public long getBeforeRebuildMissingIndexRowCount() {
+            return before.missingIndexRowCount;
+        }
+
+        public long getAfterRebuildValidIndexRowCount() {
+            return after.validIndexRowCount;
+        }
+
+        public long getAfterRebuildExpiredIndexRowCount() {
+            return after.expiredIndexRowCount;
+        }
+
+        public long getAfterRebuildInvalidIndexRowCount() {
+            return after.invalidIndexRowCount;
+        }
+
+        public long getAfterRebuildMissingIndexRowCount() {
+            return after.missingIndexRowCount;
+        }
+
+        private void addScannedDataRowCount(long count) {
+            this.scannedDataRowCount += count;
+        }
+
+        private void addRebuiltIndexRowCount(long count) {
+            this.rebuiltIndexRowCount += count;
+        }
+
+        private void addBeforeRebuildValidIndexRowCount(long count) {
+            before.validIndexRowCount += count;
+        }
+
+        private void addBeforeRebuildExpiredIndexRowCount(long count) {
+            before.expiredIndexRowCount += count;
+        }
+
+        private void addBeforeRebuildMissingIndexRowCount(long count) {
+            before.missingIndexRowCount += count;
+        }
+
+        private void addBeforeRebuildInvalidIndexRowCount(long count) {
+            before.invalidIndexRowCount += count;
+        }
+
+        private void addAfterRebuildValidIndexRowCount(long count) {
+            after.validIndexRowCount += count;
+        }
+
+        private void addAfterRebuildExpiredIndexRowCount(long count) {
+            after.expiredIndexRowCount += count;
+        }
+
+        private void addAfterRebuildMissingIndexRowCount(long count) {
+            after.missingIndexRowCount += count;
+        }
+
+        private void addAfterRebuildInvalidIndexRowCount(long count) {
+            after.invalidIndexRowCount += count;
+        }
+
+        private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
+            if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
+                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
+                return true;
+            }
+            return false;
+        }
+
+        private long getValue(Cell cell) {
+            return Long.parseLong(Bytes.toString(cell.getValueArray(),
+                    cell.getValueOffset(), cell.getValueLength()));
+        }
+
+        private void update(Cell cell) {
+            if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+                addScannedDataRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+                addRebuiltIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+                addBeforeRebuildValidIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+                addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+                addBeforeRebuildMissingIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+                addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+                addAfterRebuildValidIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+                addAfterRebuildExpiredIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+                addAfterRebuildMissingIndexRowCount(getValue(cell));
+            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+                addAfterRebuildInvalidIndexRowCount(getValue(cell));
+            }
+        }
+
+        public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+            // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+            // Search for the place where the trailing 0xFFs start
+            int offset = rowKeyPrefix.length;
+            while (offset > 0) {
+                if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                    break;
+                }
+                offset--;
+            }
+            if (offset == 0) {
+                // We got an 0xFFFF... (only FFs) stopRow value which is
+                // the last possible prefix before the end of the table.
+                // So set it to stop at the 'end of the table'
+                return HConstants.EMPTY_END_ROW;
+            }
+            // Copy the right length of the original
+            byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+            // And increment the last one
+            newStopRow[newStopRow.length - 1]++;
+            return newStopRow;
+        }
+
+        public static VerificationResult getVerificationResult(Table hTable, long ts)
+                throws IOException {
+            VerificationResult verificationResult = new VerificationResult();
+            byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+            byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+            Scan scan = new Scan();
+            scan.setStartRow(startRowKey);
+            scan.setStopRow(stopRowKey);
+            ResultScanner scanner = hTable.getScanner(scan);
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                for (Cell cell : result.rawCells()) {
+                    verificationResult.update(cell);
+                }
+            }
+            return verificationResult;
+        }
+
+        public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+            if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+                return false;
+            }
+            if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+                if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
+                    return true;
+                }
+            }
+            if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+                if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+                    return true;
+                }
+                if (before.validIndexRowCount + before.expiredIndexRowCount +
+                        after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        public void add(VerificationResult verificationResult) {
+            scannedDataRowCount += verificationResult.scannedDataRowCount;
+            rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+            before.add(verificationResult.before);
+            after.add(verificationResult.after);
+        }
+    }
+
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
     public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
     private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
@@ -112,9 +357,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private byte[] indexRowKey = null;
     private Table indexHTable = null;
     private Table outputHTable = null;
+    private Table resultHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
-    private boolean doNotFail = false;
     private Map<byte[], Put> indexKeyToDataPutMap;
     private Map<byte[], Put> dataKeyToDataPutMap;
     private TaskRunner pool;
@@ -124,6 +369,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private RegionCoprocessorEnvironment env;
     private HTableFactory hTableFactory;
     private int indexTableTTL;
+    private VerificationResult verificationResult;
+    private boolean isBeforeRebuilt = true;
 
     IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
                                final RegionCoprocessorEnvironment env,
@@ -158,6 +405,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
+            verificationResult = new VerificationResult();
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
@@ -165,7 +413,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
                 indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+		outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+                resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
                 indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
@@ -173,7 +422,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
                                 INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
-                tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
                 rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
                         DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
             }
@@ -188,14 +436,58 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     @Override
     public boolean isFilterDone() { return hasMore; }
 
+    private void logToIndexToolResultTable() throws IOException {
+        long scanMaxTs = scan.getTimeRange().getMax();
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
+        byte[] regionName = Bytes.toBytes(region.getRegionInfo().getRegionNameAsString());
+        byte[] rowKey = new byte[keyPrefix.length + regionName.length];
+        // The row key for the result table is the max timestamp of the scan + the table region name
+        Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
+        Bytes.putBytes(rowKey, keyPrefix.length, regionName, 0, regionName.length);
+        Put put = new Put(rowKey);
+        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, IndexTool.SCAN_STOP_ROW_KEY_BYTES,
+                scanMaxTs, scan.getStopRow());
+        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
+        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
+                verifyType == IndexTool.IndexVerifyType.ONLY) {
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
+        }
+        if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
+        }
+        resultHTable.put(put);
+    }
+
     @Override
     public void close() throws IOException {
         innerScanner.close();
         if (verify) {
-            this.pool.stop("IndexRebuildRegionScanner is closing");
-            hTableFactory.shutdown();
-            indexHTable.close();
-            outputHTable.close();
+            try {
+                logToIndexToolResultTable();
+            } finally {
+                this.pool.stop("IndexRebuildRegionScanner is closing");
+                hTableFactory.shutdown();
+                indexHTable.close();
+                outputHTable.close();
+                resultHTable.close();
+            }
         }
     }
 
@@ -213,7 +505,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
         int cellCount = put.size();
         if (cellCount == allColumns.size() + 1) {
-            // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+            // We have all the columns for the index table. So, no delete marker is needed
             return null;
         }
         Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
@@ -241,14 +533,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return del;
     }
 
-    private void addToBeVerifiedIndexRows() throws IOException {
-        for (Mutation mutation : mutations) {
-            if (mutation instanceof Put) {
-                indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
-            }
-        }
-    }
-
     private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
             ungroupedAggregateRegionObserver.checkForRegionClosing();
@@ -300,69 +584,72 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg) {
+                                           String errorMsg) throws IOException {
         logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
                 errorMsg, null, null);
 
     }
 
     private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) {
+                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) throws IOException {
         final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
         final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
         final int PREFIX_LENGTH = 3;
         final int TOTAL_PREFIX_LENGTH = 6;
+        final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
+        final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+        long scanMaxTs = scan.getTimeRange().getMax();
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
+        byte[] rowKey;
+        // The row key for the output table is the max timestamp of the scan + data row key
+        if (dataRowKey != null) {
+            rowKey = new byte[keyPrefix.length + dataRowKey.length];
+            Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
+            Bytes.putBytes(rowKey, keyPrefix.length, dataRowKey, 0, dataRowKey.length);
+        } else {
+            rowKey = new byte[keyPrefix.length];
+            Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
+        }
+        Put put = new Put(rowKey);
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
+                scanMaxTs, region.getRegionInfo().getTable().getName());
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
+                scanMaxTs, indexMaintainer.getIndexTableName());
+        if (dataRowKey != null) {
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+        }
+        if (indexRowKey != null) {
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+                    scanMaxTs, indexRowKey);
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+        }
+        byte[] errorMessageBytes;
+        if (expectedValue != null) {
+            errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+                    TOTAL_PREFIX_LENGTH];
+            Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+            int length = errorMsg.length();
+            Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+            length += PREFIX_LENGTH;
+            Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+            length += expectedValue.length;
+            Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+            length += PREFIX_LENGTH;
+            Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
 
-        try {
-            int longLength = Long.SIZE / Byte.SIZE;
-            byte[] rowKey;
-            if (dataRowKey != null) {
-                rowKey = new byte[longLength + dataRowKey.length];
-                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
-                Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
-            } else {
-                rowKey = new byte[longLength];
-                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
-            }
-            Put put = new Put(rowKey);
-            long scanMaxTs = scan.getTimeRange().getMax();
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
-                    scanMaxTs, region.getRegionInfo().getTable().getName());
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
-                    scanMaxTs, indexMaintainer.getIndexTableName());
-            if (dataRowKey != null) {
-                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
-                        scanMaxTs, Bytes.toBytes(dataRowTs));
-            }
-            if (indexRowKey != null) {
-                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
-                        scanMaxTs, indexRowKey);
-                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
-                        scanMaxTs, Bytes.toBytes(indexRowTs));
-            }
-            byte[] errorMessageBytes;
-            if (expectedValue != null) {
-                errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
-                        TOTAL_PREFIX_LENGTH];
-                Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
-                int length = errorMsg.length();
-                Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
-                length += PREFIX_LENGTH;
-                Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
-                length += expectedValue.length;
-                Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
-                length += PREFIX_LENGTH;
-                Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
-
-            }
-            else {
-                errorMessageBytes = Bytes.toBytes(errorMsg);
-            }
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
-            outputHTable.put(put);
-        } catch (IOException e) {
-            exceptionMessage = "LogToIndexToolOutputTable failed " + e;
         }
+        else {
+            errorMessageBytes = Bytes.toBytes(errorMsg);
+        }
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+        if (isBeforeRebuilt) {
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+        } else {
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+        }
+        outputHTable.put(put);
     }
 
     private long getMaxTimestamp(Result result) {
@@ -396,19 +683,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
                 valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
         if (indexPut == null) {
-            // This means the index row does not have any covered columns. We just need to check if the index row
-            // has only one cell (which is the empty column cell)
-            if (indexRow.rawCells().length == 1) {
-                return true;
-            }
-            String errorMsg = "Expected to find only empty column cell but got "
-                    + indexRow.rawCells().length;
+            String errorMsg = "Empty index update";
             logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            if (doNotFail) {
-                return false;
-            }
-            exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
-            throw new IOException(exceptionMessage);
+            return false;
         }
         else {
             // Remove the empty column prepared by Index codec as we need to change its value
@@ -440,11 +717,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                     String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
                             Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-                    if (doNotFail) {
-                        return false;
-                    }
-                    exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
+                    return false;
                 }
                 // Check all columns
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
@@ -452,11 +725,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                             Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
-                    if (doNotFail) {
-                        return false;
-                    }
-                    exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
+                    return false;
+                } else if (!CellUtil.matchingTimestamp(actualCell, expectedCell)) {
+                    String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
+                            Bytes.toString(qualifier) + " E: " + expectedCell.getTimestamp() + " A: " +
+                            actualCell.getTimestamp();
+                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                            errorMsg, null, null);
+                    return false;
                 }
                 cellCount++;
             }
@@ -465,16 +741,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             String errorMsg = "Expected to find " + cellCount + " cells but got "
                     + indexRow.rawCells().length + " cells";
             logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            if (!doNotFail) {
-                exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
-                throw new IOException(exceptionMessage);
-            }
             return false;
         }
         return true;
     }
 
-    private void verifyIndexRows(ArrayList<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap) throws IOException {
+    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
+                                 VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
         int expectedRowCount = keys.size();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
@@ -487,15 +760,17 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
                 Put dataPut = indexKeyToDataPutMap.get(result.getRow());
                 if (dataPut == null) {
+                    // This should never happen
                     String errorMsg = "Missing data row";
                     logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
-                    if (!doNotFail) {
-                        exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
-                        throw new IOException(exceptionMessage);
-                    }
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
                 }
                 if (verifySingleIndexRow(result, dataPut)) {
+                    verificationPhaseResult.validIndexRowCount++;
                     perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+                } else {
+                    verificationPhaseResult.invalidIndexRowCount++;
                 }
                 rowCount++;
             }
@@ -513,6 +788,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 if (isTimestampBeforeTTL(currentTime, ts)) {
                     itr.remove();
                     rowCount++;
+                    verificationPhaseResult.expiredIndexRowCount++;
                 }
             }
         }
@@ -521,11 +797,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 String errorMsg = "Missing index row";
                 logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
                         0, errorMsg);
-                if (!doNotFail) {
-                    exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
-                }
             }
+            verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
         }
     }
 
@@ -536,29 +809,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
     }
 
-    private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
-        byte[] uuidValue = ServerCacheClient.generateId();
-        UngroupedAggregateRegionObserver.MutationList currentMutationList =
-                new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
-        for (Mutation mutation : mutationList) {
-            Put put = (Put) mutation;
-            currentMutationList.add(mutation);
-            setMutationAttributes(put, uuidValue);
-            uuidValue = commitIfReady(uuidValue, currentMutationList);
-            Delete deleteMarkers = generateDeleteMarkers(put);
-            if (deleteMarkers != null) {
-                setMutationAttributes(deleteMarkers, uuidValue);
-                currentMutationList.add(deleteMarkers);
-                uuidValue = commitIfReady(uuidValue, currentMutationList);
-            }
-        }
-        if (!currentMutationList.isEmpty()) {
-            ungroupedAggregateRegionObserver.checkForRegionClosing();
-            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
-        }
-    }
-
-    private void addVerifyTask(final ArrayList<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap) {
+    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
+                               final VerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -567,13 +819,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
-                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap);
-                    if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
-                        synchronized (dataKeyToDataPutMap) {
-                            dataKeyToDataPutMap.putAll(perTaskDataKeyToDataPutMap);
-                        }
-                    }
-                    perTaskDataKeyToDataPutMap.clear();
+                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
                 } catch (Exception e) {
                     throw e;
                 }
@@ -582,21 +828,33 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         });
     }
 
-    private void parallelizeIndexVerify() throws IOException {
-        addToBeVerifiedIndexRows();
-        ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+    private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        for (Mutation mutation : mutations) {
+            indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
+        }
+        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+        tasks = new TaskBatch<>(taskCount);
+        List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
+        List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+        List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
         Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        dataPutMapList.add(perTaskDataKeyToDataPutMap);
+        VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+        verificationPhaseResultList.add(perTaskVerificationPhaseResult);
         for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
             keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
             perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
             if (keys.size() == rowCountPerTask) {
-                addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+                addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
                 keys = new ArrayList<>(rowCountPerTask);
                 perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataPutMapList.add(perTaskDataKeyToDataPutMap);
+                perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+                verificationPhaseResultList.add(perTaskVerificationPhaseResult);
             }
         }
         if (keys.size() > 0) {
-            addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+            addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
         }
         List<Boolean> taskResultList = null;
         try {
@@ -606,8 +864,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
         } catch (EarlyExitFailure e) {
             throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
-        } finally {
-            tasks.getTasks().clear();
         }
         for (Boolean result : taskResultList) {
             if (result == null) {
@@ -615,21 +871,61 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 throw new IOException(exceptionMessage);
             }
         }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            for (Map<byte[], Put> dataPutMap : dataPutMapList) {
+                dataKeyToDataPutMap.putAll(dataPutMap);
+            }
+        }
+        for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
+            verificationPhaseResult.add(result);
+        }
+    }
+
+    private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        byte[] uuidValue = ServerCacheClient.generateId();
+        UngroupedAggregateRegionObserver.MutationList currentMutationList =
+                new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        for (Mutation mutation : mutationList) {
+            Put put = (Put) mutation;
+            currentMutationList.add(mutation);
+            setMutationAttributes(put, uuidValue);
+            uuidValue = commitIfReady(uuidValue, currentMutationList);
+            Delete deleteMarkers = generateDeleteMarkers(put);
+            if (deleteMarkers != null) {
+                setMutationAttributes(deleteMarkers, uuidValue);
+                currentMutationList.add(deleteMarkers);
+                uuidValue = commitIfReady(uuidValue, currentMutationList);
+            }
+        }
+        if (!currentMutationList.isEmpty()) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
+        }
     }
 
     private void verifyAndOrRebuildIndex() throws IOException {
+        VerificationResult nextVerificationResult = new VerificationResult();
+        nextVerificationResult.scannedDataRowCount = mutations.size();
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
             rebuildIndexRows(mutations);
+            nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+            isBeforeRebuilt = false;
         }
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
             return;
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
                 verifyType == IndexTool.IndexVerifyType.ONLY) {
+            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
             // For these options we start with verifying index rows
-            doNotFail = true; // Don't stop at the first mismatch
-            parallelizeIndexVerify();
+            parallelizeIndexVerify(verificationPhaseResult);
+            nextVerificationResult.before.add(verificationPhaseResult);
+            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
+                throw new DoNotRetryIOException(
+                        "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
+                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
+            }
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt and now need to rebuild them
@@ -639,15 +935,24 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 mutations.add(entry.getValue());
             }
             rebuildIndexRows(mutations);
+            nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+            isBeforeRebuilt = false;
         }
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            doNotFail = false; // Stop at the first mismatch
             indexKeyToDataPutMap.clear();
-            parallelizeIndexVerify();
+            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            parallelizeIndexVerify(verificationPhaseResult);
+            nextVerificationResult.after.add(verificationPhaseResult);
+            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
+                throw new DoNotRetryIOException(
+                        "mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
+                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
+            }
         }
         indexKeyToDataPutMap.clear();
+        verificationResult.add(nextVerificationResult);
     }
 
     @Override
@@ -713,18 +1018,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         rowCount++;
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-            }
-            if (!partialRebuild && indexRowKey == null) {
-                verifyAndOrRebuildIndex();
-            }
-            else {
-                if (!mutations.isEmpty()) {
-                    ungroupedAggregateRegionObserver.checkForRegionClosing();
-                    ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                if (!partialRebuild && indexRowKey == null) {
+                    verifyAndOrRebuildIndex();
+                } else {
+                    if (!mutations.isEmpty()) {
+                        ungroupedAggregateRegionObserver.checkForRegionClosing();
+                        ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                    }
                 }
             }
         } catch (IOException e) {
-            hasMore = false;
             LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
             throw e;
         } finally {
@@ -735,7 +1038,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
               dataKeyToDataPutMap.clear();
             }
         }
-
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
         final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
                 SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 36af59e..8a62c41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -83,6 +83,8 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
             Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+                    Long.toString(scn));
             PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
             ServerBuildIndexCompiler compiler =
                     new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 51cb4b4..4eb575b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -155,6 +155,9 @@ public class IndexTool extends Configured implements Tool {
     public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
     public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
     public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
+    public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
+    public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
     public final static String DATA_TABLE_NAME = "DTName";
     public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
     public static String INDEX_TABLE_NAME = "ITName";
@@ -169,6 +172,30 @@ public class IndexTool extends Configured implements Tool {
     public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
     public static String ERROR_MESSAGE = "Error";
     public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+    public static String  SCAN_STOP_ROW_KEY = "StopRowKey";
+    public final static byte[] SCAN_STOP_ROW_KEY_BYTES = Bytes.toBytes(SCAN_STOP_ROW_KEY);
+    public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
+    public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
+    public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
+    public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount";
+    public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
+    public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount";
+    public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
+    public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public static String  VERIFICATION_PHASE = "Phase";
+    public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
 
@@ -329,7 +356,7 @@ public class IndexTool extends Configured implements Tool {
         formatter.printHelp("help", options);
         System.exit(exitCode);
     }
-    
+
     class JobFactory {
         Connection connection;
         Configuration configuration;
@@ -637,19 +664,25 @@ public class IndexTool extends Configured implements Tool {
         return job;
     }
 
-    private void createIndexToolOutputTable(Connection connection) throws Exception {
+    private void createIndexToolTables(Connection connection) throws Exception {
         ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
         Admin admin = queryServices.getAdmin();
-        if (admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
-            return;
-        }
-        HTableDescriptor tableDescriptor = new
-                HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
-        tableDescriptor.setValue("DISABLE_TABLE_SOR", "true");
-        tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
-        HColumnDescriptor columnDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY);
-        tableDescriptor.addFamily(columnDescriptor);
-        admin.createTable(tableDescriptor);
+        if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
+            HTableDescriptor tableDescriptor = new
+                    HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+            tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+            HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
+            tableDescriptor.addFamily(columnDescriptor);
+            admin.createTable(tableDescriptor);
+        }
+        if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) {
+            HTableDescriptor tableDescriptor = new
+                    HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
+            tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+            HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
+            tableDescriptor.addFamily(columnDescriptor);
+            admin.createTable(tableDescriptor);
+        }
     }
 
     @Override
@@ -693,7 +726,7 @@ public class IndexTool extends Configured implements Tool {
             pIndexTable = null;
 
             connection = ConnectionUtil.getInputConnection(configuration);
-            createIndexToolOutputTable(connection);
+            createIndexToolTables(connection);
             
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 0e40aff..46fb9ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -22,11 +22,15 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -40,15 +44,75 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
+
 /**
  * Reducer class that does only one task and that is to update the index state of the table.
  */
 public class PhoenixIndexImportDirectReducer extends
         Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
-
+    private AtomicBoolean calledOnce = new AtomicBoolean(false);
     private static final Logger LOGGER =
             LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
 
+    private void updateCounters(IndexTool.IndexVerifyType verifyType,
+                                Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
+            throws IOException {
+        Configuration configuration = context.getConfiguration();
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) {
+            long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
+            IndexRebuildRegionScanner.VerificationResult verificationResult =
+                    IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+            context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
+                    setValue(verificationResult.getScannedDataRowCount());
+            context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
+                    setValue(verificationResult.getRebuiltIndexRowCount());
+            if (verifyType == IndexTool.IndexVerifyType.ONLY || verifyType == IndexTool.IndexVerifyType.BEFORE ||
+                    verifyType == IndexTool.IndexVerifyType.BOTH) {
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildValidIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildExpiredIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildMissingIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildInvalidIndexRowCount());
+            }
+            if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildValidIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildExpiredIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildMissingIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildInvalidIndexRowCount());
+            }
+            if (verificationResult.isVerificationFailed(verifyType)) {
+                throw new IOException("Index verification failed! " + verificationResult);
+            }
+        } catch (Exception e) {
+            throw new IOException("Fail to get index verification result", e);
+        }
+    }
+
+    @Override
+    protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
+                          Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
+            throws IOException, InterruptedException
+
+    {
+        if (!calledOnce.compareAndSet(false, true)) {
+            return;
+        }
+        IndexTool.IndexVerifyType verifyType = getIndexVerifyType(context.getConfiguration());
+        if (verifyType != IndexTool.IndexVerifyType.NONE) {
+            updateCounters(verifyType, context);
+        }
+    }
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException{
         try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
new file mode 100644
index 0000000..c10694d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -0,0 +1,35 @@
+/**
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+/**
+ * Counters used for Index Tool MR job
+ */
+public enum PhoenixIndexToolJobCounters {
+    SCANNED_DATA_ROW_COUNT,
+    REBUILT_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_VALID_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT,
+    AFTER_REBUILD_VALID_INDEX_ROW_COUNT,
+    AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT,
+    AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
+    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 0bfbf49..bbc70fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -934,7 +934,7 @@ public class IndexUtil {
         return false;
     }
 
-    private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+    public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
         boolean addedEmptyColumn = false;
         Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
         while (iterator.hasNext()) {