Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/03/28 20:36:40 UTC

[phoenix] branch master updated: PHOENIX-5748 Simplify index update generation code for consistent global indexes

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dd7d9d  PHOENIX-5748 Simplify index update generation code for consistent global indexes
9dd7d9d is described below

commit 9dd7d9d8a720c1f98bbe1564f5cfee16b8c5d507
Author: Kadir <ko...@salesforce.com>
AuthorDate: Fri Mar 27 18:46:33 2020 -0700

    PHOENIX-5748 Simplify index update generation code for consistent global indexes
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  112 +-
 .../phoenix/end2end/ConcurrentMutationsIT.java     |    3 +-
 .../org/apache/phoenix/end2end/IndexToolIT.java    |   40 +-
 .../end2end/index/GlobalIndexCheckerIT.java        |   95 ++
 .../phoenix/compile/ServerBuildIndexCompiler.java  |   15 +-
 .../coprocessor/IndexRebuildRegionScanner.java     | 1440 +++++++++++++-------
 .../coprocessor/IndexToolVerificationResult.java   |  304 +++++
 .../UngroupedAggregateRegionObserver.java          |   17 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  868 +++++++-----
 .../org/apache/phoenix/index/IndexMaintainer.java  |    2 +-
 .../index/PhoenixIndexImportDirectReducer.java     |    5 +-
 .../index/PrepareIndexMutationsForRebuildTest.java |  732 ++++++++++
 .../phoenix/index/VerifySingleIndexRowTest.java    |  638 +++++++++
 13 files changed, 3418 insertions(+), 853 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 571961d..d35451a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,13 +19,19 @@ package org.apache.phoenix.end2end;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -51,6 +57,33 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
 
     private final Object lock = new Object();
 
+    private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+        // This checks the state of every raw index row without rebuilding any row
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // This checks the state of an index row after it is repaired
+        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        // We want to check the index rows again as they may be modified by the read repair
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // Now we rebuild the entire index table and expect that it is still good after the full rebuild
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.AFTER);
+        // Truncate, rebuild and verify the index table
+        PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+        TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        try (Admin admin = pConn.getQueryServices().getAdmin()) {
+            admin.disableTable(physicalTableName);
+            admin.truncateTable(physicalTableName, true);
+        }
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.AFTER);
+        long actualRowCountAfterRebuild = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        assertEquals(actualRowCount, actualRowCountAfterRebuild);
+        return actualRowCount;
+    }
+
     @Test
     public void testSynchronousDeletesAndUpsertValues() throws Exception {
         final String tableName = generateUniqueName();
@@ -130,7 +163,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test
@@ -191,7 +224,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test @Repeat(5)
@@ -204,9 +237,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         final String indexName = generateUniqueName();
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("CREATE TABLE " + tableName
-                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
         TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
-        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
         final CountDownLatch doneSignal = new CountDownLatch(nThreads);
         Runnable[] runnables = new Runnable[nThreads];
         for (int i = 0; i < nThreads; i++) {
@@ -216,11 +250,12 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
                     try {
                         Connection conn = DriverManager.getConnection(getUrl());
                         for (int i = 0; i < 10000; i++) {
-                            boolean isNull = RAND.nextBoolean();
-                            int randInt = RAND.nextInt() % nIndexValues;
                             conn.createStatement().execute(
                                     "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
-                                            + (isNull ? null : randInt) + ")");
+                                            + (RAND.nextBoolean() ? null : (RAND.nextInt() % nIndexValues)) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
                             if ((i % batchSize) == 0) {
                                 conn.commit();
                             }
@@ -241,11 +276,72 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         }
 
         assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
-        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        long actualRowCount = verifyIndexTable(tableName, indexName, conn);
         assertEquals(nRows, actualRowCount);
     }
 
     @Test
+    public void testConcurrentUpsertsWithNoIndexedColumns() throws Exception {
+        int nThreads = 4;
+        final int batchSize = 100;
+        final int nRows = 997;
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+        TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        Runnable[] runnables = new Runnable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            runnables[i] = new Runnable() {
+
+                @Override public void run() {
+                    try {
+                        Connection conn = DriverManager.getConnection(getUrl());
+                        for (int i = 0; i < 1000; i++) {
+                            if (RAND.nextInt() % 1000 < 10) {
+                                // Do not include the indexed column in upserts
+                                conn.createStatement().execute(
+                                        "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
+                                                + (RAND.nextInt() % nRows) + ", 0, "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
+                            } else {
+                                conn.createStatement().execute(
+                                        "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                                + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
+                            }
+                            if ((i % batchSize) == 0) {
+                                conn.commit();
+                            }
+                        }
+                        conn.commit();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        doneSignal.countDown();
+                    }
+                }
+
+            };
+        }
+        for (int i = 0; i < nThreads; i++) {
+            Thread t = new Thread(runnables[i]);
+            t.start();
+        }
+
+        assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+        verifyIndexTable(tableName, indexName, conn);
+    }
+
+    @Test
     public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
         final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
         final String indexName = generateUniqueName();
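
For reference, the verification pipeline that the new verifyIndexTable helper runs can be summarized
as below. This is a condensed sketch using the same test utilities shown above, not new API; the
IndexVerifyType values behave as the helper's comments describe (ONLY verifies without writing,
AFTER rebuilds and then verifies the rebuilt rows):

    // 1. Verify every raw index row as-is; nothing is rebuilt or repaired.
    IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
            0, IndexTool.IndexVerifyType.ONLY);
    // 2. Read through the index; unverified rows are read-repaired on the way.
    long rowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
    // 3. Verify again, since read repair may have rewritten some index rows.
    IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
            0, IndexTool.IndexVerifyType.ONLY);
    // 4. Rebuild the whole index, then verify the rebuilt rows.
    IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
            0, IndexTool.IndexVerifyType.AFTER);
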
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index d1cffe5..e882463 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -266,7 +266,8 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT {
             EnvironmentEdgeManager.injectEdge(null);
         }
     }
-
+    @Ignore("It is not possible to assign the same timestamp to two separately committed mutations in the current model\n" +
+            " except when the server time goes backward. In that case, the behavior is not deterministic")
     @Test
     public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception {
         try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c123741..7394f57 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -227,8 +229,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            // Check after compaction
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             setEveryNthRowWithNull(NROWS, 5, stmt);
@@ -239,7 +239,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
@@ -457,6 +456,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
+        // The row key for the output table : timestamp | index table name | data row key
+        // The row key for the result table : timestamp | index table name | data table region name |
+        //                                    scan start row | scan stop row
+
+        // This method verifies only the common prefix, i.e., "timestamp | index table name |", since the
+        // remaining fields may themselves contain the separator byte
+        int offset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE);
+        offset++;
+        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+        assertEquals(Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0,
+                indexTableFullNameBytes.length), 0);
+        assertEquals(rowKey[offset + indexTableFullNameBytes.length], IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0]);
+    }
+
     private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
             throws Exception {
         byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
@@ -490,6 +504,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             }
         }
         assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+        verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName);
+        hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
+        scan = new Scan();
+        scanner = hIndexTable.getScanner(scan);
+        Result result = scanner.next();
+        assertTrue(result != null);
+        verifyIndexTableRowKey(CellUtil.cloneRow(result.rawCells()[0]), indexTableFullName);
         return errorMessageCell;
     }
 
@@ -524,7 +546,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                     null, 0, IndexTool.IndexVerifyType.AFTER);
             assertEquals(1, MutationCountingRegionObserver.getMutationCount());
             MutationCountingRegionObserver.setMutationCount(0);
-            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
             // write any index rows
             runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.BEFORE);
@@ -612,16 +634,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             // Run the index tool to populate the index while verifying rows
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.AFTER);
-            // Corrupt one cell by writing directly into the index table
-            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
-            conn.commit();
-            // Run the index tool using the only-verify option to detect this mismatch between the data and index table
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, -1, IndexTool.IndexVerifyType.ONLY);
-            cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
             dropIndexToolTables(conn);
         }
     }
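
Only the leading "timestamp | index table name |" portion of these row keys can be parsed
positionally, since the remaining fields may contain the separator byte themselves. A minimal
sketch of that prefix check (hypothetical helper name; same Bytes utilities and
ROW_KEY_SEPARATOR_BYTE constant as above):

    // Sketch: validate the "timestamp | index table name |" prefix of an output/result
    // table row key. Fields after the prefix may contain '|' and are not parsed here.
    static boolean hasExpectedPrefix(byte[] rowKey, byte[] indexTableNameBytes) {
        int sep = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE);
        if (sep < 0 || rowKey.length < sep + 1 + indexTableNameBytes.length + 1) {
            return false;
        }
        int offset = sep + 1; // skip the timestamp and its trailing separator
        return Bytes.compareTo(rowKey, offset, indexTableNameBytes.length,
                indexTableNameBytes, 0, indexTableNameBytes.length) == 0
                && rowKey[offset + indexTableNameBytes.length]
                        == IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0];
    }
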
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 8e4a89c..8fbeae0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -113,6 +114,100 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testDelete() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
+            assertEquals(1, conn.createStatement().executeUpdate(dml));
+            conn.commit();
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            // Count the number of index rows
+            String query = "SELECT COUNT(*) from " + indexTableName;
+            // There should be one row in the index table
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
+    @Test
+    public void testDeleteNonExistingRow() throws Exception {
+        if (async) {
+            // No need to run the same test twice, once with async = true and once with async = false
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
+            conn.createStatement().executeUpdate(dml);
+            conn.commit();
+            // Attempt to delete a row that does not exist
+            conn.createStatement().executeUpdate(dml);
+            conn.commit();
+            // Make sure this delete attempt did not make the index and data table inconsistent
+            IndexToolIT.runIndexTool(true, false, "", dataTableName, indexTableName, null,
+                    0, IndexTool.IndexVerifyType.ONLY);
+        }
+    }
+
+    @Test
+    public void testSimulateConcurrentUpdates() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            // For concurrent updates on the same row, the last write phase is skipped.
+            // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase), in which
+            // the verify flag is set to true and/or index rows are deleted, and check that this does not impact
+            // correctness.
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            // Do multiple updates on the same data row
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
+            conn.commit();
+            // The expected state of the index table is {('aa', 'a', 'abcc', 'abcd'), ('bc', 'b', 'bcd', 'bcde')}
+            // Do more multiple updates on the same data row
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', null, null)");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('b', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('b', 'ab', null)");
+            conn.commit();
+            // Now the expected state of the index table is {('ab', 'a', 'abcc', null), ('ab', 'b', null, 'bcde')}
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * from "  + indexTableName);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("abcc", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals(null, rs.getString(3));
+            assertEquals("bcde", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
     public void testFailPostIndexDeleteUpdate() throws Exception {
         String dataTableName = generateUniqueName();
         populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
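
The concurrency tests above hinge on one hook: when the post index update phase is made to fail,
index rows are left unverified, and correctness then relies on read repair. A minimal sketch of
that pattern, using the same setFailPostIndexUpdatesForTesting hook referenced above (hypothetical
table name and query):

    // Sketch: leave index rows unverified by failing the post (last) write phase,
    // then trigger read repair by querying through the index.
    IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
    try {
        conn.createStatement().execute(
                "upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
        conn.commit(); // the index row is written but never marked verified
    } finally {
        IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
    }
    // Scanning the index repairs the unverified row before returning it.
    ResultSet rs = conn.createStatement().executeQuery(
            "SELECT id FROM " + dataTableName + " WHERE val1 = 'aa'");
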
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 4392e23..99caa6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -17,27 +17,30 @@
  */
 package org.apache.phoenix.compile;
 
-import java.sql.SQLException;
-import java.util.Collections;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
+import java.sql.SQLException;
+import java.util.Collections;
+
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
 
@@ -96,9 +99,9 @@ public class ServerBuildIndexCompiler {
                 throw new IllegalArgumentException(
                         "ServerBuildIndexCompiler does not support global indexes on transactional tables");
             }
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
             // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
             // However, in this case, we need to project all of the data columns that contribute to the index.
-            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
             for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
                 if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                     scan.addFamily(columnRef.getFamily());
@@ -121,6 +124,8 @@ public class ServerBuildIndexCompiler {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+                BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
                 addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
             }
             if (dataTable.isTransactional()) {
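
The compiler drives the server-side build entirely through scan attributes. The two lines added
above matter because IndexRebuildRegionScanner (changed below) treats a scan without
INDEX_REBUILD_PAGING as a partial rebuild, and view constants are serialized into the scan so the
scanner can rebuild view index rows. A recap of the attribute wiring, with names exactly as in the
method above and the roles as comments:

    scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
            ByteUtil.copyKeyBytesIfNecessary(ptr));                                 // serialized IndexMaintainer
    scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);       // route the scan to the rebuild scanner
    scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);  // paged full rebuild (absent => partial rebuild)
    BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);                  // lets the scanner rebuild view index rows
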
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 83c479a..793ab8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -41,14 +41,16 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -84,6 +87,8 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
 import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
 import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -92,6 +97,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -99,246 +105,18 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
-    public static class VerificationResult {
-        public static class PhaseResult {
-            private long validIndexRowCount = 0;
-            private long expiredIndexRowCount = 0;
-            private long missingIndexRowCount = 0;
-            private long invalidIndexRowCount = 0;
-
-            public void add(PhaseResult phaseResult) {
-                validIndexRowCount += phaseResult.validIndexRowCount;
-                expiredIndexRowCount += phaseResult.expiredIndexRowCount;
-                missingIndexRowCount += phaseResult.missingIndexRowCount;
-                invalidIndexRowCount += phaseResult.invalidIndexRowCount;
-            }
-
-            public long getTotalCount() {
-                return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
-            }
-
-            @Override
-            public String toString() {
-                return "PhaseResult{" +
-                        "validIndexRowCount=" + validIndexRowCount +
-                        ", expiredIndexRowCount=" + expiredIndexRowCount +
-                        ", missingIndexRowCount=" + missingIndexRowCount +
-                        ", invalidIndexRowCount=" + invalidIndexRowCount +
-                        '}';
-            }
-        }
-
-        private long scannedDataRowCount = 0;
-        private long rebuiltIndexRowCount = 0;
-        private PhaseResult before = new PhaseResult();
-        private PhaseResult after = new PhaseResult();
-
-        @Override
-        public String toString() {
-            return "VerificationResult{" +
-                    "scannedDataRowCount=" + scannedDataRowCount +
-                    ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
-                    ", before=" + before +
-                    ", after=" + after +
-                    '}';
-        }
-
-        public long getScannedDataRowCount() {
-            return scannedDataRowCount;
-        }
-
-        public long getRebuiltIndexRowCount() {
-            return rebuiltIndexRowCount;
-        }
-
-        public long getBeforeRebuildValidIndexRowCount() {
-            return before.validIndexRowCount;
-        }
-
-        public long getBeforeRebuildExpiredIndexRowCount() {
-            return before.expiredIndexRowCount;
-        }
-
-        public long getBeforeRebuildInvalidIndexRowCount() {
-            return before.invalidIndexRowCount;
-        }
-
-        public long getBeforeRebuildMissingIndexRowCount() {
-            return before.missingIndexRowCount;
-        }
-
-        public long getAfterRebuildValidIndexRowCount() {
-            return after.validIndexRowCount;
-        }
-
-        public long getAfterRebuildExpiredIndexRowCount() {
-            return after.expiredIndexRowCount;
-        }
-
-        public long getAfterRebuildInvalidIndexRowCount() {
-            return after.invalidIndexRowCount;
-        }
-
-        public long getAfterRebuildMissingIndexRowCount() {
-            return after.missingIndexRowCount;
-        }
-
-        private void addScannedDataRowCount(long count) {
-            this.scannedDataRowCount += count;
-        }
-
-        private void addRebuiltIndexRowCount(long count) {
-            this.rebuiltIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildValidIndexRowCount(long count) {
-            before.validIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildExpiredIndexRowCount(long count) {
-            before.expiredIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildMissingIndexRowCount(long count) {
-            before.missingIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildInvalidIndexRowCount(long count) {
-            before.invalidIndexRowCount += count;
-        }
-
-        private void addAfterRebuildValidIndexRowCount(long count) {
-            after.validIndexRowCount += count;
-        }
-
-        private void addAfterRebuildExpiredIndexRowCount(long count) {
-            after.expiredIndexRowCount += count;
-        }
-
-        private void addAfterRebuildMissingIndexRowCount(long count) {
-            after.missingIndexRowCount += count;
-        }
-
-        private void addAfterRebuildInvalidIndexRowCount(long count) {
-            after.invalidIndexRowCount += count;
-        }
-
-        private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
-            if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
-                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
-                return true;
-            }
-            return false;
-        }
-
-        private long getValue(Cell cell) {
-            return Long.parseLong(Bytes.toString(cell.getValueArray(),
-                    cell.getValueOffset(), cell.getValueLength()));
-        }
-
-        private void update(Cell cell) {
-            if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
-                addScannedDataRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
-                addRebuiltIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildValidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildExpiredIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildMissingIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildInvalidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildValidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildExpiredIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildMissingIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildInvalidIndexRowCount(getValue(cell));
-            }
-        }
-
-        public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
-            // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
-            // Search for the place where the trailing 0xFFs start
-            int offset = rowKeyPrefix.length;
-            while (offset > 0) {
-                if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
-                    break;
-                }
-                offset--;
-            }
-            if (offset == 0) {
-                // We got an 0xFFFF... (only FFs) stopRow value which is
-                // the last possible prefix before the end of the table.
-                // So set it to stop at the 'end of the table'
-                return HConstants.EMPTY_END_ROW;
-            }
-            // Copy the right length of the original
-            byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
-            // And increment the last one
-            newStopRow[newStopRow.length - 1]++;
-            return newStopRow;
-        }
-
-        public static VerificationResult getVerificationResult(Table hTable, long ts)
-                throws IOException {
-            VerificationResult verificationResult = new VerificationResult();
-            byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
-            byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
-            Scan scan = new Scan();
-            scan.setStartRow(startRowKey);
-            scan.setStopRow(stopRowKey);
-            ResultScanner scanner = hTable.getScanner(scan);
-            for (Result result = scanner.next(); result != null; result = scanner.next()) {
-                for (Cell cell : result.rawCells()) {
-                    verificationResult.update(cell);
-                }
-            }
-            return verificationResult;
-        }
-
-        public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
-            if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
-                return false;
-            }
-            if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-                if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
-                    return true;
-                }
-            }
-            if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
-                if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
-                    return true;
-                }
-                if (before.validIndexRowCount + before.expiredIndexRowCount +
-                        after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        public void add(VerificationResult verificationResult) {
-            scannedDataRowCount += verificationResult.scannedDataRowCount;
-            rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
-            before.add(verificationResult.before);
-            after.add(verificationResult.after);
-        }
-    }
-
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
     public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
     private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
     public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
     private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+    public static final String
+            ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
+    public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
     private long pageSizeInRows = Long.MAX_VALUE;
     private int rowCountPerTask;
     private boolean hasMore;
@@ -359,25 +137,32 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private Table resultHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
-    private Map<byte[], Put> indexKeyToDataPutMap;
-    private Map<byte[], Put> dataKeyToDataPutMap;
+    private Map<byte[], List<Mutation>> indexKeyToMutationMap;
+    private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
     private TaskRunner pool;
     private TaskBatch<Boolean> tasks;
     private String exceptionMessage;
     private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
     private RegionCoprocessorEnvironment env;
-    private int indexTableTTL;
-    private VerificationResult verificationResult;
+    private int indexTableTTL = 0;
+    private IndexToolVerificationResult verificationResult;
     private boolean isBeforeRebuilt = true;
-
-    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
-                               final RegionCoprocessorEnvironment env,
-                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+    private boolean partialRebuild = false;
+    private int singleRowRebuildReturnCode;
+    private Map<byte[], NavigableSet<byte[]>> familyMap;
+    private byte[][] viewConstants;
+
+    @VisibleForTesting
+    public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+                              final RegionCoprocessorEnvironment env,
+                              UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner);
         final Configuration config = env.getConfiguration();
         if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
             pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
                     QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        } else {
+            partialRebuild = true;
         }
         maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
         mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
@@ -390,23 +175,30 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        if (!scan.isRaw()) {
-            // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
-            List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-            indexMaintainer = maintainers.get(0);
-        }
+        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        indexMaintainer = maintainers.get(0);
         this.scan = scan;
+        familyMap = scan.getFamilyMap();
+        if (familyMap.isEmpty()) {
+            familyMap = null;
+        }
+
         this.innerScanner = innerScanner;
         this.region = region;
         this.env = env;
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        if (indexRowKey != null) {
+            setReturnCodeForSingleRowRebuild();
+            pageSizeInRows = 1;
+        }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
-            verificationResult = new VerificationResult();
+            verificationResult = new IndexToolVerificationResult();
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
+                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
                 // Create the following objects only for rebuilds by IndexTool
                 indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
                         env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
@@ -415,8 +207,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 resultHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
                         env).getTable(TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES));
-                indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder("IndexVerify",
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -428,28 +220,72 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
+    private void setReturnCodeForSingleRowRebuild() throws IOException {
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            List<Cell> row = new ArrayList<>();
+            scanner.next(row);
+            // Check if the data table row we have just scanned matches the index row key.
+            // If not, there is no need to build an index row from this data table row;
+            // just return a zero row count.
+            if (row.isEmpty()) {
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
+            } else {
+                Put put = new Put(CellUtil.cloneRow(row.get(0)));
+                for (Cell cell : row) {
+                    put.add(cell);
+                }
+                if (checkIndexRow(indexRowKey, put)) {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                } else {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                }
+            }
+        }
+    }
+
     @Override
     public RegionInfo getRegionInfo() {
         return region.getRegionInfo();
     }
 
     @Override
-    public boolean isFilterDone() { return false; }
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte[] regionName,
+                                                    byte[] startRow, byte[] stopRow) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        int targetOffset = 0;
+        // The row key for the result table : timestamp | index table name | data table region name |
+        //                                    scan start row | scan stop row
+        byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+                ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
+                startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
+        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        targetOffset += indexTableName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
+        targetOffset += regionName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
+        targetOffset += startRow.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
+        return rowKey;
+    }
 
     private void logToIndexToolResultTable() throws IOException {
         long scanMaxTs = scan.getTimeRange().getMax();
-        byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
-        byte[] regionName = Bytes.toBytes(region.getRegionInfo().getRegionNameAsString());
-        // The row key for the result table is the max timestamp of the scan + the table region name + scan start row
-        // + scan stop row
-        byte[] rowKey = new byte[keyPrefix.length + regionName.length + scan.getStartRow().length +
-                scan.getStopRow().length];
-        Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
-        Bytes.putBytes(rowKey, keyPrefix.length, regionName, 0, regionName.length);
-        Bytes.putBytes(rowKey, keyPrefix.length + regionName.length, scan.getStartRow(), 0,
-                scan.getStartRow().length);
-        Bytes.putBytes(rowKey, keyPrefix.length + regionName.length + scan.getStartRow().length,
-                scan.getStopRow(), 0, scan.getStopRow().length);
+        byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(),
+                Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow());
         Put put = new Put(rowKey);
         put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
                 scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
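
generateResultTableRowKey above does its own offset bookkeeping with Bytes.putBytes; an equivalent
composition can be sketched with HBase's Bytes.add (hypothetical standalone helper, not part of
this change):

    private static final byte[] SEP = Bytes.toBytes("|");

    // Sketch: timestamp | index table name | data table region name | start row | stop row
    static byte[] resultRowKey(long ts, byte[] indexTable, byte[] region,
                               byte[] startRow, byte[] stopRow) {
        byte[] key = Bytes.toBytes(Long.toString(ts));
        for (byte[] part : new byte[][] { indexTable, region, startRow, stopRow }) {
            key = Bytes.add(key, SEP, part);
        }
        return key;
    }
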
@@ -504,38 +340,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         m.setDurability(Durability.SKIP_WAL);
     }
 
-    private Delete generateDeleteMarkers(Put put) {
-        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-        int cellCount = put.size();
-        if (cellCount == allColumns.size() + 1) {
-            // We have all the columns for the index table. So, no delete marker is needed
-            return null;
-        }
-        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
-        long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
-            if (cells == null) {
-                break;
-            }
-            for (Cell cell : cells) {
-                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-        }
-        Delete del = null;
-        for (ColumnReference column : allColumns) {
-            if (!includedColumns.contains(column)) {
-                if (del == null) {
-                    del = new Delete(put.getRow());
-                }
-                del.addColumns(column.getFamily(), column.getQualifier(), ts);
-            }
-        }
-        return del;
-    }
-
     private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
             ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
@@ -546,12 +350,32 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return uuidValue;
     }
 
-    private class SimpleValueGetter implements ValueGetter {
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+        this.indexMaintainer = indexMaintainer;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) {
+        this.indexKeyToMutationMap = newTreeMap;
+        return 0;
+    }
+
+    public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
-        SimpleValueGetter (final Put put) {
+
+        public SimpleValueGetter(final Put put) {
             this.put = put;
         }
+
         @Override
         public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
             List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
@@ -570,7 +394,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
     }
 
-    private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+    public byte[] getIndexRowKey(final Put dataRow) throws IOException {
         ValueGetter valueGetter = new SimpleValueGetter(dataRow);
         byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
                 null, null, HConstants.LATEST_TIMESTAMP);
@@ -586,15 +410,36 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return true;
     }
 
-    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
                                            String errorMsg) throws IOException {
         logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
                 errorMsg, null, null);
 
     }
 
-    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) throws IOException {
+    private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        byte[] rowKey;
+        int targetOffset = 0;
+        // The row key for the output table : timestamp | index table name | data row key
+        rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+                ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
+        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        targetOffset += indexTableName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
+        return rowKey;
+    }
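+
+    // Editor's sketch (hypothetical values, not part of the original patch): for a scan
+    // timestamp of 1585425000000, an index table named "IDX" and a data row key "k1",
+    // the generated output-table row key is the byte concatenation
+    //   "1585425000000" + ROW_KEY_SEPARATOR_BYTE + "IDX" + ROW_KEY_SEPARATOR_BYTE + "k1"
+    // so rows sort first by verification timestamp, then by index table, then by data row key.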
+
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                           String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
         final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
         final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
         final int PREFIX_LENGTH = 3;
@@ -602,32 +447,19 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
         final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
         long scanMaxTs = scan.getTimeRange().getMax();
-        byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
-        byte[] rowKey;
-        // The row key for the output table is the max timestamp of the scan + data row key
-        if (dataRowKey != null) {
-            rowKey = new byte[keyPrefix.length + dataRowKey.length];
-            Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
-            Bytes.putBytes(rowKey, keyPrefix.length, dataRowKey, 0, dataRowKey.length);
-        } else {
-            rowKey = new byte[keyPrefix.length];
-            Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
-        }
+        byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
         Put put = new Put(rowKey);
         put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
                 scanMaxTs, region.getRegionInfo().getTable().getName());
         put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
                 scanMaxTs, indexMaintainer.getIndexTableName());
-        if (dataRowKey != null) {
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
                     scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
-        }
-        if (indexRowKey != null) {
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
-                    scanMaxTs, indexRowKey);
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
-        }
+
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+                scanMaxTs, indexRowKey);
+        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
         byte[] errorMessageBytes;
         if (expectedValue != null) {
             errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
@@ -642,8 +474,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             length += PREFIX_LENGTH;
             Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
 
-        }
-        else {
+        } else {
             errorMessageBytes = Bytes.toBytes(errorMsg);
         }
         put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
@@ -655,21 +486,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         outputHTable.put(put);
     }
 
-    private long getMaxTimestamp(Result result) {
-        long ts = 0;
-        for (Cell cell : result.rawCells()) {
-            if (ts < cell.getTimestamp()) {
-                ts = cell.getTimestamp();
-            }
-        }
-        return ts;
-    }
-
-    private long getMaxTimestamp(Put put) {
+    private static long getMaxTimestamp(Mutation m) {
         long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell cell : cells) {
                 if (ts < cell.getTimestamp()) {
@@ -680,132 +501,477 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return ts;
     }
 
-    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
-        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
-        long ts = getMaxTimestamp(dataRow);
-        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
-        if (indexPut == null) {
-            // This means the data row does not have any covered column values
-            indexPut = new Put(indexRow.getRow());
+    private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+        List<Cell> cellList = m.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return null;
         }
-        else {
-            // Remove the empty column prepared by Index codec as we need to change its value
-            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                    indexMaintainer.getEmptyKeyValueQualifier());
+        for (Cell cell : cellList) {
+            if (CellUtil.matchingQualifier(cell, qualifier)) {
+                return cell;
+            }
         }
-        // Add the empty column
-        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
-        int cellCount = 0;
-        long currentTime = EnvironmentEdgeManager.currentTime();
-        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+        return null;
+    }
+
+    private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            String errorMsg = "Not matching timestamp";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg, null, null);
+            return false;
+        }
+        int expectedCellCount = 0;
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell expectedCell : cells) {
+                expectedCellCount++;
                 byte[] family = CellUtil.cloneFamily(expectedCell);
                 byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
-                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
-                if (actualCell == null) {
-                    // Check if cell expired as per the current server's time and data table ttl
-                    // Index table should have the same ttl as the data table, hence we might not
-                    // get a value back from index if it has already expired between our rebuild and
-                    // verify
-                    // TODO: have a metric to update for these cases
-                    if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
-                        continue;
-                    }
-                    String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
                     return false;
                 }
-                if (actualCell.getTimestamp() < ts) {
-                    // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
-                    continue;
-                }
-                // Check all columns
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
-                    String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                    String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
                     return false;
-                } else if (actualCell.getTimestamp() != ts) {
-                    String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier) + " E: " + ts + " A: " +
-                            actualCell.getTimestamp();
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
-                            errorMsg, null, null);
-                    return false;
                 }
-                cellCount++;
             }
         }
-        if (cellCount != indexRow.rawCells().length) {
-            String errorMsg = "Expected to find " + cellCount + " cells but got "
-                    + indexRow.rawCells().length + " cells";
-            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+        int actualCellCount = 0;
+        for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            actualCellCount += cells.size();
+        }
+        if (expectedCellCount != actualCellCount) {
+            String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean isVerified(Put mutation) throws IOException {
+        List<Cell> cellList = mutation.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier());
+        Cell cell = (cellList != null && !cellList.isEmpty()) ? cellList.get(0) : null;
+        if (cell == null) {
+            throw new DoNotRetryIOException("No empty column cell");
+        }
+        return Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0;
+    }
+
+    /**
+     * Comparator that orders mutations in descending order by the tuple of timestamp and mutation type,
+     * where a delete comes before a put with the same timestamp
+     */
+    public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 > ts2) {
+                return -1;
+            }
+            if (ts1 < ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
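+
+    // Editor's sketch (hypothetical timestamps, not part of the original patch): sorting
+    //   [Put(ts=90), Delete(ts=100), Put(ts=100)]
+    // with MUTATION_TS_DESC_COMPARATOR yields
+    //   [Delete(ts=100), Put(ts=100), Put(ts=90)]
+    // i.e., newest first, with a delete ahead of a put at the same timestamp.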
+
+    private boolean isDeleteFamily(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isDeleteFamilyVersion(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @VisibleForTesting
+    public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : indexRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        return getMutationsWithSameTS(put, del);
+    }
+
+    /**
+     * In this method, the actual list is repaired in memory using the expected list, which is the output of
+     * rebuilding the index table row. The result of this repair is used only for verification.
+     */
+    private void repairActualMutationList(List<Mutation> actualMutationList, List<Mutation> expectedMutationList)
+            throws IOException {
+        // Find the first (latest) actual unverified put mutation
+        List<Mutation> repairedMutationList = new ArrayList<>(expectedMutationList.size());
+        for (Mutation actual : actualMutationList) {
+            if (actual instanceof Put && !isVerified((Put) actual)) {
+                long ts = getTimestamp(actual);
+                int expectedIndex;
+                int expectedListSize = expectedMutationList.size();
+                for (expectedIndex = 0; expectedIndex < expectedListSize; expectedIndex++) {
+                    if (getTimestamp(expectedMutationList.get(expectedIndex)) <= ts) {
+                        if (expectedIndex > 0) {
+                            expectedIndex--;
+                        }
+                        break;
+                    }
+                }
+                if (expectedIndex == expectedListSize) {
+                    continue;
+                }
+                for (; expectedIndex < expectedListSize; expectedIndex++) {
+                    Mutation mutation = expectedMutationList.get(expectedIndex);
+                    if (mutation instanceof Put) {
+                        mutation = new Put((Put) mutation);
+                    } else {
+                        mutation = new Delete((Delete) mutation);
+                    }
+                    repairedMutationList.add(mutation);
+                }
+                // Since we repair the entire history, there is no need to repair more than once
+                break;
+            }
+        }
+        if (repairedMutationList.isEmpty()) {
+            return;
+        }
+        actualMutationList.addAll(repairedMutationList);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+    }
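+
+    // Editor's sketch (hypothetical timestamps, not part of the original patch): if the actual
+    // list is [unverified Put(ts=120), Put(ts=100)] and the expected list is
+    // [Put(ts=120), Delete(ts=110), Put(ts=100)], the first unverified put (ts=120) is found,
+    // the expected mutations from ts=120 downwards are copied in, and the merged list is
+    // re-sorted. The subsequent clean-up pass then drops the unverified put and the duplicate
+    // same-timestamp put, leaving exactly the expected history.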
+
+    private void cleanUpActualMutationList(List<Mutation> actualMutationList)
+            throws IOException {
+        Iterator<Mutation> iterator = actualMutationList.iterator();
+        Mutation previous = null;
+        while (iterator.hasNext()) {
+            Mutation mutation = iterator.next();
+            if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
+                    (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) {
+                iterator.remove();
+            } else {
+                if (previous != null && getTimestamp(previous) == getTimestamp(mutation) &&
+                        ((previous instanceof Put && mutation instanceof Put) ||
+                                previous instanceof Delete && mutation instanceof Delete)) {
+                    iterator.remove();
+                } else {
+                    previous = mutation;
+                }
+            }
+        }
+    }
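+
+    // Editor's sketch (hypothetical values, not part of the original patch): cleaning up
+    //   [verified Put(ts=120), unverified Put(ts=120), DeleteFamilyVersion(ts=110),
+    //    Put(ts=100), Put(ts=100)]
+    // removes the unverified put and the family-version delete marker, then collapses the
+    // adjacent same-timestamp puts, leaving [Put(ts=120), Put(ts=100)].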
+
+    /**
+     * There are two types of verification: without repair and with repair. Without-repair verification is done before
+     * or after index rebuild. It is done before index rebuild to identify the rows to be rebuilt. It is done after
+     * index rebuild to verify the rows that have been rebuilt. With-repair verification can be done anytime using
+     * the “-v ONLY” option to check the consistency of the index table. Note that with-repair verification simulates
+     * read repair in-memory for the purpose of verification, but does not actually repair the data in the index.
+     *
+     * Unverified Rows
+     *
+     * For each mutable data table mutation during regular data table updates, two operations are done on the data table.
+     * One is to read the existing row state, and the second is to update the data table for this row. The processing of
+     * concurrent data mutations are serialized once for reading the existing row states, and then serialized again
+     * for updating the data table. In other words, they go through locking twice, i.e., [lock, read, unlock] and
+     * [lock, write, unlock]. Because of this two phase locking, for a pair of concurrent mutations (for the same row),
+     * the same row state can be read from the data table. This means the same existing index row can be made unverified
+     * twice with different timestamps, one for each concurrent mutation. These unverified mutations can be repaired
+     * from the data table later during HBase scans using the index read repair process. This is one of the reasons
+     * for having extra unverified rows in the index table. The other reason is the data table write failures.
+     * When a data table write fails, it leaves an unverified index row behind. These rows are never returned to clients,
+     * instead they are repaired, which means either they are rebuilt from their data table rows or they are deleted if
+     * their data table rows do not exist.
+     *
+     * Delete Family Version Markers
+     *
+     * The family version delete markers are generated by the read repair to remove extra unverified rows. They only
+     * show up in the actual mutation list since they are not generated for regular table updates or index rebuilds.
+     * For the verification purpose, these delete markers can be treated as extra unverified rows and can be safely
+     * skipped.
+     *
+     * Delete Family Markers
+     *
+     * Delete family markers are generated during read repair, regular table updates and index rebuilds to delete index
+     * table rows. The read repair generates them to delete extra unverified rows. During regular table updates or
+     * index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or
+     * data table row overwrites.
+     *
+     * Verification Algorithm
+     *
+     * IndexTool verification generates an expected list of index mutations from the data table rows and uses this list
+     * to check if index table rows are consistent with the data table.
+     *
+     * The expected list is generated using the index rebuild algorithm. This means that for a given row, the list can
+     * include a number of put and delete mutations such that the following hold:
+     *
+     * Every mutation will include a set of cells with the same timestamp
+     * Every mutation has a different timestamp
+     * A delete mutation will include only delete family cells and it is for deleting the entire row and its versions
+     * Every put mutation is verified
+     *
+     * For both verification types, after the expected list of index mutations is constructed for a given data table row,
+     * another list called the actual list of index mutations is constructed by reading the index table row using HBase
+     * raw scan and all versions of the cells of the row are retrieved.
+     *
+     * As in the construction of the expected list, the cells are grouped into a put and a delete set. The put and
+     * delete sets for a given row are further grouped based on their timestamps into put and delete mutations such that
+     * all the cells in a mutation have the same timestamp. The put and delete mutations are then sorted within a single
+     * list. Mutations in this list are sorted in descending order of their timestamp. This list is the actual list.
+     *
+     * For the without-repair verification, unverified mutations and family version delete markers are removed from
+     * the actual list and then the list is compared with the expected list.
+     *
+     * In case of the with-repair verification, the actual list is first repaired, then unverified mutations and family
+     * version delete markers are removed from the actual list and finally the list is compared with the expected list.
+     *
+     * The actual list is repaired as follows: Every unverified mutation is repaired using the method read repair uses.
+     * However, instead of going through the actual repair implementation, the expected mutations are used for repair.
+     */
+
+    @VisibleForTesting
+    public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
+            throws IOException {
+        List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
+        if (expectedMutationList == null) {
+            throw new DoNotRetryIOException(NO_EXPECTED_MUTATION);
+        }
+        List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
+        if (actualMutationList == null || actualMutationList.isEmpty()) {
+            throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        }
+        Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            repairActualMutationList(actualMutationList, expectedMutationList);
+        }
+        cleanUpActualMutationList(actualMutationList);
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        int actualIndex = 0;
+        int expectedIndex = 0;
+        int matchingCount = 0;
+        int expectedSize = expectedMutationList.size();
+        int actualSize = actualMutationList.size();
+        Mutation expected = null;
+        Mutation previousExpected;
+        Mutation actual;
+        while (expectedIndex < expectedSize && actualIndex < actualSize) {
+            previousExpected = expected;
+            expected = expectedMutationList.get(expectedIndex);
+            // Check if cell expired as per the current server's time and data table ttl
+            // Index table should have the same ttl as the data table, hence we might not
+            // get a value back from index if it has already expired between our rebuild and
+            // verify
+            // TODO: have a metric to update for these cases
+            if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+                verificationPhaseResult.expiredIndexRowCount++;
+                return true;
+            }
+            actual = actualMutationList.get(actualIndex);
+            if (expected instanceof Put) {
+                if (previousExpected instanceof Delete) {
+                    // Between an expected delete and put, there can be one or more deletes due to
+                    // concurrent mutations or data table write failures. Skip all of them if any
+                    while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
+                        actualIndex++;
+                        if (actualIndex == actualSize) {
+                            break;
+                        }
+                        actual = actualMutationList.get(actualIndex);
+                    }
+                    if (actualIndex == actualSize) {
+                        break;
+                    }
+                }
+                if (isMatchingMutation(expected, actual, expectedIndex)) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                }
+            } else { // expected instanceof Delete
+                // Between put and delete, delete and delete, or before first delete, there can be other deletes.
+                // Skip all of them if any
+                while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) {
+                    actualIndex++;
+                    if (actualIndex == actualSize) {
+                        break;
+                    }
+                    actual = actualMutationList.get(actualIndex);
+                }
+                if (actualIndex == actualSize) {
+                    break;
+                }
+                if (getTimestamp(actual) == getTimestamp(expected) &&
+                        (actual instanceof Delete && isDeleteFamily(actual))) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                }
+                String errorMsg = "Delete check failure";
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expected),
+                        getTimestamp(actual), errorMsg);
+            }
+            verificationPhaseResult.invalidIndexRowCount++;
             return false;
         }
+        if (expectedIndex != expectedSize || actualIndex != actualSize) {
+            for (; expectedIndex < expectedSize; expectedIndex++) {
+                expected = expectedMutationList.get(expectedIndex);
+                // Check if cell expired as per the current server's time and data table ttl
+                // Index table should have the same ttl as the data table, hence we might not
+                // get a value back from index if it has already expired between our rebuild and
+                // verify
+                // TODO: have a metric to update for these cases
+                if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+                    verificationPhaseResult.expiredIndexRowCount++;
+                }
+            }
+            if (matchingCount > 0) {
+                if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+                    // We do not consider this as a verification issue but log it for further information.
+                    // This may happen due to compaction
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                    String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+                            + actualMutationList.size();
+                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                            getTimestamp(expectedMutationList.get(0)),
+                            getTimestamp(actualMutationList.get(0)), errorMsg);
+                }
+            } else {
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                String errorMsg = "Not matching index row";
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
+                verificationPhaseResult.invalidIndexRowCount++;
+                return false;
+            }
+        }
+        verificationPhaseResult.validIndexRowCount++;
         return true;
     }
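+
+    // Editor's sketch (hypothetical timestamps, not part of the original patch) of the
+    // comparison above: with the expected list [Put(ts=300), Delete(ts=200), Put(ts=100)],
+    // an actual list of [Put(ts=300), DeleteFamily(ts=250), DeleteFamily(ts=200), Put(ts=100)]
+    // still verifies, because the extra DeleteFamily(ts=250) between the expected put and
+    // delete is skipped as the leftover of a concurrent mutation or a data table write failure.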
 
-    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
-                                 VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
-        int expectedRowCount = keys.size();
+    private static long getMaxTimestamp(Pair<Put, Delete> pair) {
+        Put put = pair.getFirst();
+        long ts1 = 0;
+        if (put != null) {
+            ts1 = getMaxTimestamp(put);
+        }
+        Delete del = pair.getSecond();
+        long ts2 = 0;
+        if (del != null) {
+            ts2 = getMaxTimestamp(del);
+        }
+        return (ts1 > ts2) ? ts1 : ts2;
+    }
+
+    private void verifyIndexRows(List<KeyRange> keys,
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        List<KeyRange> invalidKeys = new ArrayList<>();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
         indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
         scanRanges.initializeScan(indexScan);
+        /*
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-        indexScan.setFilter(skipScanFilter);
-        int rowCount = 0;
+        indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        */
+        indexScan.setRaw(true);
+        indexScan.setMaxVersions();
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
-                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
-                if (dataPut == null) {
-                    // This should never happen
-                    String errorMsg = "Missing data row";
-                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
-                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
+                KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
+                if (!keys.contains(keyRange)) {
+                    continue;
                 }
-                if (verifySingleIndexRow(result, dataPut)) {
-                    verificationPhaseResult.validIndexRowCount++;
-                    perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
-                } else {
-                    verificationPhaseResult.invalidIndexRowCount++;
+                if (!verifySingleIndexRow(result, verificationPhaseResult)) {
+                    invalidKeys.add(keyRange);
                 }
-                rowCount++;
+                keys.remove(keyRange);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(indexHTable.getName().toString(), t);
         }
         // Check if any expected rows from index(which we didn't get) are already expired due to TTL
         // TODO: metrics for expired rows
-        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
-            Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+        if (!keys.isEmpty()) {
+            Iterator<KeyRange> itr = keys.iterator();
             long currentTime = EnvironmentEdgeManager.currentTime();
             while(itr.hasNext()) {
-                Entry<byte[], Put> entry = itr.next();
-                long ts = getMaxTimestamp(entry.getValue());
-                if (isTimestampBeforeTTL(currentTime, ts)) {
+                KeyRange keyRange = itr.next();
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
                     itr.remove();
-                    rowCount++;
                     verificationPhaseResult.expiredIndexRowCount++;
                 }
             }
         }
-        if (rowCount != expectedRowCount) {
-            for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+        if (keys.size() > 0) {
+            for (KeyRange keyRange : keys) {
                 String errorMsg = "Missing index row";
-                logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
-                        0, errorMsg);
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                logToIndexToolOutputTable(dataKey,
+                        keyRange.getLowerRange(),
+                        getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
+                        getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
             }
-            verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
+            verificationPhaseResult.missingIndexRowCount += keys.size();
         }
+        keys.addAll(invalidKeys);
     }
 
     private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
@@ -815,8 +981,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
     }
 
-    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
-                               final VerificationResult.PhaseResult verificationPhaseResult) {
+    private void addVerifyTask(final List<KeyRange> keys,
+                               final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -825,7 +991,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
-                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
+                    verifyIndexRows(keys, verificationPhaseResult);
                 } catch (Exception e) {
                     throw e;
                 }
@@ -834,33 +1000,27 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         });
     }
 
-    private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
-        for (Mutation mutation : mutations) {
-            indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
-        }
-        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+    private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
         tasks = new TaskBatch<>(taskCount);
-        List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
-        List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+        List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
+        List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
         List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
-        Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-        dataPutMapList.add(perTaskDataKeyToDataPutMap);
-        VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+        listOfKeyRangeList.add(keys);
+        IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
         verificationPhaseResultList.add(perTaskVerificationPhaseResult);
-        for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
-            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+        for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
             if (keys.size() == rowCountPerTask) {
-                addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+                addVerifyTask(keys, perTaskVerificationPhaseResult);
                 keys = new ArrayList<>(rowCountPerTask);
-                perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataPutMapList.add(perTaskDataKeyToDataPutMap);
-                perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+                listOfKeyRangeList.add(keys);
+                perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
                 verificationPhaseResultList.add(perTaskVerificationPhaseResult);
             }
         }
         if (keys.size() > 0) {
-            addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+            addVerifyTask(keys, perTaskVerificationPhaseResult);
         }
         List<Boolean> taskResultList = null;
         try {
@@ -877,13 +1037,19 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 throw new IOException(exceptionMessage);
             }
         }
+        for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
+            verificationPhaseResult.add(result);
+        }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
-            for (Map<byte[], Put> dataPutMap : dataPutMapList) {
-                dataKeyToDataPutMap.putAll(dataPutMap);
+            Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            for (List<KeyRange> keyRangeList : listOfKeyRangeList) {
+                for (KeyRange keyRange : keyRangeList) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                    newDataKeyToMutationMap.put(dataKey, dataKeyToMutationMap.get(dataKey));
+                }
             }
-        }
-        for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
-            verificationPhaseResult.add(result);
+            dataKeyToMutationMap.clear();
+            dataKeyToMutationMap = newDataKeyToMutationMap;
         }
     }
 
@@ -891,16 +1057,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         byte[] uuidValue = ServerCacheClient.generateId();
         UngroupedAggregateRegionObserver.MutationList currentMutationList =
                 new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        Put put = null;
         for (Mutation mutation : mutationList) {
-            Put put = (Put) mutation;
-            currentMutationList.add(mutation);
-            setMutationAttributes(put, uuidValue);
-            uuidValue = commitIfReady(uuidValue, currentMutationList);
-            Delete deleteMarkers = generateDeleteMarkers(put);
-            if (deleteMarkers != null) {
-                setMutationAttributes(deleteMarkers, uuidValue);
-                currentMutationList.add(deleteMarkers);
+            if (mutation instanceof Put) {
+                if (put != null) {
+                    // Back-to-back puts, i.e., no delete in between; we can commit the previous put
+                    uuidValue = commitIfReady(uuidValue, currentMutationList);
+                }
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
+                put = (Put)mutation;
+            } else {
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
                 uuidValue = commitIfReady(uuidValue, currentMutationList);
+                put = null;
             }
         }
         if (!currentMutationList.isEmpty()) {
@@ -910,12 +1081,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void verifyAndOrRebuildIndex() throws IOException {
-        VerificationResult nextVerificationResult = new VerificationResult();
-        nextVerificationResult.scannedDataRowCount = mutations.size();
+        IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
+        nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -923,77 +1094,350 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
                 verifyType == IndexTool.IndexVerifyType.ONLY) {
-            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.before.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
-                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt and now need to rebuild them
             // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
             mutations.clear();
-            for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
-                mutations.add(entry.getValue());
+
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+                if (entry.getValue().getFirst() != null) {
+                    mutations.add(entry.getValue().getFirst());
+                }
+                if (entry.getValue().getSecond() != null) {
+                    mutations.add(entry.getValue().getSecond());
+                }
             }
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            indexKeyToDataPutMap.clear();
-            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
+            indexKeyToMutationMap.clear();
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+                prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
+            }
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.after.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
-                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
-        indexKeyToDataPutMap.clear();
         verificationResult.add(nextVerificationResult);
     }
 
+    private boolean isColumnIncluded(Cell cell) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        if (!familyMap.containsKey(family)) {
+            return false;
+        }
+        NavigableSet<byte[]> set = familyMap.get(family);
+        if (set == null || set.isEmpty()) {
+            return true;
+        }
+        byte[] qualifier = CellUtil.cloneQualifier(cell);
+        return set.contains(qualifier);
+    }
+
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+    /**
+     * Comparator that orders mutations in ascending order by the tuple of timestamp and mutation type,
+     * where a put comes before a delete with the same timestamp
+     */
+    public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 < ts2) {
+                return -1;
+            }
+            if (ts1 > ts2) {
+                return 1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return -1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+        List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+        if (put != null) {
+            mutationList.add(put);
+        }
+        if (del != null) {
+            mutationList.add(del);
+        }
+        // Group the cells within a mutation based on their timestamps and create a separate mutation for each group
+        mutationList = (List<Mutation>) IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+        // Reorder the mutations on the same row so that a put comes before a delete when they have the same timestamp
+        Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+        return mutationList;
+    }
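+
+    // Editor's sketch (hypothetical cells, not part of the original patch): a Put carrying
+    // cells at ts=100 and ts=200 plus a Delete at ts=200 is flattened into three
+    // single-timestamp mutations and ordered by MUTATION_TS_COMPARATOR as
+    //   [Put(ts=100), Put(ts=200), Delete(ts=200)]
+    // i.e., ascending by timestamp with the put before the delete at the shared timestamp.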
+
+    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
+                                                ValueGetter mergedRowVG, long ts)
+            throws IOException {
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                mergedRowVG, rowKeyPtr, ts, null, null);
+        if (indexPut == null) {
+            // No covered column. Just prepare an index row with the empty column
+            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
+                    null, null, HConstants.LATEST_TIMESTAMP);
+            indexPut = new Put(indexRowKey);
+        } else {
+            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    indexMaintainer.getEmptyKeyValueQualifier());
+        }
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+        return indexPut;
+    }
+
+    public static void removeColumn(Put put, Cell deleteCell) {
+        byte[] family = CellUtil.cloneFamily(deleteCell);
+        List<Cell> cellList = put.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (CellUtil.matchingQualifier(cell, deleteCell)) {
+                cellIterator.remove();
+                if (cellList.isEmpty()) {
+                    put.getFamilyCellMap().remove(family);
+                }
+                return;
+            }
+        }
+    }
+
+    public static void apply(Put destination, Put source) throws IOException {
+        for (List<Cell> cells : source.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (!destination.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) {
+                    destination.add(cell);
+                }
+            }
+        }
+    }
+
+    public static Put applyNew(Put destination, Put source) throws IOException {
+        Put next = new Put(destination);
+        apply(next, source);
+        return next;
+    }
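+
+    // Editor's sketch (not part of the original patch): applyNew(put, state) copies into a
+    // fresh Put every cell of `state` whose family:qualifier is absent from `put`, so the
+    // newer mutation's cells win and the merged Put models the next data row state.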
+
+    private static void applyDeleteOnPut(Delete del, Put put) throws IOException {
+        for (List<Cell> cells : del.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) {
+                    case DeleteFamily:
+                        put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                        break;
+                    case DeleteColumn:
+                        removeColumn(put, cell);
+                        break;
+                    default:
+                        // We do not expect this to happen
+                        throw new DoNotRetryIOException("Single version delete marker in data mutation " +
+                                del);
+                }
+            }
+        }
+    }
+
+    /**
+     * Generate the index update for a data row from the mutation that are obtained by merging the previous data row
+     * state with the pending row mutation for index rebuild. This method is called only for global indexes.
+     * pendingMutations is a sorted list of data table mutations that are used to replay index table mutations.
+     * This list is sorted in ascending order by the tuple of row key, timestamp and mutation type where delete comes
+     * after put.
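+     *
+     * Editor's sketch (hypothetical timestamps, not part of the original patch): replaying the data
+     * mutations [Put(ts=10), Put(ts=20), Delete(ts=30)] produces the index mutations
+     * [Put(ts=10), Put(ts=20), Delete(ts=30)] when the index row key stays the same; if the second
+     * put changes the indexed columns, a delete of the old index row at ts=20 is also emitted.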
+     */
+    public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
+                                                                 Put dataPut, Delete dataDel) throws IOException {
+        List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel);
+        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
+        // The row key ptr of the data table row for which we will build index rows here
+        ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) :
+                new ImmutableBytesPtr(dataDel.getRow());
+        // Start with empty data table row
+        Put currentDataRowState = null;
+        // The index row key corresponding to the current data row
+        byte[] indexRowKeyForCurrentDataRow = null;
+        int dataMutationListSize = dataMutations.size();
+        for (int i = 0; i < dataMutationListSize; i++) {
+            Mutation mutation = dataMutations.get(i);
+            long ts = getTimestamp(mutation);
+            if (mutation instanceof Put) {
+                if (i < dataMutationListSize - 1) {
+                    // If there is a pair of a put and delete mutation with the same timestamp then apply the delete
+                    // mutation on the put. If the delete mutation deletes all the cells in the put mutation, the family
+                    // cell map of the put mutation becomes empty and the mutation is ignored later
+                    Mutation nextMutation = dataMutations.get(i + 1);
+                    if (getTimestamp(nextMutation) == ts && nextMutation instanceof Delete) {
+                        applyDeleteOnPut((Delete) nextMutation, (Put) mutation);
+                        // Apply the delete mutation on the current data row state too
+                        if (currentDataRowState != null) {
+                            applyDeleteOnPut((Delete) nextMutation, currentDataRowState);
+                            if (currentDataRowState.getFamilyCellMap().size() == 0) {
+                                currentDataRowState = null;
+                                indexRowKeyForCurrentDataRow = null;
+                            }
+                        }
+                        // This increment is to skip the next (delete) mutation as we have already processed it
+                        i++;
+                    }
+                }
+                if (mutation.getFamilyCellMap().size() != 0) {
+                    // Add this put on top of the current data row state to get the next data row state
+                    Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
+                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                    Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                    indexMutations.add(indexPut);
+                    // Delete the current index row if the new index key is different than the current one
+                    if (currentDataRowState != null) {
+                        if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            indexMutations.add(del);
+                        }
+                    }
+                    // For the next iteration of the for loop
+                    currentDataRowState = nextDataRow;
+                    indexRowKeyForCurrentDataRow = indexPut.getRow();
+                } else {
+                    if (currentDataRowState != null) {
+                        Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                        // For the next iteration of the for loop
+                        currentDataRowState = null;
+                        indexRowKeyForCurrentDataRow = null;
+                    }
+                }
+            } else { // mutation instanceof Delete
+                if (currentDataRowState != null) {
+                    // We apply delete mutations only on the current data row state to obtain the next data row state.
+                    // For the index table, we are only interested in if the index row should be deleted or not.
+                    // There is no need to apply column deletes to index rows since index rows are always full rows
+                    // and all the cells in an index row have the same timestamp value. Because of this, index row
+                    // versions do not share cells.
+                    applyDeleteOnPut((Delete) mutation, currentDataRowState);
+                    Put nextDataRowState = currentDataRowState;
+                    if (nextDataRowState.getFamilyCellMap().size() == 0) {
+                        Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                        currentDataRowState = null;
+                        indexRowKeyForCurrentDataRow = null;
+                    } else {
+                        ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                        Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                        indexMutations.add(indexPut);
+                        // Delete the current index row if the new index key is different than the current one
+                        if (indexRowKeyForCurrentDataRow != null) {
+                            if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                                Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                        IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                                indexMutations.add(del);
+                            }
+                        }
+                        indexRowKeyForCurrentDataRow = indexPut.getRow();
+                    }
+                }
+            }
+        }
+        return indexMutations;
+    }
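
The loop above is easiest to follow as a trace. Assuming a single data row whose indexed column changes value at each timestamp, a hypothetical replay could produce the mutations below; the index keys and timestamps are invented for illustration.

import java.util.ArrayList;
import java.util.List;

public class ReplayTrace {
    public static void main(String[] args) {
        List<String> indexMutations = new ArrayList<>();
        String currentIndexKey = null;

        // ts=10: Put sets the indexed column to v1 -> index key "idx-1"
        indexMutations.add("PUT idx-1 @10");
        currentIndexKey = "idx-1";

        // ts=20: Put changes the indexed column -> new key, delete the old row
        indexMutations.add("PUT idx-2 @20");
        if (!"idx-2".equals(currentIndexKey)) {
            indexMutations.add("DEL idx-1 @20");
        }
        currentIndexKey = "idx-2";

        // ts=30: Delete removes the whole data row -> delete the index row
        indexMutations.add("DEL idx-2 @30");
        currentIndexKey = null;

        // [PUT idx-1 @10, PUT idx-2 @20, DEL idx-1 @20, DEL idx-2 @30]
        System.out.println(indexMutations);
    }
}
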
+
+    @VisibleForTesting
+    public int prepareIndexMutations(Put put, Delete del) throws IOException {
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = indexKeyToMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                indexKeyToMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+        return 0;
+    }
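
The get-or-create pattern in prepareIndexMutations builds a multimap keyed by index row key. Note that byte[] keys behave correctly only in a map that compares array contents, since byte[] has identity equals and hashCode; the sketch below assumes a TreeMap with an unsigned byte comparator (the real code presumably uses Bytes.BYTES_COMPARATOR for indexKeyToMutationMap).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByRowKey {
    public static void main(String[] args) {
        Comparator<byte[]> cmp = Arrays::compareUnsigned;
        Map<byte[], List<String>> map = new TreeMap<>(cmp);
        byte[] k1 = {1, 2};
        byte[] k2 = {1, 2}; // different array object, same contents
        map.computeIfAbsent(k1, k -> new ArrayList<>()).add("put");
        map.computeIfAbsent(k2, k -> new ArrayList<>()).add("delete");
        System.out.println(map.get(new byte[]{1, 2})); // [put, delete]
    }
}
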
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
+        if (indexRowKey != null &&
+                singleRowRebuildReturnCode == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+            byte[] rowCountBytes =
+                    PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode));
+            final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+            results.add(aggKeyValue);
+            return false;
+        }
         Cell lastCell = null;
         int rowCount = 0;
         region.startRegionOperation();
         try {
-            // Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
-            boolean partialRebuild = scan.isRaw();
             byte[] uuidValue = ServerCacheClient.generateId();
             synchronized (innerScanner) {
                 do {
                     List<Cell> row = new ArrayList<Cell>();
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
-                        lastCell = row.get(0);
+                        lastCell = row.get(0); // lastCell is any cell from the last visited row
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
+                                    continue;
+                                }
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
-                                    mutations.add(put);
                                 }
                                 put.add(cell);
                             } else {
                                 if (del == null) {
                                     del = new Delete(CellUtil.cloneRow(cell));
-                                    mutations.add(del);
                                 }
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (partialRebuild) {
+                        if (put == null && del == null) {
+                            continue;
+                        }
+                        // Always add the put first and then the delete for a given row. This simplifies the logic in
+                        // IndexRegionObserver.
+                        if (put != null) {
+                            mutations.add(put);
+                        }
+                        if (del != null) {
+                            mutations.add(del);
+                        }
+                        if (!verify) {
                             if (put != null) {
                                 setMutationAttributes(put, uuidValue);
                             }
@@ -1001,35 +1445,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                                 setMutationAttributes(del, uuidValue);
                             }
                             uuidValue = commitIfReady(uuidValue, mutations);
-                        }
-                        if (indexRowKey != null) {
-                            if (put != null) {
-                                setMutationAttributes(put, uuidValue);
-                            }
-                            Delete deleteMarkers = generateDeleteMarkers(put);
-                            if (deleteMarkers != null) {
-                                setMutationAttributes(deleteMarkers, uuidValue);
-                                mutations.add(deleteMarkers);
-                                uuidValue = commitIfReady(uuidValue, mutations);
-                            }
-                            // GlobalIndexChecker passed the index row key. This is to build a single index row.
-                            // Check if the data table row we have just scanned matches with the index row key.
-                            // If not, there is no need to build the index row from this data table row,
-                            // and just return zero row count.
-                            if (checkIndexRow(indexRowKey, put)) {
-                                rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                            } else {
-                                rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
-                            }
-                            break;
+                        } else {
+                            byte[] dataKey = (put != null) ? put.getRow() : del.getRow();
+                            prepareIndexMutations(put, del);
+                            dataKeyToMutationMap.put(dataKey, new Pair<Put, Delete>(put, del));
                         }
                         rowCount++;
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-                if (!partialRebuild && indexRowKey == null) {
-                    verifyAndOrRebuildIndex();
-                } else {
-                    if (!mutations.isEmpty()) {
+                if (!mutations.isEmpty()) {
+                    if (verify) {
+                        verifyAndOrRebuildIndex();
+                    } else {
                         ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
                         ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                     }
@@ -1042,10 +1469,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             region.closeRegionOperation();
             mutations.clear();
             if (verify) {
-              indexKeyToDataPutMap.clear();
-              dataKeyToDataPutMap.clear();
+              dataKeyToMutationMap.clear();
+              indexKeyToMutationMap.clear();
             }
         }
+        if (indexRowKey != null) {
+            rowCount = singleRowRebuildReturnCode;
+        }
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
         final Cell aggKeyValue;
         if (lastCell == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
new file mode 100644
index 0000000..ed92fad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+
+public class IndexToolVerificationResult {
+    public static class PhaseResult {
+        long validIndexRowCount = 0;
+        long expiredIndexRowCount = 0;
+        long missingIndexRowCount = 0;
+        long invalidIndexRowCount = 0;
+
+        public void add(PhaseResult phaseResult) {
+            validIndexRowCount += phaseResult.validIndexRowCount;
+            expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+            missingIndexRowCount += phaseResult.missingIndexRowCount;
+            invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+        }
+
+        public PhaseResult(){}
+
+        public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
+                long missingIndexRowCount, long invalidIndexRowCount) {
+            this.validIndexRowCount = validIndexRowCount;
+            this.expiredIndexRowCount = expiredIndexRowCount;
+            this.missingIndexRowCount = missingIndexRowCount;
+            this.invalidIndexRowCount = invalidIndexRowCount;
+        }
+
+        public long getTotalCount() {
+            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+        }
+
+        @Override
+        public String toString() {
+            return "PhaseResult{" +
+                    "validIndexRowCount=" + validIndexRowCount +
+                    ", expiredIndexRowCount=" + expiredIndexRowCount +
+                    ", missingIndexRowCount=" + missingIndexRowCount +
+                    ", invalidIndexRowCount=" + invalidIndexRowCount +
+                    '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null) {
+                return false;
+            }
+            if (!(o instanceof PhaseResult)) {
+                return false;
+            }
+            PhaseResult pr = (PhaseResult) o;
+            return this.expiredIndexRowCount == pr.expiredIndexRowCount
+                    && this.validIndexRowCount == pr.validIndexRowCount
+                    && this.invalidIndexRowCount == pr.invalidIndexRowCount
+                    && this.missingIndexRowCount == pr.missingIndexRowCount;
+        }
+
+        @Override
+        public int hashCode() {
+            long result = 17;
+            result = 31 * result + expiredIndexRowCount;
+            result = 31 * result + validIndexRowCount;
+            result = 31 * result + missingIndexRowCount;
+            result = 31 * result + invalidIndexRowCount;
+            return (int)result;
+        }
+    }
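
A short usage sketch for PhaseResult, assuming the class as defined above and placed in the same package (the count fields are package-private): per-region results for a verification phase are merged into one summary with add(), and getTotalCount() gives all index rows examined. The demo class name and counts are invented.

package org.apache.phoenix.coprocessor;

public class PhaseResultDemo {
    public static void main(String[] args) {
        IndexToolVerificationResult.PhaseResult total =
                new IndexToolVerificationResult.PhaseResult();
        // Hypothetical per-region results: (valid, expired, missing, invalid)
        total.add(new IndexToolVerificationResult.PhaseResult(100, 2, 1, 0));
        total.add(new IndexToolVerificationResult.PhaseResult(200, 0, 0, 3));
        System.out.println(total);                 // valid=300, expired=2, missing=1, invalid=3
        System.out.println(total.getTotalCount()); // 306
    }
}
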
+
+    long scannedDataRowCount = 0;
+    long rebuiltIndexRowCount = 0;
+    PhaseResult before = new PhaseResult();
+    PhaseResult after = new PhaseResult();
+
+    @Override
+    public String toString() {
+        return "VerificationResult{" +
+                "scannedDataRowCount=" + scannedDataRowCount +
+                ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+                ", before=" + before +
+                ", after=" + after +
+                '}';
+    }
+
+    public long getScannedDataRowCount() {
+        return scannedDataRowCount;
+    }
+
+    public long getRebuiltIndexRowCount() {
+        return rebuiltIndexRowCount;
+    }
+
+    public long getBeforeRebuildValidIndexRowCount() {
+        return before.validIndexRowCount;
+    }
+
+    public long getBeforeRebuildExpiredIndexRowCount() {
+        return before.expiredIndexRowCount;
+    }
+
+    public long getBeforeRebuildInvalidIndexRowCount() {
+        return before.invalidIndexRowCount;
+    }
+
+    public long getBeforeRebuildMissingIndexRowCount() {
+        return before.missingIndexRowCount;
+    }
+
+    public long getAfterRebuildValidIndexRowCount() {
+        return after.validIndexRowCount;
+    }
+
+    public long getAfterRebuildExpiredIndexRowCount() {
+        return after.expiredIndexRowCount;
+    }
+
+    public long getAfterRebuildInvalidIndexRowCount() {
+        return after.invalidIndexRowCount;
+    }
+
+    public long getAfterRebuildMissingIndexRowCount() {
+        return after.missingIndexRowCount;
+    }
+
+    private void addScannedDataRowCount(long count) {
+        this.scannedDataRowCount += count;
+    }
+
+    private void addRebuiltIndexRowCount(long count) {
+        this.rebuiltIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildValidIndexRowCount(long count) {
+        before.validIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildExpiredIndexRowCount(long count) {
+        before.expiredIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildMissingIndexRowCount(long count) {
+        before.missingIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildInvalidIndexRowCount(long count) {
+        before.invalidIndexRowCount += count;
+    }
+
+    private void addAfterRebuildValidIndexRowCount(long count) {
+        after.validIndexRowCount += count;
+    }
+
+    private void addAfterRebuildExpiredIndexRowCount(long count) {
+        after.expiredIndexRowCount += count;
+    }
+
+    private void addAfterRebuildMissingIndexRowCount(long count) {
+        after.missingIndexRowCount += count;
+    }
+
+    private void addAfterRebuildInvalidIndexRowCount(long count) {
+        after.invalidIndexRowCount += count;
+    }
+
+    private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
+        return Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
+                AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0;
+    }
+
+    private long getValue(Cell cell) {
+        return Long.parseLong(Bytes.toString(cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    private void update(Cell cell) {
+        if (CellUtil
+                .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+            addScannedDataRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+            addRebuiltIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildInvalidIndexRowCount(getValue(cell));
+        }
+    }
+
+    public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+        // Search for the place where the trailing 0xFFs start
+        int offset = rowKeyPrefix.length;
+        while (offset > 0) {
+            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                break;
+            }
+            offset--;
+        }
+        if (offset == 0) {
+            // We got an 0xFFFF... (only FFs) stopRow value which is
+            // the last possible prefix before the end of the table.
+            // So set it to stop at the 'end of the table'
+            return HConstants.EMPTY_END_ROW;
+        }
+        // Copy the right length of the original
+        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+        // And increment the last one
+        newStopRow[newStopRow.length - 1]++;
+        return newStopRow;
+    }
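
Worked examples help here: the prefix {0x01, 0x02} yields the stop row {0x01, 0x03}; {0x01, 0xFF} carries into {0x02}; and an all-0xFF prefix yields the empty end row. The JDK-only restatement below mirrors the same +1-with-carry logic for illustration only.

import java.util.Arrays;

public class NextRowKeyDemo {
    static byte[] next(byte[] prefix) {
        int offset = prefix.length;
        while (offset > 0 && prefix[offset - 1] == (byte) 0xFF) {
            offset--; // skip the trailing 0xFFs that would overflow
        }
        if (offset == 0) {
            return new byte[0]; // all 0xFF: scan to the end of the table
        }
        byte[] stop = Arrays.copyOfRange(prefix, 0, offset);
        stop[stop.length - 1]++; // increment the last non-0xFF byte
        return stop;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(next(new byte[]{0x01, 0x02})));        // [1, 3]
        System.out.println(Arrays.toString(next(new byte[]{0x01, (byte) 0xFF}))); // [2]
        System.out.println(Arrays.toString(next(new byte[]{(byte) 0xFF})));       // []
    }
}
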
+
+    public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
+            throws IOException {
+        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
+        byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+        byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+        Scan scan = new Scan();
+        scan.setStartRow(startRowKey);
+        scan.setStopRow(stopRowKey);
+        ResultScanner scanner = hTable.getScanner(scan);
+        for (Result result = scanner.next(); result != null; result = scanner.next()) {
+            for (Cell cell : result.rawCells()) {
+                verificationResult.update(cell);
+            }
+        }
+        return verificationResult;
+    }
+
+    public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+            return false;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            return before.invalidIndexRowCount + before.missingIndexRowCount > 0;
+        }
+        if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+            return after.invalidIndexRowCount + after.missingIndexRowCount > 0;
+        }
+        return false;
+    }
+
+    public void add(IndexToolVerificationResult verificationResult) {
+        scannedDataRowCount += verificationResult.scannedDataRowCount;
+        rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+        before.add(verificationResult.before);
+        after.add(verificationResult.after);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 566fb59..0180368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1112,9 +1112,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
-
-        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
-        return scanner;
+        if (!scan.isRaw()) {
+            Scan rawScan = new Scan(scan);
+            rawScan.setRaw(true);
+            rawScan.setMaxVersions();
+            rawScan.getFamilyMap().clear();
+            rawScan.setFilter(null);
+            for (byte[] family : scan.getFamilyMap().keySet()) {
+                rawScan.addFamily(family);
+            }
+            innerScanner.close();
+            RegionScanner scanner = region.getScanner(rawScan);
+            return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+        }
+        return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
     }
     
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index c4ae93d..7bfefb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,13 +40,16 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,36 +59,49 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
-import com.google.common.collect.Lists;
+import java.util.Set;
+
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
 
 /**
  * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
@@ -159,14 +176,19 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       private boolean rebuild;
+      // The current and next states of the data rows corresponding to the pending mutations
+      private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
+      // Data table pending mutations
+      private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
+
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
   }
-  
+
   private ThreadLocal<BatchMutateContext> batchMutateContext =
           new ThreadLocal<BatchMutateContext>();
-  
+
   /** Configuration key for the {@link IndexBuilder} to use */
   public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
 
@@ -176,17 +198,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
    */
   public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
 
-  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
-
   public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
   private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
 
   private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
   private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
-  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
-  private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
-  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
-  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
   private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
   private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
 
@@ -349,13 +365,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
 
   public static long getMaxTimestamp(Mutation m) {
       long maxTs = 0;
-      long ts = 0;
-      Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
-      while (iterator.hasNext()) {
-          Map.Entry<byte[], List<Cell>> entry = (Map.Entry) iterator.next();
-          Iterator<Cell> cellIterator = entry.getValue().iterator();
-          while (cellIterator.hasNext()) {
-              Cell cell = cellIterator.next();
+      long ts;
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
               ts = cell.getTimestamp();
               if (ts > maxTs) {
                   maxTs = ts;
@@ -409,316 +421,550 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
       }
   }
 
-  private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        long now, ReplayWrite replayWrite) throws IOException {
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-      boolean copyMutations = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-              continue;
-          }
-          Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isEnabled(m)) {
-              // Track whether or not we need to
-              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              if (mutationsMap.containsKey(row)) {
-                  copyMutations = true;
-              } else {
-                  mutationsMap.put(row, null);
-              }
-          }
-      }
-      // early exit if it turns out we don't have any edits
-      if (mutationsMap.isEmpty()) {
-          return null;
-      }
-      // If we're copying the mutations
-      Collection<Mutation> originalMutations;
-      Collection<? extends Mutation> mutations;
-      if (copyMutations) {
-          originalMutations = null;
-          mutations = mutationsMap.values();
-      } else {
-          originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
-          mutations = originalMutations;
-      }
+    private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                          BatchMutateContext context) throws IOException {
+        context.multiMutationMap = new HashMap<>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+            // should be indexed, which means we need to expose another method on the builder. Such is the
+            // way optimizations go, though.
+            if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+                ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                MultiMutation stored = context.multiMutationMap.get(row);
+                if (stored == null) {
+                    // we haven't seen this row before, so add it
+                    stored = new MultiMutation(row);
+                    context.multiMutationMap.put(row, stored);
+                }
+                stored.addAll(m);
+            }
+        }
+        return context.multiMutationMap.values();
+    }
 
-      boolean resetTimeStamp = replayWrite == null;
+    public static void setTimestamp(Mutation m, long ts) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                CellUtil.setTimestamp(cell, ts);
+            }
+        }
+    }
 
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          Mutation m = miniBatchOp.getOperation(i);
-          // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
-          // should be indexed, which means we need to expose another method on the builder. Such is the
-          // way optimization go though.
-          if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
-              if (resetTimeStamp) {
-                  // Unless we're replaying edits to rebuild the index, we update the time stamp
-                  // of the data table to prevent overlapping time stamps (which prevents index
-                  // inconsistencies as this case isn't handled correctly currently).
-                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                      for (Cell cell : cells) {
-                          CellUtil.setTimestamp(cell, now);
-                      }
-                  }
-              }
-              // No need to write the table mutations when we're rebuilding
-              // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY
-                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
-                  miniBatchOp.setOperationStatus(i, NOWRITE);
-              }
+    /**
+     * This method applies the pending delete mutations to the next row states.
+     */
+    private void applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                             BatchMutateContext context) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (!(m instanceof Delete)) {
+                continue;
+            }
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            if (dataRowState == null) {
+                dataRowState = new Pair<Put, Put>(null, null);
+                context.dataRowStates.put(rowKeyPtr, dataRowState);
+            }
+            Put nextDataRowState = dataRowState.getSecond();
+            if (nextDataRowState == null) {
+                if (dataRowState.getFirst() == null) {
+                    // This is a delete row mutation on a non-existing row. There is no need to apply this mutation
+                    // on the data table
+                    miniBatchOp.setOperationStatus(i, NOWRITE);
+                }
+                continue;
+            }
+            for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                for (Cell cell : cells) {
+                    switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                        case DeleteFamily:
+                        case DeleteFamilyVersion:
+                            nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                            break;
+                        case DeleteColumn:
+                        case Delete:
+                            removeColumn(nextDataRowState, cell);
+                    }
+                }
+            }
+            // nextDataRowState is guaranteed to be non-null here (see the continue above)
+            if (nextDataRowState.getFamilyCellMap().size() == 0) {
+                dataRowState.setSecond(null);
+            }
+        }
+    }
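
The early NOWRITE path above is worth spelling out: a Delete against a row with neither a committed current state nor a pending next state never touches the data table or the index. A toy sketch of that decision follows; the class and method names are invented for illustration.

public class DeleteSkipDemo {
    static String decide(Object currentRowState, Object nextRowState) {
        if (nextRowState == null) {
            if (currentRowState == null) {
                return "NOWRITE: row never existed, drop the delete entirely";
            }
            return "skip: an earlier pending mutation already deleted the row";
        }
        return "apply the delete markers to the next row state";
    }

    public static void main(String[] args) {
        System.out.println(decide(null, null));
        System.out.println(decide(new Object(), null));
        System.out.println(decide(new Object(), new Object()));
    }
}
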
 
-              // Only copy mutations if we found duplicate rows
-              // which only occurs when we're partially rebuilding
-              // the index (since we'll potentially have both a
-              // Put and a Delete mutation for the same row).
-              if (copyMutations) {
-                  // Add the mutation to the batch set
-
-                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                  MultiMutation stored = mutationsMap.get(row);
-                  // we haven't seen this row before, so add it
-                  if (stored == null) {
-                      stored = new MultiMutation(row);
-                      mutationsMap.put(row, stored);
-                  }
-                  stored.addAll(m);
-              } else {
-                  originalMutations.add(m);
-              }
-          }
-      }
+    /**
+     * This method applies the pending put mutations to the next row states.
+     * Before this method is called, each next row state is initialized to the corresponding current row state.
+     */
+    private void applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context, long now) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            // Unless we're replaying edits to rebuild the index, we update the time stamp
+            // of the data table to prevent overlapping time stamps (which prevents index
+            // inconsistencies as this case isn't handled correctly currently).
+            setTimestamp(m, now);
+            if (m instanceof Put) {
+                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+                Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+                if (dataRowState == null) {
+                    dataRowState = new Pair<Put, Put>(null, null);
+                    context.dataRowStates.put(rowKeyPtr, dataRowState);
+                }
+                Put nextDataRowState = dataRowState.getSecond();
+                dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+            }
+        }
+    }
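
The Pair<Put, Put> kept per row is (last committed state, state after the pending mutations). Layering a pending Put with applyNew means the pending cells win per column while untouched columns are carried over from the previous next state. A JDK-only model of that merge, with invented column names:

import java.util.HashMap;
import java.util.Map;

public class RowStateDemo {
    public static void main(String[] args) {
        // Committed state of the row: two columns
        Map<String, String> current = Map.of("c1", "v1", "c2", "v2");
        Map<String, String> next = new HashMap<>(current);

        // A pending Put overwrites c1; start from the put and carry over the rest
        Map<String, String> pendingPut = Map.of("c1", "v1-new");
        Map<String, String> merged = new HashMap<>(pendingPut);
        for (Map.Entry<String, String> e : next.entrySet()) {
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        next = merged;
        System.out.println(current); // contains c1=v1, c2=v2
        System.out.println(next);    // contains c1=v1-new, c2=v2
    }
}
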
 
-      if (copyMutations || replayWrite != null) {
-          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
-      }
-      return mutations;
-  }
+    /**
+     * Prepares the current and next data row states.
+     */
+    private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                      MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                      BatchMutateContext context,
+                                      long now) throws IOException {
+        if (context.rowsToLock.size() == 0) {
+            return;
+        }
+        // Retrieve the current row states from the data table
+        getCurrentRowStates(c, context);
+        applyPendingPutMutations(miniBatchOp, context, now);
+        applyPendingDeleteMutations(miniBatchOp, context);
+    }
 
-  public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
-      List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
-      if (cellList == null) {
-          return;
-      }
-      Iterator<Cell> cellIterator = cellList.iterator();
-      while (cellIterator.hasNext()) {
-          Cell cell = cellIterator.next();
-          if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                  emptyCQ, 0, emptyCQ.length) == 0) {
-              cellIterator.remove();
-              return;
-          }
-      }
-  }
+    public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+        List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    emptyCQ, 0, emptyCQ.length) == 0) {
+                cellIterator.remove();
+                return;
+            }
+        }
+    }
 
-  private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
-                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                       ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
-      byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
-      HTableInterfaceReference hTableInterfaceReference =
-                          new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
-      List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
-      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
-          return;
-      }
-      List<Mutation> localUpdates = new ArrayList<Mutation>();
-      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
-      while (indexUpdatesItr.hasNext()) {
-          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-          localUpdates.add(next.getFirst());
-      }
-      if (!localUpdates.isEmpty()) {
-          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
-      }
-  }
+    /**
+     * The index update generation for local indexes uses the existing index update generation code (i.e.,
+     * the {@link IndexBuilder} implementation).
+     */
+    private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                         Collection<? extends Mutation> pendingMutations,
+                                         PhoenixIndexMetaData indexMetaData) throws Throwable {
+        ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+        this.builder.getIndexUpdates(indexUpdates, miniBatchOp, pendingMutations, indexMetaData);
+        byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+        List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+        if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+            return;
+        }
+        List<Mutation> localUpdates = new ArrayList<Mutation>();
+        Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+        while (indexUpdatesItr.hasNext()) {
+            Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+            localUpdates.add(next.getFirst());
+        }
+        if (!localUpdates.isEmpty()) {
+            miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+        }
+    }
+    /**
+     * Retrieves the last committed data row state. This method is called only for regular data mutations, since
+     * rebuild (i.e., index replay) mutations include all row versions.
+     */
+    private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     BatchMutateContext context) throws IOException {
+        Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+        }
+        Scan scan = new Scan();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        scan.setFilter(skipScanFilter);
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
+        try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+            boolean more = true;
+            while(more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+                Put put = new Put(rowKey);
+                for (Cell cell : cells) {
+                    put.add(cell);
+                }
+                context.dataRowStates.put(new ImmutableBytesPtr(rowKey), new Pair<Put, Put>(put, new Put(put)));
+            }
+        }
+    }
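
getCurrentRowStates deduplicates the pending row keys into a Set and fetches all of them with one skip-scan rather than one Get per row. The sketch below models that batching with plain maps; the per-key lookup loop stands in for the single region scan, and all names are invented.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchedLookupDemo {
    public static void main(String[] args) {
        // Stand-in for the region's committed data
        Map<String, String> store = Map.of("r1", "state1", "r2", "state2");
        // Row keys from the pending batch; duplicates collapse in the Set
        List<String> rowsToLock = List.of("r1", "r2", "r1");
        Set<String> keys = new HashSet<>(rowsToLock);

        Map<String, String> dataRowStates = new HashMap<>(keys.size());
        for (String key : keys) { // the real code does this with one skip-scan
            String row = store.get(key);
            if (row != null) {
                dataRowStates.put(key, row); // absent rows simply stay unmapped
            }
        }
        System.out.println(dataRowStates); // contains r1=state1, r2=state2
    }
}
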
 
-  private void prepareIndexMutations(
-          ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp,
-          BatchMutateContext context,
-          Collection<? extends Mutation> mutations,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // get the current span, or just use a null-span to avoid a bunch of if statements
-      try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
-          Span current = scope.getSpan();
-          if (current == null) {
-              current = NullSpan.INSTANCE;
-          }
-          // get the index updates for all elements in this batch
-          context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
-          this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData);
-          current.addTimelineAnnotation("Built index updates, doing preStep");
-          handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
-          context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-          int updateCount = 0;
-          for (IndexMaintainer indexMaintainer : maintainers) {
-              updateCount++;
-              byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-              HTableInterfaceReference hTableInterfaceReference =
-                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-              List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
-              for (Pair<Mutation, byte[]> update : updates) {
-                  // add the VERIFIED cell, which is the empty cell
-                  Mutation m = update.getFirst();
-                  if (context.rebuild) {
-                      if (m instanceof Put) {
-                          long ts = getMaxTimestamp(m);
-                          // Remove the empty column prepared by Index codec as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
-                      }
-                      context.preIndexUpdates.put(hTableInterfaceReference, m);
-                  } else {
-                      if (m instanceof Put) {
-                          // Remove the empty column prepared by Index codec as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          // Set the status of the index row to "unverified"
-                          ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
-                          // This will be done before the data table row is updated (i.e., in the first write phase)
-                          context.preIndexUpdates.put(hTableInterfaceReference, m);
-                      }
-                      else {
-                          // Set the status of the index row to "unverified"
-                          Put unverifiedPut = new Put(m.getRow());
-                          unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
-                          // This will be done before the data table row is updated (i.e., in the first write phase)
-                          context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
-                      }
-                  }
-              }
-          }
-          TracingUtils.addAnnotation(current, "index update count", updateCount);
-      }
-  }
+    /**
+     * Generates the index updates for a data row from the mutations that are obtained by merging the previous data
+     * row state with the pending row mutations.
+     */
+    private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintainer> maintainers, long ts)
+            throws IOException {
+        List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(maintainers.size());
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            if (indexMaintainer.isLocalIndex()) {
+                continue;
+            }
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
+        }
+        for (Map.Entry<ImmutableBytesPtr, Pair<Put, Put>> entry : context.dataRowStates.entrySet()) {
+            ImmutableBytesPtr rowKeyPtr = entry.getKey();
+            Pair<Put, Put> dataRowState =  entry.getValue();
+            Put currentDataRowState = dataRowState.getFirst();
+            Put nextDataRowState = dataRowState.getSecond();
+            if (currentDataRowState == null && nextDataRowState == null) {
+                continue;
+            }
+            for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+                IndexMaintainer indexMaintainer = pair.getFirst();
+                HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+                if (nextDataRowState != null) {
+                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                    Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                            nextDataRowVG, rowKeyPtr, ts, null, null);
+                    if (indexPut == null) {
+                        // No covered column. Just prepare an index row with the empty column
+                        byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        indexPut = new Put(indexRowKey);
+                    } else {
+                        removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                                indexMaintainer.getEmptyKeyValueQualifier());
+                    }
+                    indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                            indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
+                    // Delete the current index row if the new index key is different from the current one
+                    if (currentDataRowState != null) {
+                        ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                        byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            context.indexUpdates.put(hTableInterfaceReference,
+                                    new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                        }
+                    }
+                } else if (currentDataRowState != null) {
+                    ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                            null, null, HConstants.LATEST_TIMESTAMP);
+                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                }
+            }
+        }
+    }
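+    /*
+     * A worked sketch of the output of prepareIndexMutations(), assuming a data table with pk
+     * column PK, an index on C1, and a pending mutation that changes C1 from 'a' to 'b' on row
+     * 'k1' (index row keys shown with the usual zero-byte separator):
+     *
+     *   Put('b' 0x00 'k1')    - empty column set to UNVERIFIED_BYTES at ts
+     *   Delete('a' 0x00 'k1') - DeleteType.ALL_VERSIONS at ts, since the index row key changed
+     *
+     * If the pending mutation left C1 unchanged, only the unverified Put on the existing index
+     * row key would be generated.
+     */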
 
-  protected PhoenixIndexMetaData getPhoenixIndexMetaData(
-          ObserverContext<RegionCoprocessorEnvironment> observerContext,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
-      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
-          throw new DoNotRetryIOException(
-                  "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
-                          ", current table is:" +
-                          observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      return (PhoenixIndexMetaData)indexMetaData;
-  }
+    /**
+     * This method prepares unverified index mutations which are applied to index tables before the data table is
+     * updated. In the three-phase update approach, in phase 1, the status of existing index rows is set to "unverified"
+     * (these rows will be deleted from the index table in phase 3), and/or new put mutations are added with the
+     * unverified status. In phase 2, data table mutations are applied. In phase 3, the status for an index table row is
+     * either set to "verified" or the row is deleted.
+     */
+    private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context,
+                                          Collection<? extends Mutation> pendingMutations,
+                                          long now,
+                                          PhoenixIndexMetaData indexMetaData) throws Throwable {
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+            current.addTimelineAnnotation("Built index updates, doing preStep");
+            // Handle local index updates
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                if (indexMaintainer.isLocalIndex()) {
+                    handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData);
+                    break;
+                }
+            }
+            // The rest of this method is for handling global index updates
+            context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+            prepareIndexMutations(context, maintainers, now);
+
+            context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+            int updateCount = 0;
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                updateCount++;
+                byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                HTableInterfaceReference hTableInterfaceReference =
+                        new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+                for (Pair<Mutation, byte[]> update : updates) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, m);
+                    } else {
+                        // Set the status of the index row to "unverified"
+                        Put unverifiedPut = new Put(m.getRow());
+                        unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
+                    }
+                }
+            }
+            TracingUtils.addAnnotation(current, "index update count", updateCount);
+        }
+    }
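+    /*
+     * A sketch of the full three-phase sequence for a single data row update against one global
+     * index, using the pre updates prepared above and the post updates prepared in
+     * preparePostIndexMutations():
+     *
+     *   Phase 1: index table <- Put(newIndexRowKey, empty column = UNVERIFIED)
+     *            index table <- Put(oldIndexRowKey, empty column = UNVERIFIED)   (if the key changes)
+     *   Phase 2: data table  <- the pending row mutation
+     *   Phase 3: index table <- Put(newIndexRowKey, empty column = VERIFIED)
+     *            index table <- Delete(oldIndexRowKey)                           (if the key changed)
+     *
+     * If a failure happens between phases, the rows left unverified are resolved later by read
+     * repair rather than by this write path.
+     */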
 
-  private void preparePostIndexMutations(
-          BatchMutateContext context,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // Check if we need to skip post index update for any of the rows
-      for (IndexMaintainer indexMaintainer : maintainers) {
-          byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-          byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-          HTableInterfaceReference hTableInterfaceReference =
-                  new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-          List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
-          for (Pair<Mutation, byte[]> update : updates) {
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              if (!pendingRow.isConcurrent()) {
-                  Mutation m = update.getFirst();
-                  if (m instanceof Put) {
-                      Put verifiedPut = new Put(m.getRow());
-                      // Set the status of the index row to "verified"
-                      verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                      context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                  }
-                  else {
-                      context.postIndexUpdates.put(hTableInterfaceReference, m);
-                  }
+    protected PhoenixIndexMetaData getPhoenixIndexMetaData(ObserverContext<RegionCoprocessorEnvironment> observerContext,
+                                                           MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+        if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+            throw new DoNotRetryIOException(
+                    "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
+                            ", current table is:" +
+                            observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        return (PhoenixIndexMetaData)indexMetaData;
+    }
 
-              }
-          }
-      }
-      // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-      // the collection of pending rows
-      removePendingRows(context);
-      context.indexUpdates.clear();
-  }
+    /**
+     * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns
+     * fall into three classes: pk columns (the data table pk columns), indexed columns (the columns we index on,
+     * which form the prefix of the index table primary key, after salt and tenant id), and covered columns. The
+     * purpose of this method is to find out if all the indexed columns are included in the pending data table
+     * mutation pointed to by multiMutation.
+     */
+    private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
+        Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
+        for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
+            byte[] family = columnReference.getFamily();
+            List<Cell> cellList = familyMap.get(family);
+            if (cellList == null) {
+                return false;
+            }
+            boolean has = false;
+            for (Cell cell : cellList) {
+                if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
+                    has = true;
+                    break;
+                }
+            }
+            if (!has) {
+                return false;
+            }
+        }
+        return true;
+    }
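+    /*
+     * For example, with an index on (CF1.C1, CF2.C2), a pending mutation that sets only CF1.C1
+     * makes hasAllIndexedColumns() return false: the CF2 entry is missing from the family cell
+     * map, so the index row key cannot be derived from this mutation alone.
+     */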
 
-  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
-      ignoreAtomicOperations(miniBatchOp);
-      PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
-      BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
-      setBatchMutateContext(c, context);
-      Mutation firstMutation = miniBatchOp.getOperation(0);
-      ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-      context.rebuild = replayWrite != null;
-      /*
-       * Exclusively lock all rows so we get a consistent read
-       * while determining the index updates
-       */
-      long now;
-      if (!context.rebuild) {
-          populateRowsToLock(miniBatchOp, context);
-          lockRows(context);
-          now = EnvironmentEdgeManager.currentTimeMillis();
-          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-          // concurrent updates
-          populatePendingRows(context);
-      }
-      else {
-          now = EnvironmentEdgeManager.currentTimeMillis();
-      }
-      // First group all the updates for a single row into a single update to be processed
-      Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
-      // early exit if it turns out we don't have any edits
-      if (mutations == null) {
-          return;
-      }
+    private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData,
+                                           String tableName)
+            throws Throwable {
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // Check if we need to skip post index update for any of the rows
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+            for (Pair<Mutation, byte[]> update : updates) {
+                // Are there concurrent updates on the data table row? If so, skip the post index
+                // updates and let read repair resolve conflicts
+                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                PendingRow pendingRow = pendingRows.get(rowKey);
+                if (!pendingRow.isConcurrent()) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        Put verifiedPut = new Put(m.getRow());
+                        // Set the status of the index row to "verified"
+                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+                    } else {
+                        context.postIndexUpdates.put(hTableInterfaceReference, m);
+                    }
+                } else {
+                    if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
+                        // This batch needs to be retried since one of the concurrent mutations does not have the value
+                        // for an indexed column. Not including an indexed column may lead to incorrect index row key
+                        // generation, since concurrent mutations are not serialized entirely and do not see each
+                        // other's effect on the data table. Throwing an IOException will result in retries of this
+                        // batch. Before throwing the exception, we need to remove the reference counts and locks for
+                        // the rows of this batch
+                        removePendingRows(context);
+                        context.indexUpdates.clear();
+                        for (RowLock rowLock : context.rowLocks) {
+                            rowLock.release();
+                        }
+                        context.rowLocks.clear();
+                        throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
+                                "The batch needs to be retried for table " + tableName);
+                    }
+                }
+            }
+        }
 
-      long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
-      metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
-
-      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
-      // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
-      // can be prepared in less than one millisecond
-      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
-          Thread.sleep(1);
-          LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      // Release the locks before making RPC calls for index updates
-      for (RowLock rowLock : context.rowLocks) {
-          rowLock.release();
-      }
-      // Do the first phase index updates
-      doPre(c, context, miniBatchOp);
-      if (!context.rebuild) {
-          // Acquire the locks again before letting the region proceed with data table updates
-          List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
-          for (RowLock rowLock : context.rowLocks) {
-              rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
-          }
-          context.rowLocks.clear();
-          context.rowLocks = rowLocks;
-          preparePostIndexMutations(context, now, indexMetaData);
-      }
-      if (failDataTableUpdatesForTesting) {
-          throw new DoNotRetryIOException("Simulating the data table write failure");
-      }
-  }
+        // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+        // the collection of pending rows
+        removePendingRows(context);
+        context.indexUpdates.clear();
+    }
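+    /*
+     * Concurrency sketch: suppose batches B1 and B2 update the same data row concurrently, where
+     * C1 is the only indexed column, B1 sets C1, and B2 sets only a covered column. Both batches
+     * skip their phase-3 (verified) index updates and leave the row to read repair; B2 is also
+     * rejected with an IOException and retried, because without a value for C1 it cannot compute
+     * a correct index row key on its own.
+     */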
+
+    /**
+     * There are at most two rebuild mutations for every row, one put and one delete. They are listed next to each
+     * other in indexMutations, with the put before the delete, as arranged by {@link IndexRebuildRegionScanner}.
+     * This method is called only for global indexes.
+     */
+    private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
+                                                        MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        BatchMutateContext context,
+                                                        IndexMaintainer indexMaintainer) throws Throwable {
+        Put put = null;
+        List <Mutation> indexMutations = new ArrayList<>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (m instanceof Put) {
+                if (put != null) {
+                    indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+                }
+                put = (Put)m;
+            } else {
+                indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m));
+                put = null;
+            }
+            miniBatchOp.setOperationStatus(i, NOWRITE);
+        }
+        if (put != null) {
+            indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+        }
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        for (Mutation m : indexMutations) {
+            context.preIndexUpdates.put(hTableInterfaceReference, m);
+        }
+        doPre(c, context, miniBatchOp);
+        // For rebuild updates, no post index update is prepared. Just create an empty list.
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+    }
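+    /*
+     * Rebuild sketch: a rebuild batch for one data row written at ts=1 and deleted at ts=2
+     * arrives as [Put(row, ts=1), Delete(row, ts=2)]. The loop above pairs them into a single
+     * prepareIndexMutationsForRebuild(indexMaintainer, put, delete) call, marks both data table
+     * operations NOWRITE (so the data table rows are not rewritten during rebuild), and replays
+     * only the derived index mutations through doPre().
+     */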
+
+    public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+                                             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+        ignoreAtomicOperations(miniBatchOp);
+        PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+        BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
+        setBatchMutateContext(c, context);
+        Mutation firstMutation = miniBatchOp.getOperation(0);
+        ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+        context.rebuild = replayWrite != null;
+        if (context.rebuild) {
+            preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
+            return;
+        }
+        /*
+         * Exclusively lock all rows so we get a consistent read
+         * while determining the index updates
+         */
+        long now;
+        populateRowsToLock(miniBatchOp, context);
+        lockRows(context);
+        now = EnvironmentEdgeManager.currentTimeMillis();
+        // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+        // concurrent updates
+        populatePendingRows(context);
+        // Prepare current and next data rows states for pending mutations (for global indexes)
+        prepareDataRowStates(c, miniBatchOp, context, now);
+        // Group all the updates for a single row into a single update to be processed (for local indexes)
+        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
+        // early exit if it turns out we don't have any edits
+        if (mutations == null || mutations.isEmpty()) {
+            return;
+        }
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
+        metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+        // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+        // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+        // can be prepared in less than one millisecond
+        if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+            Thread.sleep(1);
+            LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        // Release the locks before making RPC calls for index updates
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        // Do the first phase index updates
+        doPre(c, context, miniBatchOp);
+        // Acquire the locks again before letting the region proceed with data table updates
+        List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+        for (RowLock rowLock : context.rowLocks) {
+            rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+        }
+        context.rowLocks.clear();
+        context.rowLocks = rowLocks;
+        preparePostIndexMutations(context, now, indexMetaData,
+                c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        if (failDataTableUpdatesForTesting) {
+            throw new DoNotRetryIOException("Simulating the data table write failure");
+        }
+    }
 
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
-  
+
   private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       return this.batchMutateContext.get();
   }
-  
+
   private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       this.batchMutateContext.remove();
   }
@@ -828,14 +1074,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
   }
 
   /**
-   * Exposed for testing!
-   * @return the currently instantiated index builder
-   */
-  public IndexBuilder getBuilderForTesting() {
-    return this.builder.getBuilderForTesting();
-  }
-
-  /**
    * Enable indexing on the given table
    * @param descBuilder {@link TableDescriptor} for the table on which indexing should be enabled
  * @param builder class to use when building the index for this table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 8c9a2d9..7004c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1620,7 +1620,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     private void initCachedState() {
         byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
-        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier);
         this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
         // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
         this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 98000f7..8d1b4db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -63,8 +64,8 @@ public class PhoenixIndexImportDirectReducer extends
             long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
             Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
                     .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
-            IndexRebuildRegionScanner.VerificationResult verificationResult =
-                    IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+            IndexToolVerificationResult verificationResult =
+                    IndexToolVerificationResult.getVerificationResult(hTable, ts);
             context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
                     setValue(verificationResult.getScannedDataRowCount());
             context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
new file mode 100644
index 0000000..56ec027
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.index;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQueryTest {
+    private static String ROW_KEY = "k1";
+    private static String TABLE_NAME = "dataTable";
+    private static String INDEX_NAME = "idx";
+
+    class SetupInfo {
+        public IndexMaintainer indexMaintainer;
+        public PTable pDataTable;
+    }
+
+    /**
+     * Create the data table and the index on it, and return the index maintainer together with
+     * the Phoenix table definition of the data table.
+     * @param tableName name of the data table
+     * @param indexName name of the index
+     * @param columns column definitions of the data table
+     * @param indexColumns indexed columns
+     * @param pk primary key columns of the data table
+     * @param includeColumns covered columns, may be empty
+     * @return the IndexMaintainer and the PTable of the data table
+     * @throws Exception
+     */
+    private SetupInfo setup(String tableName,
+                            String indexName,
+                            String columns,
+                            String indexColumns,
+                            String pk,
+                            String includeColumns) throws Exception {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+
+            String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(""), SchemaUtil.normalizeIdentifier(tableName));
+            String fullIndexName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(""), SchemaUtil.normalizeIdentifier(indexName));
+
+            // construct the data table and index based on the parameters
+            String str1 = String.format("CREATE TABLE %1$s (%2$s CONSTRAINT pk PRIMARY KEY (%3$s)) COLUMN_ENCODED_BYTES=0",
+                    fullTableName,
+                    columns,
+                    pk);
+            conn.createStatement().execute(str1);
+
+            String str2 = String.format("CREATE INDEX %1$s ON %2$s (%3$s)",
+                    fullIndexName,
+                    fullTableName,
+                    indexColumns);
+            if (!includeColumns.isEmpty())
+                str2 += " INCLUDE (" + includeColumns + ")";
+            conn.createStatement().execute(str2);
+
+            // Get the data table, index table and index maintainer reference from the client's
+            // ConnectionQueryServicesImpl. This way, we don't need to set up a local cluster.
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            PTable pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+            PTable pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+            IndexMaintainer im = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+
+            SetupInfo info = new SetupInfo();
+            info.indexMaintainer = im;
+            info.pDataTable = pDataTable;
+            return info;
+        }
+    }
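+    // For example, setup(TABLE_NAME, INDEX_NAME, "ROW_KEY VARCHAR, C1 VARCHAR", "C1", "ROW_KEY", "")
+    // executes CREATE TABLE dataTable (ROW_KEY VARCHAR, C1 VARCHAR CONSTRAINT pk PRIMARY KEY (ROW_KEY))
+    // COLUMN_ENCODED_BYTES=0 and CREATE INDEX idx ON dataTable (C1), then returns the IndexMaintainer
+    // of idx together with the PTable of dataTable.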
+
+    /**
+     * Simulate one put mutation on the indexed column
+     * @throws Exception
+     */
+    @Test
+    public void testSinglePutOnIndexColumn() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        // insert a row
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                null);
+
+        // Expect one row of index with row key "v1_k1"
+        Put idxPut1 = new Put(generateIndexRowKey("v1"));
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+        assertEqualMutationList(Arrays.asList((Mutation)idxPut1), actualIndexMutations);
+    }
+
+    /**
+     * Simulate one put mutation on the non-indexed column
+     * @throws Exception
+     */
+    @Test
+    public void testSinglePutOnNonIndexColumn() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                null);
+
+        // Expect one row of index with row key "_k1", as indexed column C1 is nullable.
+        Put idxPut1 = new Put(generateIndexRowKey(null));
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+        assertEqualMutationList(Arrays.asList((Mutation)idxPut1), actualIndexMutations);
+    }
+
+    /**
+     * Simulate a column delete on the indexed column
+     * @throws Exception
+     */
+    @Test
+    public void testDelOnIndexColumn() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        // insert the row for deletion
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        // only delete the value of column C1
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                2,
+                KeyValue.Type.DeleteColumn);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutation = new ArrayList<>();
+
+        // generate the index row key "v1_k1"
+        byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+        Put idxPut1 = new Put(idxKeyBytes);
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutation.add(idxPut1);
+
+        // generate the index row key "_k1"
+        Put idxPut2 = new Put(generateIndexRowKey(null));
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+        expectedIndexMutation.add(idxPut2);
+
+        // This delete removes the row added by idxPut1, as idxPut2 has a different row key than idxPut1.
+        // Otherwise the row "v1_k1" would still show up in scan results.
+        Delete idxDel = new Delete(idxKeyBytes);
+        addCellToDelMutation(idxDel,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                null,
+                2,
+                KeyValue.Type.DeleteFamily);
+        expectedIndexMutation.add(idxDel);
+
+        assertEqualMutationList(expectedIndexMutation, actualIndexMutations);
+    }
+
+    /**
+     * Simulate a column delete on a non-indexed column
+     * @throws Exception
+     */
+    @Test
+    public void testDelOnNonIndexColumn() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        // insert the row for deletion
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        // delete the value of column C2
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                2,
+                KeyValue.Type.DeleteColumn);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+        byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+        // idxPut1 is the index mutation corresponding to dataPut
+        Put idxPut1 = new Put(idxKeyBytes);
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutations.add(idxPut1);
+
+        // idxPut2 is required to update the timestamp, so the index row has the same lifetime as its corresponding
+        // data row. No delete mutation is expected on the index table, as the data mutation touches only a
+        // non-indexed column.
+        Put idxPut2 = new Put(idxKeyBytes);
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+        expectedIndexMutations.add(idxPut2);
+
+        assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+    }
+
+    /**
+     * Simulate the deletion of all versions of the indexed row
+     * @throws Exception
+     */
+    @Test
+    public void testDeleteAllVersions() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        // insert two versions for a single row
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                2,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
+
+        // DeleteFamily deletes all versions of the columns in that family.
+        // Since C1 is the only column in the default column family, deleting the default family removes all
+        // versions of column C1.
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                null,
+                3,
+                KeyValue.Type.DeleteFamily);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+        byte[] idxKeyBytes1 = generateIndexRowKey("v1");
+        byte[] idxKeyBytes2 = generateIndexRowKey("v2");
+
+        // idxPut1 and idxPut2 are generated from the two versions in dataPut
+        Put idxPut1 = new Put(idxKeyBytes1);
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutations.add(idxPut1);
+
+        Put idxPut2 = new Put(idxKeyBytes2);
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+        expectedIndexMutations.add(idxPut2);
+
+        // idxDel1 is required to remove the row "v1_k1" added by idxPut1.
+        // The ts of idxDel1 is the same as idxPut2's, because it is a result of idxPut2.
+        // Since C1 is the only indexed column, it is translated into a DeleteFamily mutation.
+        Delete idxDel1 = new Delete(idxKeyBytes1);
+        addCellToDelMutation(idxDel1,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                null,
+                2,
+                KeyValue.Type.DeleteFamily);
+        expectedIndexMutations.add(idxDel1);
+
+        // idxDel2 is the index mutation corresponding to dataDel
+        Delete idxDel2 = new Delete(idxKeyBytes2);
+        addCellToDelMutation(idxDel2,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                null,
+                3,
+                KeyValue.Type.DeleteFamily);
+        expectedIndexMutations.add(idxDel2);
+
+        assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+    }
+
+    // Simulate put and delete mutations with the same timestamp on the indexed column
+    @Test
+    public void testPutDeleteOnSameTimeStamp() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        // insert a row
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable,1);
+
+        // delete column of C1 from the inserted row
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                KeyValue.Type.DeleteColumn);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+        // When dataDel and dataPut have the same timestamp, dataDel is applied on top of dataPut when we replay
+        // them for index rebuild. idxPut1 is expected because the data table still shows row k1 with an empty C1,
+        // so we need an index row with row key "_k1".
+        Put idxPut1 = new Put(generateIndexRowKey(null));
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutations.add(idxPut1);
+
+        assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+    }
+
+    // Simulate put and delete mutations on a data table with a covered index column
+    @Test
+    public void testCoveredIndexColumns() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "C2");
+
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C1"),
+                2,
+                KeyValue.Type.DeleteColumn);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutations = new ArrayList<>();
+        byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+        // idxPut1 is the index mutation corresponding to dataPut.
+        // The qualifier "0:C2" is derived from the data table column family and column name; the cell still uses
+        // the default column family of the index table.
+        Put idxPut1 = new Put(idxKeyBytes);
+        addCellToPutMutation(idxPut1,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                Bytes.toBytes("0:C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutations.add(idxPut1);
+
+        // idxKey2/idxPut2 are required by dataDel, as dataDel changes the corresponding index table row key
+        List<Byte> idxKey2 = new ArrayList<>();
+        idxKey2.add(QueryConstants.SEPARATOR_BYTE);
+        idxKey2.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(ROW_KEY)));
+        byte[] idxKeyBytes2 = com.google.common.primitives.Bytes.toArray(idxKey2);
+        Put idxPut2 = new Put(idxKeyBytes2);
+        addCellToPutMutation(idxPut2,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                Bytes.toBytes("0:C2"),
+                2,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+        expectedIndexMutations.add(idxPut2);
+
+        // idxDel is required to invalidate the index row "v1_k1", since dataDel removed the value of the indexed column
+        Delete idxDel = new Delete(idxKeyBytes);
+        addCellToDelMutation(idxDel,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                null,
+                2,
+                KeyValue.Type.DeleteFamily);
+        expectedIndexMutations.add(idxDel);
+
+        assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+    }
+
+    // Simulate the scenario where the indexed column and the covered column belong to different column families
+    @Test
+    public void testForMultipleFamilies() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, CF1.C1 VARCHAR, CF2.C2 VARCHAR",  //define C1 and C2 with different families
+                "CF1.C1",
+                "ROW_KEY",
+                "CF2.C2");
+
+        // insert a row to the data table
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                Bytes.toBytes("CF1"),
+                Bytes.toBytes("C1"),
+                1,
+                Bytes.toBytes("v1"));
+        addCellToPutMutation(dataPut,
+                Bytes.toBytes("CF2"),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+        // delete the indexed column CF1:C1
+        Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+        addCellToDelMutation(dataDel,
+                Bytes.toBytes("CF1"),
+                Bytes.toBytes("C1"),
+                2,
+                KeyValue.Type.DeleteColumn);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                dataDel);
+
+        List<Mutation> expectedIndexMutation = new ArrayList<>();
+
+        byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+        // The index table uses the family name of the first covered column, which is CF2 here.
+        Put idxPut1 = new Put(idxKeyBytes);
+        addCellToPutMutation(idxPut1,
+                Bytes.toBytes("CF2"),
+                Bytes.toBytes("CF2:C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+        expectedIndexMutation.add(idxPut1);
+
+        // idxPut2 and idxDel are the result of dataDel:
+        // idxPut2 creates the index row "_k1", and idxDel invalidates the index row "v1_k1".
+        Put idxPut2 = new Put(generateIndexRowKey(null));
+        addCellToPutMutation(idxPut2,
+                Bytes.toBytes("CF2"),
+                Bytes.toBytes("CF2:C2"),
+                2,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+        expectedIndexMutation.add(idxPut2);
+
+        Delete idxDel = new Delete(idxKeyBytes);
+        addCellToDelMutation(idxDel,
+                Bytes.toBytes("CF2"),
+                null,
+                2,
+                KeyValue.Type.DeleteFamily);
+        expectedIndexMutation.add(idxDel);
+
+        assertEqualMutationList(expectedIndexMutation, actualIndexMutations);
+    }
+
+    // Simulate two data put versions that produce the same index row key but different timestamps.
+    // We expect to see two index mutations with the same content but different timestamps.
+    @Test
+    public void testSameTypeOfMutationWithSameValueButDifferentTimeStamp() throws Exception {
+        SetupInfo info = setup(TABLE_NAME,
+                INDEX_NAME,
+                "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+                "C1",
+                "ROW_KEY",
+                "");
+
+        Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v2"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+        addCellToPutMutation(dataPut,
+                info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                Bytes.toBytes("C2"),
+                1,
+                Bytes.toBytes("v3"));
+        addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
+
+        List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+                dataPut,
+                null);
+
+        byte[] idxKeyBytes = generateIndexRowKey(null);
+
+        // idxPut1 and idxPut2 have the same content but different timestamps
+        Put idxPut1 = new Put(idxKeyBytes);
+        addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+        Put idxPut2 = new Put(idxKeyBytes);
+        addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+
+        assertEqualMutationList(Arrays.asList((Mutation)idxPut1, (Mutation)idxPut2), actualIndexMutations);
+    }
+
+    /**
+     * Generate the index table row key from the value of the indexed column.
+     * @param indexVal value of the indexed column, may be null
+     * @return the index row key bytes
+     */
+    byte[] generateIndexRowKey(String indexVal) {
+        List<Byte> idxKey = new ArrayList<>();
+        if (indexVal != null && !indexVal.isEmpty())
+            idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(indexVal)));
+        idxKey.add(QueryConstants.SEPARATOR_BYTE);
+        idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(ROW_KEY)));
+        return com.google.common.primitives.Bytes.toArray(idxKey);
+    }
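+    // For example, generateIndexRowKey("v1") returns the bytes of "v1" + SEPARATOR_BYTE + "k1",
+    // the index row key written as "v1_k1" in the comments above, and generateIndexRowKey(null)
+    // returns SEPARATOR_BYTE + "k1" ("_k1").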
+
+    void addCellToPutMutation(Put put, byte[] family, byte[] column, long ts, byte[] value) throws Exception {
+        byte[] rowKey = put.getRow();
+        Cell cell = CellUtil.createCell(rowKey, family, column, ts, KeyValue.Type.Put.getCode(), value);
+        put.add(cell);
+    }
+
+    void addCellToDelMutation(Delete del, byte[] family, byte[] column, long ts, KeyValue.Type type) throws Exception {
+        byte[] rowKey = del.getRow();
+        Cell cell = CellUtil.createCell(rowKey, family, column, ts, type.getCode(), null);
+        del.addDeleteMarker(cell);
+    }
+
+    /**
+     * Add the empty column to an existing data table put mutation.
+     * @param put the data table put mutation
+     * @param ptable the data table definition
+     * @param ts the cell timestamp
+     * @throws Exception
+     */
+    void addEmptyColumnToDataPutMutation(Put put, PTable ptable, long ts) throws Exception {
+        addCellToPutMutation(put,
+                SchemaUtil.getEmptyColumnFamily(ptable),
+                QueryConstants.EMPTY_COLUMN_BYTES,
+                ts,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+
+    /**
+     * Add the empty column with the verified flag to an existing index put mutation.
+     * @param put the index put mutation
+     * @param im the index maintainer
+     * @param ts the cell timestamp
+     * @throws Exception
+     */
+    void addEmptyColumnToIndexPutMutation(Put put, IndexMaintainer im, long ts) throws Exception {
+        addCellToPutMutation(put,
+                im.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                QueryConstants.EMPTY_COLUMN_BYTES,
+                ts,
+                IndexRegionObserver.VERIFIED_BYTES);
+    }
+
+    /**
+     * Compare two mutation lists without worrying about the order of the mutations in the lists.
+     * @param expectedMutations the expected index mutations
+     * @param actualMutations the actual index mutations; matched entries are removed by this check
+     */
+    void assertEqualMutationList(List<Mutation> expectedMutations,
+                                 List<Mutation> actualMutations) {
+        assertEquals(expectedMutations.size(), actualMutations.size());
+        for (Mutation expected : expectedMutations) {
+            boolean found = false;
+            for (Mutation actual: actualMutations) {
+                if (isEqualMutation(expected, actual)) {
+                    actualMutations.remove(actual);
+                    found = true;
+                    break;
+                }
+            }
+            if (!found)
+                Assert.fail(String.format("Cannot find mutation:%s", expected));
+        }
+    }
+
+    /**
+     * Compare two mutations without worrying about the order of cells within each mutation
+     * @param expectedMutation the expected mutation
+     * @param actualMutation the actual mutation
+     * @return true if the two mutations contain the same cells
+     */
+    boolean isEqualMutation(Mutation expectedMutation, Mutation actualMutation){
+        List<Cell> expectedCells = new ArrayList<>();
+        for (List<Cell> cells : expectedMutation.getFamilyCellMap().values()) {
+            expectedCells.addAll(cells);
+        }
+
+        List<Cell> actualCells = new ArrayList<>();
+        for (List<Cell> cells : actualMutation.getFamilyCellMap().values()) {
+            actualCells.addAll(cells);
+        }
+
+        if (expectedCells.size() != actualCells.size())
+            return false;
+        for(Cell expected : expectedCells) {
+            boolean found = false;
+            for(Cell actual: actualCells){
+                if (isEqualCell(expected, actual)) {
+                    actualCells.remove(actual);
+                    found = true;
+                    break;
+                }
+            }
+            if (!found)
+                return false;
+        }
+
+        return true;
+    }
+
+    boolean isEqualCell(Cell a, Cell b) {
+        return CellUtil.matchingRow(a, b)
+                && CellUtil.matchingFamily(a, b)
+                && CellUtil.matchingQualifier(a, b)
+                && CellUtil.matchingTimestamp(a, b)
+                && CellUtil.matchingType(a, b)
+                && CellUtil.matchingValue(a, b);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
new file mode 100644
index 0000000..fbb022d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
+
+    private static final int INDEX_TABLE_EXPIRY_SEC = 1;
+    private static final String UNEXPECTED_COLUMN = "0:UNEXPECTED_COLUMN";
+    public static final String FIRST_ID = "FIRST_ID";
+    public static final String SECOND_ID = "SECOND_ID";
+    public static final String FIRST_VALUE = "FIRST_VALUE";
+    public static final String SECOND_VALUE = "SECOND_VALUE";
+    public static final String CREATE_TABLE_DDL =
+            "CREATE TABLE IF NOT EXISTS %s (FIRST_ID BIGINT NOT NULL, "
+                    + "SECOND_ID BIGINT NOT NULL, FIRST_VALUE VARCHAR(20), "
+                    + "SECOND_VALUE INTEGER "
+                    + "CONSTRAINT PK PRIMARY KEY(FIRST_ID, SECOND_ID)) COLUMN_ENCODED_BYTES=0";
+
+    public static final String CREATE_INDEX_DDL =
+            "CREATE INDEX %s ON %s (SECOND_VALUE) INCLUDE (FIRST_VALUE)";
+    public static final String COMPLETE_ROW_UPSERT = "UPSERT INTO %s VALUES (?,?,?,?)";
+    public static final String PARTIAL_ROW_UPSERT = "UPSERT INTO %s (%s, %s, %s) VALUES (?,?,?)";
+    public static final String DELETE_ROW_DML = "DELETE FROM %s WHERE %s = ?  AND %s = ?";
+    public static final String INCLUDED_COLUMN = "0:FIRST_VALUE";
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    private enum TestType {
+        //set of mutations matching expected mutations
+        VALID_EXACT_MATCH,
+        //mix of delete and put mutations
+        VALID_MIX_MUTATIONS,
+        //only incoming unverified mutations
+        VALID_NEW_UNVERIFIED_MUTATIONS,
+        //extra mutations mimicking incoming mutations
+        VALID_MORE_MUTATIONS,
+        //index row that has passed its TTL
+        EXPIRED,
+        //an extra cell planted alongside the expected ones
+        INVALID_EXTRA_CELL,
+        //empty column cell given a delete type instead of a put
+        INVALID_EMPTY_CELL,
+        //covered column cell rewritten with a wrong value
+        INVALID_CELL_VALUE,
+        //a column the index row is not supposed to contain
+        INVALID_COLUMN
+    }
+
+    public static class UnitTestClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public UnitTestClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+    }
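+
+    // Injected through EnvironmentEdgeManager, this clock reports a time delta milliseconds
+    // ahead of the wall clock, so tests can age index rows past the table TTL without sleeping.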
+
+    @Mock
+    Result indexRow;
+    @Mock
+    IndexRebuildRegionScanner rebuildScanner;
+    List<Mutation> actualMutationList;
+    String schema, table, dataTableFullName, index, indexTableFullName;
+    PTable pIndexTable, pDataTable;
+    Put put = null;
+    Delete delete = null;
+    PhoenixConnection pconn;
+    IndexToolVerificationResult.PhaseResult actualPR;
+    public Map<byte[], List<Mutation>> indexKeyToMutationMapLocal;
+    private IndexMaintainer indexMaintainer;
+
+    @Before
+    public void setup() throws SQLException, IOException {
+        MockitoAnnotations.initMocks(this);
+        createDBObject();
+        createMutationsWithUpserts();
+        initializeRebuildScannerAttributes();
+        initializeGlobalMockitoSetup();
+    }
+
+    public void createDBObject() throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            schema = generateUniqueName();
+            table = generateUniqueName();
+            index = generateUniqueName();
+            dataTableFullName = SchemaUtil.getQualifiedTableName(schema, table);
+            indexTableFullName = SchemaUtil.getQualifiedTableName(schema, index);
+
+            conn.createStatement().execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+            conn.createStatement().execute(String.format(CREATE_INDEX_DDL, index, dataTableFullName));
+            conn.commit();
+
+            pconn = conn.unwrap(PhoenixConnection.class);
+            pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), indexTableFullName));
+            pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), dataTableFullName));
+        }
+    }
+
+    private void createMutationsWithUpserts() throws SQLException, IOException {
+        deleteRow(2, 3);
+        upsertPartialRow(2, 3, "abc");
+        upsertCompleteRow(2, 3, "hik", 8);
+        upsertPartialRow(2, 3, 10);
+        upsertPartialRow(2, 3, 4);
+        deleteRow(2, 3);
+        upsertPartialRow(2, 3, "def");
+        upsertCompleteRow(2, 3, null, 20);
+        upsertPartialRow(2, 3, "wert");
+    }
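+
+    // Every statement above funnels through convertUpsertToMutations(), so the whole sequence
+    // accumulates into a single Put and a single Delete whose cells carry the timestamps at
+    // which they were created, giving verification a multi-version expected history.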
+
+    private void deleteRow(int key1, int key2) throws SQLException, IOException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(DELETE_ROW_DML, dataTableFullName, FIRST_ID, SECOND_ID));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, String val1)
+            throws SQLException, IOException {
+
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                                    FIRST_VALUE));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, int value1)
+            throws SQLException, IOException {
+
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                                    SECOND_VALUE));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setInt(3, value1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertCompleteRow(int key1, int key2, String val1, int val2)
+            throws SQLException, IOException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            PreparedStatement ps =
+                    conn.prepareStatement(String.format(COMPLETE_ROW_UPSERT, dataTableFullName));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.setInt(4, val2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void convertUpsertToMutations(Connection conn) throws SQLException, IOException {
+        Iterator<Pair<byte[], List<Cell>>> dataTableNameAndMutationKeyValuesIter =
+                PhoenixRuntime.getUncommittedDataIterator(conn);
+        Pair<byte[], List<Cell>> elem = dataTableNameAndMutationKeyValuesIter.next();
+        byte[] key = CellUtil.cloneRow(elem.getSecond().get(0));
+        long mutationTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        for (Cell kv : elem.getSecond()) {
+            Cell cell =
+                    CellUtil.createCell(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+                            mutationTS, kv.getTypeByte(), CellUtil.cloneValue(kv));
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(key);
+                }
+                put.add(cell);
+            } else {
+                if (delete == null) {
+                    delete = new Delete(key);
+                }
+                delete.addDeleteMarker(cell);
+            }
+        }
+    }
+
+    private void initializeRebuildScannerAttributes() {
+        when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
+        when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
+        when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+        rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
+        indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+        rebuildScanner.setIndexMaintainer(indexMaintainer);
+    }
+
+    private void initializeGlobalMockitoSetup() throws IOException {
+        // delegate the methods under test to their real implementations
+        when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+        when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
+        when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
+                Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+                        Matchers.<byte[]>any(), Matchers.<byte[]>any());
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString());
+
+        // populate the local map that the test uses to derive actual index mutations
+        indexKeyToMutationMapLocal = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMapLocal);
+        rebuildScanner.prepareIndexMutations(put, delete);
+
+        // populate a second map and leave it set on the scanner as its expected mutations
+        Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+        rebuildScanner.prepareIndexMutations(put, delete);
+    }
+
+    private byte[] getValidRowKey() {
+        return indexKeyToMutationMapLocal.entrySet().iterator().next().getKey();
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_moreActual() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allMix() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allUnverified() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR =
+                new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.EXPIRED);
+            expireThisRow();
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_diffColumn() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EXTRA_CELL);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expectedMutations_null() throws IOException {
+        when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        actualMutationList = new ArrayList<>();
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0);
+    }
+
+    private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+    }
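+
+    // PhaseResult counters are positional: (valid, expired, missing, invalid) index row
+    // counts (the third slot is assumed here to be the missing-row count); a clean run is
+    // (1, 0, 0, 0) and a corrupted row is (0, 0, 0, 1).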
+
+    private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
+            TestType testType)
+            throws IOException {
+        actualPR = new IndexToolVerificationResult.PhaseResult();
+        byte[] indexKey = entry.getKey();
+        when(indexRow.getRow()).thenReturn(indexKey);
+        actualMutationList = buildActualIndexMutationsList(testType);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+    }
+
+    private List<Mutation> buildActualIndexMutationsList(TestType testType) {
+        List<Mutation> actualMutations = new ArrayList<>();
+        actualMutations.addAll(indexKeyToMutationMapLocal.get(indexRow.getRow()));
+        if(testType.equals(TestType.EXPIRED)) {
+            return actualMutations;
+        }
+        if(testType.toString().startsWith("VALID")) {
+            return getValidActualMutations(testType, actualMutations);
+        }
+        if(testType.toString().startsWith("INVALID")) {
+            return getInvalidActualMutations(testType, actualMutations);
+        }
+        return null;
+    }
+
+    private List <Mutation> getValidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        if(testType.equals(TestType.VALID_EXACT_MATCH)) {
+            return actualMutations;
+        }
+        if (testType.equals(TestType.VALID_MIX_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), 1L));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+        }
+        if (testType.equals(TestType.VALID_NEW_UNVERIFIED_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), 1L));
+        }
+        newActualMutations.addAll(actualMutations);
+        if(testType.equals(TestType.VALID_MORE_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), 1L));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), 1L));
+        }
+        return newActualMutations;
+    }
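+
+    // The extra unverified puts and delete markers mimic concurrent writer activity; since
+    // verification is expected to skip unverified history, these variants must still be
+    // counted as valid rows.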
+
+    private List <Mutation> getInvalidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        newActualMutations.addAll(actualMutations);
+        for (Mutation m : actualMutations) {
+            newActualMutations.remove(m);
+            NavigableMap<byte[], List<Cell>> familyCellMap = m.getFamilyCellMap();
+            List<Cell> cellList = familyCellMap.firstEntry().getValue();
+            List<Cell> newCellList = new ArrayList<>();
+            byte[] fam = CellUtil.cloneFamily(cellList.get(0));
+            for (Cell c : cellList) {
+                infiltrateCell(c, newCellList, testType);
+            }
+            familyCellMap.put(fam, newCellList);
+            m.setFamilyCellMap(familyCellMap);
+            newActualMutations.add(m);
+        }
+        return newActualMutations;
+    }
+
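+    /**
+     * Plant exactly one kind of corruption into the cell list so that the matching
+     * INVALID_* test can assert the row is classified as invalid and nothing else.
+     */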
+    private void infiltrateCell(Cell c, List<Cell> newCellList, TestType e) {
+        Cell newCell;
+        Cell emptyCell;
+        switch(e) {
+        case INVALID_COLUMN:
+            newCell =
+                    CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                            Bytes.toBytes(UNEXPECTED_COLUMN),
+                            EnvironmentEdgeManager.currentTimeMillis(),
+                            KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
+            newCellList.add(newCell);
+            newCellList.add(c);
+            break;
+        case INVALID_CELL_VALUE:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell = getCellWithPut(c);
+                emptyCell = getVerifiedEmptyCell(c);
+                newCellList.add(newCell);
+                newCellList.add(emptyCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EMPTY_CELL:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell =
+                        CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                                CellUtil.cloneQualifier(c), c.getTimestamp(),
+                                KeyValue.Type.Delete.getCode(), VERIFIED_BYTES);
+                newCellList.add(newCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EXTRA_CELL:
+            newCell = getCellWithPut(c);
+            emptyCell = getVerifiedEmptyCell(c);
+            newCellList.add(newCell);
+            newCellList.add(emptyCell);
+            newCellList.add(c);
+            break;
+        }
+    }
+
+    private Cell getVerifiedEmptyCell(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                EnvironmentEdgeManager.currentTimeMillis(),
+                KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
+    }
+
+    private Cell getCellWithPut(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c),
+                CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
+                c.getTimestamp(), KeyValue.Type.Put.getCode(),
+                Bytes.toBytes("zxcv"));
+    }
+
+    private void expireThisRow() {
+        rebuildScanner.setIndexTableTTL(INDEX_TABLE_EXPIRY_SEC);
+        UnitTestClock expiryClock = new UnitTestClock(5000);
+        EnvironmentEdgeManager.injectEdge(expiryClock);
+    }
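+
+    // With the TTL dropped to one second and the clock five seconds ahead, every index row
+    // the scanner reads back looks expired and should be counted as expired, not invalid.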
+
+    private Mutation getDeleteMutation(Mutation orig, Long ts) {
+        Mutation m = new Delete(orig.getRow());
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.DeleteFamilyVersion);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, true);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    private Mutation getUnverifiedPutMutation(Mutation orig, Long ts) {
+        Mutation m = new Put(orig.getRow());
+        if (orig.getAttributesMap() != null) {
+            for (Map.Entry<String,byte[]> entry : orig.getAttributesMap().entrySet()) {
+                m.setAttribute(entry.getKey(), entry.getValue());
+            }
+        }
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.Put);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, false);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    private Cell getEmptyCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type,
+            boolean verified) {
+        return CellUtil.createCell(orig.getRow(), CellUtil.cloneFamily(origList.get(0)),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                ts, type.getCode(), verified ? VERIFIED_BYTES : UNVERIFIED_BYTES);
+    }
+
+    private Cell getNewPutCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type) {
+        return CellUtil.createCell(orig.getRow(),
+                CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN),
+                ts, type.getCode(), Bytes.toBytes("asdfg"));
+    }
+}