Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/03/04 08:14:30 UTC

[phoenix] branch PHOENIX-5748-4.x-HBase-1.5 updated (2efc813 -> bb13f54)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


 discard 2efc813  PHOENIX-5709 Addendum (unverified rows should fail all verification types except ONLY)
 discard 6909445  PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes
     add 0a30c11  PHOENIX-4866 UDFs get error: org.apache.phoenix.schema.FunctionNotFoundException: ERROR 6001 (42F01): Function undefined
     add 9c7626f  PHOENIX-5636: Improve the error message when client connects to server with higher major version
     new 7732aeb  PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes
     new bb13f54  PHOENIX-5709 Addendum (unverified rows should fail all verification types except ONLY)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2efc813)
            \
             N -- N -- N   refs/heads/PHOENIX-5748-4.x-HBase-1.5 (bb13f54)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../phoenix/end2end/UserDefinedFunctionsIT.java    |   5 +
 .../org/apache/phoenix/compile/FromCompiler.java   | 271 ++++++++++++---------
 .../coprocessor/IndexRebuildRegionScanner.java     | 132 +++++++---
 .../phoenix/query/ConnectionQueryServicesImpl.java |  57 +++--
 .../java/org/apache/phoenix/util/MetaDataUtil.java |  49 +++-
 .../org/apache/phoenix/util/MetaDataUtilTest.java  |  38 ++-
 6 files changed, 371 insertions(+), 181 deletions(-)


[phoenix] 01/02: PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 7732aeb44df48cb6cefd8e67046e567e1aaffb9e
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Feb 26 10:19:48 2020 -0800

    PHOENIX-5709 Base changes for simplifying index update generation code for consistent global indexes
---
 .../end2end/ConcurrentMutationsExtendedIT.java     |  35 +-
 .../phoenix/end2end/ConcurrentMutationsIT.java     |   3 +-
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  15 +-
 .../end2end/index/GlobalIndexCheckerIT.java        |  70 ++
 .../phoenix/compile/ServerBuildIndexCompiler.java  |  15 +-
 .../coprocessor/IndexRebuildRegionScanner.java     | 921 +++++++++++++++------
 .../UngroupedAggregateRegionObserver.java          |  17 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 855 ++++++++++++-------
 .../org/apache/phoenix/index/IndexMaintainer.java  |   2 +-
 9 files changed, 1350 insertions(+), 583 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 571961d..546c1fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.util.*;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -51,6 +53,21 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
 
     private final Object lock = new Object();
 
+    private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+        // This checks the state of every raw index row without rebuilding any row
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // This checks the state of an index row after it is repaired
+        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        // We want to check the index rows again as they may be modified by the read repair
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.ONLY);
+        // Now we rebuild the entire index table and expect that it is still good after the full rebuild
+        IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+                0, IndexTool.IndexVerifyType.AFTER);
+        return actualRowCount;
+    }
+
     @Test
     public void testSynchronousDeletesAndUpsertValues() throws Exception {
         final String tableName = generateUniqueName();
@@ -130,7 +147,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test
@@ -191,7 +208,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         t2.start();
 
         doneSignal.await(60, TimeUnit.SECONDS);
-        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        verifyIndexTable(tableName, indexName, conn);
     }
 
     @Test @Repeat(5)
@@ -204,9 +221,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         final String indexName = generateUniqueName();
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("CREATE TABLE " + tableName
-                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+                + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+                "CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
         TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
-        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
         final CountDownLatch doneSignal = new CountDownLatch(nThreads);
         Runnable[] runnables = new Runnable[nThreads];
         for (int i = 0; i < nThreads; i++) {
@@ -216,11 +234,12 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
                     try {
                         Connection conn = DriverManager.getConnection(getUrl());
                         for (int i = 0; i < 10000; i++) {
-                            boolean isNull = RAND.nextBoolean();
-                            int randInt = RAND.nextInt() % nIndexValues;
                             conn.createStatement().execute(
                                     "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
-                                            + (isNull ? null : randInt) + ")");
+                                            + (RAND.nextBoolean() ? null : (RAND.nextInt() % nIndexValues)) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+                                            + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
                             if ((i % batchSize) == 0) {
                                 conn.commit();
                             }
@@ -241,7 +260,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
         }
 
         assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
-        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+        long actualRowCount = verifyIndexTable(tableName, indexName, conn);
         assertEquals(nRows, actualRowCount);
     }
 
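For reference, the ONLY and AFTER options used by the new verifyIndexTable helper
are two of the IndexTool verify types whose failure semantics this patch reworks.
A minimal sketch paraphrasing the updated isVerificationFailed() logic that
appears later in this patch (the wrapper class and method here are illustrative,
not part of the patch):

    import org.apache.phoenix.mapreduce.index.IndexTool;

    class VerifyFailureSketch {
        // BEFORE and NONE never fail the job; ONLY fails on any invalid or
        // missing index row found before the rebuild; AFTER and BOTH fail on
        // any invalid or missing index row found after the rebuild.
        static boolean isFailed(IndexTool.IndexVerifyType type,
                                long beforeInvalid, long beforeMissing,
                                long afterInvalid, long afterMissing) {
            switch (type) {
                case BEFORE:
                case NONE:
                    return false;
                case ONLY:
                    return beforeInvalid + beforeMissing > 0;
                case AFTER:
                case BOTH:
                    return afterInvalid + afterMissing > 0;
                default:
                    return false;
            }
        }
    }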
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index f312df0..6562af0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -274,7 +274,8 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT {
             EnvironmentEdgeManager.injectEdge(null);
         }
     }
-
+    @Ignore ("It is not possible to assign the same timestamp two separately committed mutations in the current model\n" +
+            " except when the server time goes backward. In that case, the behavior is not deterministic")
     @Test
     public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception {
         try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index d3ca67d..30fd018 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -229,8 +229,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            // Check after compaction
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             setEveryNthRowWithNull(NROWS, 5, stmt);
@@ -241,7 +239,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
             indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
@@ -526,7 +523,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                     null, 0, IndexTool.IndexVerifyType.AFTER);
             assertEquals(1, MutationCountingRegionObserver.getMutationCount());
             MutationCountingRegionObserver.setMutationCount(0);
-            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
             // write any index rows
             runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.BEFORE);
@@ -614,16 +611,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             // Run the index tool to populate the index while verifying rows
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.AFTER);
-            // Corrupt one cell by writing directly into the index table
-            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
-            conn.commit();
-            // Run the index tool using the only-verify option to detect this mismatch between the data and index table
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
-                    null, -1, IndexTool.IndexVerifyType.ONLY);
-            cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
             dropIndexToolTables(conn);
         }
     }
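
Since all rows are already present in the index table in the scenario above, a
"-v BEFORE" run should not write any index rows. The mutation-counting pattern
this test already uses can check that directly; a hedged sketch, assumed to live
inside IndexToolIT where these helpers and fields are in scope (the zero-count
assertion is inferred from the comment, not shown in this hunk):

    // BEFORE verifies the existing index rows first and rebuilds only those
    // that fail verification, so a fully valid index should see no writes.
    private void assertBeforeVerifyWritesNothing(String schemaName, String viewName,
                                                 String indexTableName) throws Exception {
        MutationCountingRegionObserver.setMutationCount(0);
        runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                null, 0, IndexTool.IndexVerifyType.BEFORE);
        assertEquals(0, MutationCountingRegionObserver.getMutationCount());
    }
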
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index f9c50fd..35489d0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -117,6 +117,76 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testDelete() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
+            assertEquals(1, conn.createStatement().executeUpdate(dml));
+            conn.commit();
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            // Count the number of index rows
+            String query = "SELECT COUNT(*) from " + indexTableName;
+            // There should be one row in the index table
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
+    @Test
+    public void testSimulateConcurrentUpdates() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+            if (async) {
+                // run the index MR job.
+                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+            }
+            // For concurrent updates on the same row, the last write phase is ignored.
+            // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase, where
+            // the verified flag is set to true and/or index rows are deleted) and check that this does not impact
+            // correctness.
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            // Do multiple updates on the same data row
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
+            conn.commit();
+            // The expected state of the index table is  {('aa', 'a', 'abcc', 'abcd'), ('bc', 'b', 'bcd', 'bcde')}
+            // Do more multiple updates on the same data row
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', null, null)");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('b', 'ab')");
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('b', 'ab', null)");
+            conn.commit();
+            // Now the expected state of the index table is  {('ab', 'a', 'abcc' , null), ('ab', 'b', null, 'bcde')}
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * from "  + indexTableName);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("abcc", rs.getString(3));
+            assertEquals(null, rs.getString(4));
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals(null, rs.getString(3));
+            assertEquals("bcde", rs.getString(4));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
     public void testFailPostIndexDeleteUpdate() throws Exception {
         String dataTableName = generateUniqueName();
         populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
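
The setFailPostIndexUpdatesForTesting hook used in testSimulateConcurrentUpdates
above is a static toggle, so a test that flips it should restore it when done. A
minimal sketch of the pattern under that assumption (getUrl() comes from the test
base class; the reset with false is an assumption based on the setter's boolean
parameter):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.hbase.index.IndexRegionObserver;

    class FailPostPhaseSketch {
        static void upsertWithFailedPostPhase(String url, String dataTableName) throws Exception {
            try (Connection conn = DriverManager.getConnection(url)) {
                // Fail the post index update phase: the index row is written but
                // left unverified, to be repaired from the data table on read.
                IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
                conn.createStatement().execute(
                        "upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
                conn.commit();
            } finally {
                // Assumed reset so later writes take the normal three-phase path.
                IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
            }
        }
    }
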
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 4392e23..99caa6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -17,27 +17,30 @@
  */
 package org.apache.phoenix.compile;
 
-import java.sql.SQLException;
-import java.util.Collections;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
+import java.sql.SQLException;
+import java.util.Collections;
+
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
 
@@ -96,9 +99,9 @@ public class ServerBuildIndexCompiler {
                 throw new IllegalArgumentException(
                         "ServerBuildIndexCompiler does not support global indexes on transactional tables");
             }
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
             // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
             // However, in this case, we need to project all of the data columns that contribute to the index.
-            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
             for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
                 if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                     scan.addFamily(columnRef.getFamily());
@@ -121,6 +124,8 @@ public class ServerBuildIndexCompiler {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+                BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
                 addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
             }
             if (dataTable.isTransactional()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 598eb5f..a52bddf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,8 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
@@ -42,13 +42,15 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -85,6 +88,7 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -93,6 +97,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -100,7 +105,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
@@ -309,20 +313,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
             if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
                 return false;
-            }
-            if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-                if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
+            } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+                if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
                     return true;
                 }
-            }
-            if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+            } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
                 if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
                     return true;
                 }
-                if (before.validIndexRowCount + before.expiredIndexRowCount +
-                        after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
-                    return true;
-                }
             }
             return false;
         }
@@ -360,8 +358,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private Table resultHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
-    private Map<byte[], Put> indexKeyToDataPutMap;
-    private Map<byte[], Put> dataKeyToDataPutMap;
+    private Map<byte[], List<Mutation>> indexKeyToMutationMap;
+    private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
     private TaskRunner pool;
     private TaskBatch<Boolean> tasks;
     private String exceptionMessage;
@@ -371,15 +369,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private int indexTableTTL;
     private VerificationResult verificationResult;
     private boolean isBeforeRebuilt = true;
-
-    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
-                               final RegionCoprocessorEnvironment env,
-                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+    private boolean partialRebuild = false;
+    private int  singleRowRebuildReturnCode;
+    private Map<byte[], NavigableSet<byte[]>> familyMap;
+    private byte[][] viewConstants;
+
+    IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+                              final RegionCoprocessorEnvironment env,
+                              UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner);
         final Configuration config = env.getConfiguration();
         if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
             pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
                     QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        } else {
+            partialRebuild = true;
         }
         maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
         mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
@@ -392,31 +396,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        if (!scan.isRaw()) {
-            // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
-            List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-            indexMaintainer = maintainers.get(0);
-        }
+        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        indexMaintainer = maintainers.get(0);
         this.scan = scan;
+        familyMap = scan.getFamilyMap();
+        if (familyMap.isEmpty()) {
+            familyMap = null;
+        }
+
         this.innerScanner = innerScanner;
         this.region = region;
         this.env = env;
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        if (indexRowKey != null) {
+            setReturnCodeForSingleRowRebuild();
+            pageSizeInRows = 1;
+        }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
             verificationResult = new VerificationResult();
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
+                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
                 // Create the following objects only for rebuilds by IndexTool
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
                 indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
-                indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder("IndexVerify",
                                 env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -428,13 +439,38 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
+    private void setReturnCodeForSingleRowRebuild() throws IOException {
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            List<Cell> row = new ArrayList<>();
+            scanner.next(row);
+            // Check if the data table row we have just scanned matches with the index row key.
+            // If not, there is no need to build the index row from this data table row,
+            // and just return zero row count.
+            if (row.isEmpty()) {
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
+            } else {
+                Put put = new Put(CellUtil.cloneRow(row.get(0)));
+                for (Cell cell : row) {
+                    put.add(cell);
+                }
+                if (checkIndexRow(indexRowKey, put)) {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                } else {
+                    singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                }
+            }
+        }
+    }
+
     @Override
     public HRegionInfo getRegionInfo() {
         return region.getRegionInfo();
     }
 
     @Override
-    public boolean isFilterDone() { return false; }
+    public boolean isFilterDone() {
+        return false;
+    }
 
     private void logToIndexToolResultTable() throws IOException {
         long scanMaxTs = scan.getTimeRange().getMax();
@@ -505,38 +541,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         m.setDurability(Durability.SKIP_WAL);
     }
 
-    private Delete generateDeleteMarkers(Put put) {
-        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-        int cellCount = put.size();
-        if (cellCount == allColumns.size() + 1) {
-            // We have all the columns for the index table. So, no delete marker is needed
-            return null;
-        }
-        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
-        long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
-            if (cells == null) {
-                break;
-            }
-            for (Cell cell : cells) {
-                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-        }
-        Delete del = null;
-        for (ColumnReference column : allColumns) {
-            if (!includedColumns.contains(column)) {
-                if (del == null) {
-                    del = new Delete(put.getRow());
-                }
-                del.addColumns(column.getFamily(), column.getQualifier(), ts);
-            }
-        }
-        return del;
-    }
-
     private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
             ungroupedAggregateRegionObserver.checkForRegionClosing();
@@ -547,12 +551,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return uuidValue;
     }
 
-    private class SimpleValueGetter implements ValueGetter {
+    public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
-        SimpleValueGetter (final Put put) {
+
+        public SimpleValueGetter(final Put put) {
             this.put = put;
         }
+
         @Override
         public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
             List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
@@ -595,7 +601,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) throws IOException {
+                                           String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
         final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
         final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
         final int PREFIX_LENGTH = 3;
@@ -643,8 +649,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             length += PREFIX_LENGTH;
             Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
 
-        }
-        else {
+        } else {
             errorMessageBytes = Bytes.toBytes(errorMsg);
         }
         put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
@@ -656,21 +661,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         outputHTable.put(put);
     }
 
-    private long getMaxTimestamp(Result result) {
+    private static long getMaxTimestamp(Mutation m) {
         long ts = 0;
-        for (Cell cell : result.rawCells()) {
-            if (ts < cell.getTimestamp()) {
-                ts = cell.getTimestamp();
-            }
-        }
-        return ts;
-    }
-
-    private long getMaxTimestamp(Put put) {
-        long ts = 0;
-        for (List<Cell> cells : put.getFamilyCellMap().values()) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell cell : cells) {
                 if (ts < cell.getTimestamp()) {
@@ -681,132 +676,383 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return ts;
     }
 
-    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
-        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
-        long ts = getMaxTimestamp(dataRow);
-        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
-        if (indexPut == null) {
-            // This means the data row does not have any covered column values
-            indexPut = new Put(indexRow.getRow());
+    private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+        List<Cell> cellList = m.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return null;
         }
-        else {
-            // Remove the empty column prepared by Index codec as we need to change its value
-            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                    indexMaintainer.getEmptyKeyValueQualifier());
+        for (Cell cell : cellList) {
+            if (CellUtil.matchingQualifier(cell, qualifier)) {
+                return cell;
+            }
         }
-        // Add the empty column
-        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
-        int cellCount = 0;
-        long currentTime = EnvironmentEdgeManager.currentTime();
-        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+        return null;
+    }
+
+    private boolean isMatchingMutation(Mutation expected, Mutation actual, int version) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            String errorMsg = "Not matching timestamp";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg, null, null);
+            return false;
+        }
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
             if (cells == null) {
-                break;
+                continue;
             }
             for (Cell expectedCell : cells) {
                 byte[] family = CellUtil.cloneFamily(expectedCell);
                 byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
-                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
-                if (actualCell == null) {
-                    // Check if cell expired as per the current server's time and data table ttl
-                    // Index table should have the same ttl as the data table, hence we might not
-                    // get a value back from index if it has already expired between our rebuild and
-                    // verify
-                    // TODO: have a metric to update for these cases
-                    if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
-                        continue;
-                    }
-                    String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    String errorMsg = "Missing cell (in version " + version + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
                     return false;
                 }
-                if (actualCell.getTimestamp() < ts) {
-                    // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
-                    continue;
-                }
-                // Check all columns
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
-                    String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                    if (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
+                            Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) {
+                        if (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
+                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+                            // We will not flag this as a mismatch but will log it
+                            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                            String errorMsg = "Unverified index row (in version " + version + ")";
+                            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
+                            continue;
+                        }
+                    }
+                    String errorMsg = "Not matching value (in version " + version + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
                     return false;
-                } else if (actualCell.getTimestamp() != ts) {
-                    String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
-                            Bytes.toString(qualifier) + " E: " + ts + " A: " +
-                            actualCell.getTimestamp();
-                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
-                            errorMsg, null, null);
+                }
+            }
+        }
+        return true;
+    }
+
+    private boolean isVerified(Put mutation) throws IOException {
+        List<Cell> cellList = mutation.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier());
+        Cell cell = (cellList != null && !cellList.isEmpty()) ? cellList.get(0) : null;
+        if (cell == null) {
+            throw new DoNotRetryIOException("No empty column cell");
+        }
+        if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Orders mutations in descending order by the tuple of timestamp and mutation type, where
+     * a delete comes before a put with the same timestamp.
+     */
+    public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 > ts2) {
+                return -1;
+            }
+            if (ts1 < ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    private boolean isDeleteFamily(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isDeleteFamilyVersion(Mutation mutation) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+        Put put = null;
+        Delete del = null;
+        for (Cell cell : indexRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null) {
+                    put = new Put(CellUtil.cloneRow(cell));
+                }
+                put.add(cell);
+            } else {
+                if (del == null) {
+                    del = new Delete(CellUtil.cloneRow(cell));
+                }
+                del.addDeleteMarker(cell);
+            }
+        }
+        return getMutationsWithSameTS(put, del);
+    }
+
+    /**
+     * indexRow is the set of all cells of all the row versions of an index row from the index table. These are actual
+     * cells. We group these cells based on timestamp and type (put vs delete), and form the actual set of
+     * index mutations. indexKeyToMutationMap is a map from an index row key to a set of mutations that are generated
+     * using the rebuild process (i.e., by replaying raw data table mutations). These sets are sets of expected
+     * index mutations, one set for each index row key. Since not every index row has mutations from both phases
+     * (i.e., the pre and post data phase), we cannot compare actual index mutations with expected ones one by one
+     * and expect to find them identical. We need to consider the effects of concurrent data mutations, data table
+     * row write failures, and post index write failures. Thus, we need to allow some expected and actual mutations
+     * to be skipped while comparing actual mutations to expected mutations.
+     *
+     * The main idea of the verification algorithm used here is to match every expected verified put with an actual
+     * put such that the two mutations are the same, except that the actual mutation may be unverified.
+     *
+     * Some background on why we can skip some of the actual unverified puts and delete markers due to concurrent data
+     * table updates is as follows:
+     *
+     * For each data table mutation, two operations are done on the data table. One is to read the existing row state,
+     * and the second is to write to the data table. The processing of concurrent data mutations is serialized once
+     * for reading the existing row states, and then serialized again for updating the data table. In other words,
+     * they go through locking twice, i.e., [lock, read, unlock] and [lock, write, unlock]. Because of this two-phase
+     * locking, for a pair of concurrent mutations (on the same row), the same row state can be read from the data
+     * table. This means the same existing index row can be made unverified twice with different timestamps, one for
+     * each concurrent mutation. These unverified mutations are then repaired from the data table. Since expected
+     * mutations are used for the rebuild (which is also used by read repair), skipping the unverified put mutations
+     * that do not match an expected mutation is safe, as they will go through the same process during
+     * read repair and will be skipped and eventually cleaned up by it. We can skip the delete markers
+     * safely too, as they are placed to clean up these unverified mutations. When the data table rows are rebuilt,
+     * the rebuild process generates the delete family markers. The timestamp of a delete marker is the timestamp of
+     * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will
+     * be higher than the timestamp of the index row to be deleted.
+     */
+    private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult)
+            throws IOException {
+        List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
+        if (expectedMutationList == null) {
+            throw new DoNotRetryIOException("No expected mutation");
+        }
+        List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
+        if (actualMutationList == null || actualMutationList.isEmpty()) {
+            throw new DoNotRetryIOException("actualMutationList is null or empty");
+        }
+        Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        int actualIndex = 0;
+        int expectedIndex = 0;
+        int matchingCount = 0;
+        int expectedSize = expectedMutationList.size();
+        int actualSize = actualMutationList.size();
+        Mutation expected = null;
+        Mutation previousExpected;
+        Mutation actual;
+        while (expectedIndex < expectedSize && actualIndex < actualSize) {
+            previousExpected = expected;
+            expected = expectedMutationList.get(expectedIndex);
+            // Check if cell expired as per the current server's time and data table ttl
+            // Index table should have the same ttl as the data table, hence we might not
+            // get a value back from index if it has already expired between our rebuild and
+            // verify
+            // TODO: have a metric to update for these cases
+            if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+                verificationPhaseResult.expiredIndexRowCount++;
+                return true;
+            }
+            actual = actualMutationList.get(actualIndex);
+            if (expected instanceof Put) {
+                if (previousExpected == null || previousExpected instanceof Put) {
+                    // This expected put is either the first mutation or comes just after another expected put on the
+                    // expected mutation list, which is sorted by mutation timestamp. The cell timestamps
+                    // within each mutation here are the same.
+                    // Go down the list of actual mutations and find the corresponding actual put mutation with the same
+                    // timestamp. Stop if a verified put or delete family mutation is encountered on the way. Skip
+                    // unverified puts or delete family version delete markers.
+                    while (getTimestamp(actual) > getTimestamp(expected) &&
+                            ((actual instanceof Put && !isVerified((Put) actual)) ||
+                                    (actual instanceof Delete && isDeleteFamilyVersion(actual)))) {
+                        actualIndex++;
+                        if (actualIndex == actualSize) {
+                            break;
+                        }
+                        actual = actualMutationList.get(actualIndex);
+                    }
+                } else { // previousExpected instanceof Delete
+                    // Between an expected delete and an expected put, there cannot be any type of mutation, not even a verified put
+                    while (getTimestamp(actual) > getTimestamp(expected)) {
+                        actualIndex++;
+                        if (actualIndex == actualSize) {
+                            break;
+                        }
+                        actual = actualMutationList.get(actualIndex);
+                    }
+                }
+                if (actualIndex == actualSize) {
+                    break;
+                }
+                // Now the expected and actual mutations should match
+                if (isMatchingMutation(expected, actual, expectedIndex)) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                }
+                verificationPhaseResult.invalidIndexRowCount++;
+                return false;
+            } else { // expected instanceof Delete
+                // Between a put and a delete, between two deletes, or before the first delete, there can be other
+                // deletes and unverified puts. Skip all of them, if any
+                while (getTimestamp(actual) > getTimestamp(expected) &&
+                        ((actual instanceof Put && !isVerified((Put) actual)) || actual instanceof Delete)) {
+                    actualIndex++;
+                    if (actualIndex == actualSize) {
+                        break;
+                    }
+                    actual = actualMutationList.get(actualIndex);
+                }
+                if (actualIndex == actualSize) {
+                    break;
+                }
+                // If the first expected mutation is a delete, there should be an actual delete mutation with the
+                // same timestamp or an unverified put with the same or older timestamp
+                if (getTimestamp(actual) == getTimestamp(expected) &&
+                        (actual instanceof Delete && isDeleteFamily(actual))) {
+                    expectedIndex++;
+                    actualIndex++;
+                    matchingCount++;
+                    continue;
+                } else if (getTimestamp(actual) <= getTimestamp(expected) &&
+                        (actual instanceof Put && !isVerified((Put) actual))) {
+                    expectedIndex++;
+                    if (previousExpected == null) {
+                        matchingCount++;
+                    }
+                    continue;
+                }
+                if (previousExpected == null) {
+                    String errorMsg = "First delete check failure";
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                            getTimestamp(expected),
+                            getTimestamp(actual), errorMsg);
+                    verificationPhaseResult.invalidIndexRowCount++;
                     return false;
                 }
-                cellCount++;
             }
         }
-        if (cellCount != indexRow.rawCells().length) {
-            String errorMsg = "Expected to find " + cellCount + " cells but got "
-                    + indexRow.rawCells().length + " cells";
-            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
-            return false;
+        if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
+            if (matchingCount > 0) {
+                // We do not consider this a verification issue, but we log it for further information.
+                // This may happen due to compaction
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+                        + actualMutationList.size();
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expectedMutationList.get(0)),
+                        getTimestamp(actualMutationList.get(0)), errorMsg);
+            } else {
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                String errorMsg = "Not matching index row";
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expectedMutationList.get(0)),
+                        getTimestamp(actualMutationList.get(0)), errorMsg);
+                verificationPhaseResult.invalidIndexRowCount++;
+                return false;
+            }
         }
+        verificationPhaseResult.validIndexRowCount++;
         return true;
     }
 
-    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
+    private static long getMaxTimestamp(Pair<Put, Delete> pair) {
+        Put put = pair.getFirst();
+        long ts1 = 0;
+        if (put != null) {
+            ts1 = getMaxTimestamp((Mutation)put);
+        }
+        Delete del = pair.getSecond();
+        long ts2 = 0;
+        if (del != null) {
+            ts2 = getMaxTimestamp((Mutation)del);
+        }
+        return (ts1 > ts2) ? ts1 : ts2;
+    }
+
+    private void verifyIndexRows(List<KeyRange> keys,
                                  VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
-        int expectedRowCount = keys.size();
+        List<KeyRange> invalidKeys = new ArrayList<>();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
         indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
         scanRanges.initializeScan(indexScan);
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-        indexScan.setFilter(skipScanFilter);
-        int rowCount = 0;
+        indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        indexScan.setRaw(true);
+        indexScan.setMaxVersions();
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
-                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
-                if (dataPut == null) {
-                    // This should never happen
-                    String errorMsg = "Missing data row";
-                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
-                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
-                    throw new IOException(exceptionMessage);
+                KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
+                if (!keys.contains(keyRange)) {
+                    continue;
                 }
-                if (verifySingleIndexRow(result, dataPut)) {
-                    verificationPhaseResult.validIndexRowCount++;
-                    perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
-                } else {
-                    verificationPhaseResult.invalidIndexRowCount++;
+                if (!verifySingleIndexRow(result, verificationPhaseResult)) {
+                    invalidKeys.add(keyRange);
                 }
-                rowCount++;
+                keys.remove(keyRange);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(indexHTable.getName().toString(), t);
         }
        // Check if any expected index rows that we did not get are already expired due to TTL
         // TODO: metrics for expired rows
-        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
-            Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+        if (!keys.isEmpty()) {
+            Iterator<KeyRange> itr = keys.iterator();
             long currentTime = EnvironmentEdgeManager.currentTime();
             while(itr.hasNext()) {
-                Entry<byte[], Put> entry = itr.next();
-                long ts = getMaxTimestamp(entry.getValue());
-                if (isTimestampBeforeTTL(currentTime, ts)) {
+                KeyRange keyRange = itr.next();
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
                     itr.remove();
-                    rowCount++;
                     verificationPhaseResult.expiredIndexRowCount++;
                 }
             }
         }
-        if (rowCount != expectedRowCount) {
-            for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+        if (keys.size() > 0) {
+            for (KeyRange keyRange : keys) {
                 String errorMsg = "Missing index row";
-                logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
-                        0, errorMsg);
+                byte[] key = keyRange.getLowerRange();
+                List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                logToIndexToolOutputTable(dataKey,
+                        keyRange.getLowerRange(),
+                        getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
+                        getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
             }
-            verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
+            verificationPhaseResult.missingIndexRowCount += keys.size();
         }
+        keys.addAll(invalidKeys);
     }
 
     private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
@@ -816,7 +1062,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
     }
 
-    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
+    private void addVerifyTask(final List<KeyRange> keys,
                                final VerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
             @Override
@@ -826,7 +1072,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
                         throw new IOException(exceptionMessage);
                     }
-                    verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
+                    verifyIndexRows(keys, verificationPhaseResult);
                 } catch (Exception e) {
                     throw e;
                 }
@@ -836,32 +1082,26 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
-        for (Mutation mutation : mutations) {
-            indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
-        }
-        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+        int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
         tasks = new TaskBatch<>(taskCount);
-        List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
+        List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
         List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
         List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
-        Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-        dataPutMapList.add(perTaskDataKeyToDataPutMap);
+        listOfKeyRangeList.add(keys);
         VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
         verificationPhaseResultList.add(perTaskVerificationPhaseResult);
-        for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
-            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+        for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
             if (keys.size() == rowCountPerTask) {
-                addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+                addVerifyTask(keys, perTaskVerificationPhaseResult);
                 keys = new ArrayList<>(rowCountPerTask);
-                perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-                dataPutMapList.add(perTaskDataKeyToDataPutMap);
+                listOfKeyRangeList.add(keys);
                 perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
                 verificationPhaseResultList.add(perTaskVerificationPhaseResult);
             }
         }
         if (keys.size() > 0) {
-            addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+            addVerifyTask(keys, perTaskVerificationPhaseResult);
         }
         List<Boolean> taskResultList = null;
         try {
@@ -878,30 +1118,41 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 throw new IOException(exceptionMessage);
             }
         }
-        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
-            for (Map<byte[], Put> dataPutMap : dataPutMapList) {
-                dataKeyToDataPutMap.putAll(dataPutMap);
-            }
-        }
         for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
             verificationPhaseResult.add(result);
         }
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            for (List<KeyRange> keyRangeList : listOfKeyRangeList) {
+                for (KeyRange keyRange : keyRangeList) {
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+                    newDataKeyToMutationMap.put(dataKey, dataKeyToMutationMap.get(dataKey));
+                }
+            }
+            dataKeyToMutationMap.clear();
+            dataKeyToMutationMap = newDataKeyToMutationMap;
+        }
     }
 
     private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         byte[] uuidValue = ServerCacheClient.generateId();
         UngroupedAggregateRegionObserver.MutationList currentMutationList =
                 new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        Put put = null;
         for (Mutation mutation : mutationList) {
-            Put put = (Put) mutation;
-            currentMutationList.add(mutation);
-            setMutationAttributes(put, uuidValue);
-            uuidValue = commitIfReady(uuidValue, currentMutationList);
-            Delete deleteMarkers = generateDeleteMarkers(put);
-            if (deleteMarkers != null) {
-                setMutationAttributes(deleteMarkers, uuidValue);
-                currentMutationList.add(deleteMarkers);
+            if (mutation instanceof Put) {
+                if (put != null) {
+                    // back-to-back puts, i.e., no delete in between; we can commit the previous put
+                    uuidValue = commitIfReady(uuidValue, currentMutationList);
+                }
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
+                put = (Put)mutation;
+            } else {
+                currentMutationList.add(mutation);
+                setMutationAttributes(mutation, uuidValue);
                 uuidValue = commitIfReady(uuidValue, currentMutationList);
+                put = null;
             }
         }
         if (!currentMutationList.isEmpty()) {
@@ -912,11 +1163,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
     private void verifyAndOrRebuildIndex() throws IOException {
         VerificationResult nextVerificationResult = new VerificationResult();
-        nextVerificationResult.scannedDataRowCount = mutations.size();
+        nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -928,73 +1179,285 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.before.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
-                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt and now need to rebuild them
             // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
             mutations.clear();
-            for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
-                mutations.add(entry.getValue());
+
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+                if (entry.getValue().getFirst() != null) {
+                    mutations.add(entry.getValue().getFirst());
+                }
+                if (entry.getValue().getSecond() != null) {
+                    mutations.add(entry.getValue().getSecond());
+                }
             }
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+            nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
             isBeforeRebuilt = false;
         }
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            indexKeyToDataPutMap.clear();
             VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            indexKeyToMutationMap.clear();
+            for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+                prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
+            }
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.after.add(verificationPhaseResult);
-            if (mutations.size() != verificationPhaseResult.getTotalCount()) {
-                throw new DoNotRetryIOException(
-                        "mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
-                                nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
-            }
         }
-        indexKeyToDataPutMap.clear();
         verificationResult.add(nextVerificationResult);
     }
 
+    private boolean isColumnIncluded(Cell cell) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        if (!familyMap.containsKey(family)) {
+            return false;
+        }
+        NavigableSet<byte[]> set = familyMap.get(family);
+        if (set == null || set.isEmpty()) {
+            return true;
+        }
+        byte[] qualifier = CellUtil.cloneQualifier(cell);
+        return set.contains(qualifier);
+    }
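
The familyMap lookup above follows the usual HBase Scan family-map contract. A minimal sketch, assuming a familyMap of type Map<byte[], NavigableSet<byte[]>> and hypothetical family/qualifier names:

    NavigableSet<byte[]> quals = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    quals.add(Bytes.toBytes("q1"));
    familyMap.put(Bytes.toBytes("cf1"), quals);  // only cf1:q1 passes isColumnIncluded
    familyMap.put(Bytes.toBytes("cf2"), null);   // null (or empty) set: every cf2 cell passes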
+
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+    /**
+     * Reorders mutations in ascending order by the tuple of (timestamp, mutation type), where a delete comes
+     * before a put with the same timestamp
+     */
+    public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation o1, Mutation o2) {
+            long ts1 = getTimestamp(o1);
+            long ts2 = getTimestamp(o2);
+            if (ts1 < ts2) {
+                return -1;
+            }
+            if (ts1 > ts2) {
+                return 1;
+            }
+            if (o1 instanceof Delete && o2 instanceof Put) {
+                return -1;
+            }
+            if (o1 instanceof Put && o2 instanceof Delete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+        List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+        if (put != null) {
+            mutationList.add(put);
+        }
+        if (del != null) {
+            mutationList.add(del);
+        }
+        // Group the cells within a mutation based on their timestamps and create a separate mutation for each group
+        mutationList = (List<Mutation>) IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
+        // Reorder the mutations on the same row so that delete comes before put when they have the same timestamp
+        Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
+        return mutationList;
+    }
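
As a quick illustration of the ordering contract (row key, family, and values here are made up), a delete family marker and a put that share a timestamp come back delete-first:

    byte[] row = Bytes.toBytes("r1");
    byte[] cf = Bytes.toBytes("0");
    Put put = new Put(row);
    put.addColumn(cf, Bytes.toBytes("q"), 100L, Bytes.toBytes("v"));
    Delete del = new Delete(row);
    del.addFamily(cf, 100L);  // DeleteFamily marker at the same timestamp as the put
    List<Mutation> ordered = getMutationsWithSameTS(put, del);
    // ordered is [del, put]: equal timestamps tie-break with the delete first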
+
+    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
+                                                ValueGetter mergedRowVG, long ts)
+            throws IOException {
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                mergedRowVG, rowKeyPtr, ts, null, null);
+        if (indexPut == null) {
+            // No covered column. Just prepare an index row with the empty column
+            byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
+                    null, null, HConstants.LATEST_TIMESTAMP);
+            indexPut = new Put(indexRowKey);
+        }
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+        return indexPut;
+    }
+
+    public static void removeColumn(Put put, Cell deleteCell) {
+        byte[] family = CellUtil.cloneFamily(deleteCell);
+        List<Cell> cellList = put.getFamilyCellMap().get(family);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (CellUtil.matchingQualifier(cell, deleteCell)) {
+                cellIterator.remove();
+                if (cellList.isEmpty()) {
+                    put.getFamilyCellMap().remove(family);
+                }
+                return;
+            }
+        }
+    }
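
A small sketch of removeColumn in action (names are hypothetical); a DeleteColumn cell for cf:q strips the matching qualifier from a pending put:

    byte[] row = Bytes.toBytes("r1");
    byte[] cf = Bytes.toBytes("0");
    Put put = new Put(row);
    put.addColumn(cf, Bytes.toBytes("q"), 10L, Bytes.toBytes("v"));
    Cell deleteCell = new KeyValue(row, cf, Bytes.toBytes("q"), 10L, KeyValue.Type.DeleteColumn);
    removeColumn(put, deleteCell);
    // put no longer carries cf:q, and the now-empty family entry is dropped from the family map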
+
+    public static void apply(Put destination, Put source) throws IOException {
+        for (List<Cell> cells : source.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (!destination.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) {
+                    destination.add(cell);
+                }
+            }
+        }
+    }
+
+    public static Put applyNew(Put destination, Put source) throws IOException {
+        Put next = new Put(destination);
+        apply(next, source);
+        return next;
+    }
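
To make the merge direction explicit (an informal sketch with made-up values): cells already present in the destination win, and the source contributes only the qualifiers the destination lacks:

    byte[] row = Bytes.toBytes("r1");
    byte[] cf = Bytes.toBytes("0");
    Put newer = new Put(row);
    newer.addColumn(cf, Bytes.toBytes("a"), 20L, Bytes.toBytes("new"));
    Put older = new Put(row);
    older.addColumn(cf, Bytes.toBytes("a"), 10L, Bytes.toBytes("old"));
    older.addColumn(cf, Bytes.toBytes("b"), 10L, Bytes.toBytes("kept"));
    Put merged = applyNew(newer, older);
    // merged has cf:a from newer (destination wins) and cf:b carried over from older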
+
+    /**
+     * Generates the index updates for a data row from the mutations that are obtained by merging the previous
+     * data row state with the pending row mutations for index rebuild. This method is called only for global
+     * indexes. The merged mutations form a sorted list of data table mutations that is replayed to generate the
+     * index table mutations. This list is sorted in ascending order by the tuple of timestamp and mutation type,
+     * where a delete comes before a put with the same timestamp.
+     */
+    public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
+                                                                 Put dataPut, Delete dataDel) throws IOException {
+        List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel);
+        List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
+        // The row key ptr of the data table row for which we will build index rows here
+        ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) :
+                new ImmutableBytesPtr(dataDel.getRow());
+        // Start with empty data table row
+        Put currentDataRow = null;
+        // The index row key corresponding to the current data row
+        byte[] indexRowKeyForCurrentDataRow = null;
+        for (Mutation mutation : dataMutations) {
+            long ts = getTimestamp(mutation);
+            if (mutation instanceof Put) {
+                // Add this put on top of the current data row state to get the next data row state
+                Put nextDataRow = (currentDataRow == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRow);
+                ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                indexMutations.add(indexPut);
+                // Delete the current index row if the new index key is different than the current one
+                if (indexRowKeyForCurrentDataRow != null) {
+                    if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                        Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                    }
+                }
+                // For the next iteration
+                currentDataRow = nextDataRow;
+                indexRowKeyForCurrentDataRow = indexPut.getRow();
+            } else if (currentDataRow != null) {
+                // We apply delete column mutations only on the current data row state. For the index table,
+                // we are only interested in whether the current data row is deleted or not. There is no need to
+                // apply column deletes to index rows since index rows are always full rows and all the cells in an
+                // index row have the same timestamp value. Because of this, index row versions do not share cells.
+                for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+                    for (Cell cell : cells) {
+                        switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) {
+                            case DeleteFamily:
+                            case DeleteFamilyVersion:
+                                currentDataRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                                break;
+                            case DeleteColumn:
+                            case Delete:
+                                removeColumn(currentDataRow, cell);
+                        }
+                    }
+                }
+                if (currentDataRow.getFamilyCellMap().size() == 0) {
+                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                    indexMutations.add(del);
+                    currentDataRow = null;
+                    indexRowKeyForCurrentDataRow = null;
+                }
+            }
+        }
+        return indexMutations;
+    }
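
A concrete, hypothetical replay, assuming an already-initialized IndexMaintainer: a data row written at ts=10 and fully deleted at ts=20 yields a verified index put followed by an ALL_VERSIONS index delete:

    byte[] row = Bytes.toBytes("k1");
    byte[] cf = Bytes.toBytes("0");
    Put dataPut = new Put(row);
    dataPut.addColumn(cf, Bytes.toBytes("c"), 10L, Bytes.toBytes("v"));
    Delete dataDel = new Delete(row);
    dataDel.addFamily(cf, 20L);
    List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, dataPut, dataDel);
    // indexMutations.get(0): index Put at ts=10 carrying the VERIFIED empty-column cell
    // indexMutations.get(1): index Delete (ALL_VERSIONS) at ts=20, since the whole data row is gone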
+
+    private void prepareIndexMutations(Put put, Delete del) throws IOException{
+        List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
+        for (Mutation mutation : indexMutations) {
+            byte[] indexRowKey = mutation.getRow();
+            List<Mutation> mutationList = indexKeyToMutationMap.get(indexRowKey);
+            if (mutationList == null) {
+                mutationList = new ArrayList<>();
+                mutationList.add(mutation);
+                indexKeyToMutationMap.put(indexRowKey, mutationList);
+            } else {
+                mutationList.add(mutation);
+            }
+        }
+    }
+
     @Override
     public boolean next(List<Cell> results) throws IOException {
+        if (indexRowKey != null &&
+                singleRowRebuildReturnCode == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+            byte[] rowCountBytes =
+                    PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode));
+            final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+            results.add(aggKeyValue);
+            return false;
+        }
         Cell lastCell = null;
         int rowCount = 0;
         region.startRegionOperation();
         try {
-            // Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
-            boolean partialRebuild = scan.isRaw();
             byte[] uuidValue = ServerCacheClient.generateId();
             synchronized (innerScanner) {
                 do {
                     List<Cell> row = new ArrayList<Cell>();
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
-                        lastCell = row.get(0);
+                        lastCell = row.get(0); // lastCell is any cell from the last visited row
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
                             if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
+                                    continue;
+                                }
                                 if (put == null) {
                                     put = new Put(CellUtil.cloneRow(cell));
-                                    mutations.add(put);
                                 }
                                 put.add(cell);
                             } else {
                                 if (del == null) {
                                     del = new Delete(CellUtil.cloneRow(cell));
-                                    mutations.add(del);
                                 }
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (partialRebuild) {
+                        if (put == null && del == null) {
+                            continue;
+                        }
+                        // Always add the put first and then the delete for a given row. This simplifies the logic in
+                        // IndexRegionObserver
+                        if (put != null) {
+                            mutations.add(put);
+                        }
+                        if (del != null) {
+                            mutations.add(del);
+                        }
+                        if (!verify) {
                             if (put != null) {
                                 setMutationAttributes(put, uuidValue);
                             }
@@ -1002,35 +1465,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                                 setMutationAttributes(del, uuidValue);
                             }
                             uuidValue = commitIfReady(uuidValue, mutations);
-                        }
-                        if (indexRowKey != null) {
-                            if (put != null) {
-                                setMutationAttributes(put, uuidValue);
-                            }
-                            Delete deleteMarkers = generateDeleteMarkers(put);
-                            if (deleteMarkers != null) {
-                                setMutationAttributes(deleteMarkers, uuidValue);
-                                mutations.add(deleteMarkers);
-                                uuidValue = commitIfReady(uuidValue, mutations);
-                            }
-                            // GlobalIndexChecker passed the index row key. This is to build a single index row.
-                            // Check if the data table row we have just scanned matches with the index row key.
-                            // If not, there is no need to build the index row from this data table row,
-                            // and just return zero row count.
-                            if (checkIndexRow(indexRowKey, put)) {
-                                rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                            } else {
-                                rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
-                            }
-                            break;
+                        } else {
+                            byte[] dataKey = (put != null) ? put.getRow() : del.getRow();
+                            prepareIndexMutations(put, del);
+                            dataKeyToMutationMap.put(dataKey, new Pair<Put, Delete>(put, del));
                         }
                         rowCount++;
                     }
                 } while (hasMore && rowCount < pageSizeInRows);
-                if (!partialRebuild && indexRowKey == null) {
-                    verifyAndOrRebuildIndex();
-                } else {
-                    if (!mutations.isEmpty()) {
+                if (!mutations.isEmpty()) {
+                    if (verify) {
+                        verifyAndOrRebuildIndex();
+                    } else {
                         ungroupedAggregateRegionObserver.checkForRegionClosing();
                         ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                     }
@@ -1043,10 +1489,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             region.closeRegionOperation();
             mutations.clear();
             if (verify) {
-              indexKeyToDataPutMap.clear();
-              dataKeyToDataPutMap.clear();
+              dataKeyToMutationMap.clear();
+              indexKeyToMutationMap.clear();
             }
         }
+        if (indexRowKey != null) {
+            rowCount = singleRowRebuildReturnCode;
+        }
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
         final Cell aggKeyValue;
         if (lastCell == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4f21511..fa42ce9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1060,9 +1060,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
-
-        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
-        return scanner;
+        if (!scan.isRaw() && scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY) == null) {
+            Scan rawScan = new Scan(scan);
+            rawScan.setRaw(true);
+            rawScan.setMaxVersions();
+            rawScan.getFamilyMap().clear();
+            rawScan.setFilter(null);
+            for (byte[] family : scan.getFamilyMap().keySet()) {
+                rawScan.addFamily(family);
+            }
+            innerScanner.close();
+            RegionScanner scanner = region.getScanner(rawScan);
+            return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+        }
+        return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
     }
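
The conversion above relies on raw-scan semantics: a raw scan returns delete markers as ordinary cells and, combined with max versions, every stored version of each column. A minimal client-side sketch (the Table handle is assumed):

    Scan raw = new Scan(scan);  // copy ranges and time range from the original scan
    raw.setRaw(true);           // return delete markers as cells
    raw.setMaxVersions();       // return all versions, not only the newest
    try (ResultScanner rs = table.getScanner(raw)) {
        for (Result r = rs.next(); r != null; r = rs.next()) {
            // puts and delete markers arrive interleaved, ordered newest-first per column
        }
    }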
     
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 08a120e..0092c5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,25 +42,31 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.LockManager.RowLock;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
@@ -67,20 +74,27 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
-import com.google.common.collect.Lists;
+import java.util.Set;
+
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
 
 /**
 * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
@@ -155,14 +169,16 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
       private boolean rebuild;
+      // The current and next states of the data rows corresponding to the pending mutations
+      private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
   }
-  
+
   private ThreadLocal<BatchMutateContext> batchMutateContext =
           new ThreadLocal<BatchMutateContext>();
-  
+
   /** Configuration key for the {@link IndexBuilder} to use */
   public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
 
@@ -172,17 +188,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
    */
   public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
 
-  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
-
   public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
   private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
 
   private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
   private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
-  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
-  private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
-  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
-  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
   private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
   private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
 
@@ -343,13 +353,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   public static long getMaxTimestamp(Mutation m) {
       long maxTs = 0;
-      long ts = 0;
-      Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
-      while (iterator.hasNext()) {
-          Map.Entry<byte[], List<Cell>> entry = (Map.Entry) iterator.next();
-          Iterator<Cell> cellIterator = entry.getValue().iterator();
-          while (cellIterator.hasNext()) {
-              Cell cell = cellIterator.next();
+      long ts;
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
               ts = cell.getTimestamp();
               if (ts > maxTs) {
                   maxTs = ts;
@@ -403,316 +409,541 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        long now, ReplayWrite replayWrite) throws IOException {
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-      boolean copyMutations = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-              continue;
-          }
-          Mutation m = miniBatchOp.getOperation(i);
-          if (this.builder.isEnabled(m)) {
-              // Track whether or not we need to
-              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              if (mutationsMap.containsKey(row)) {
-                  copyMutations = true;
-              } else {
-                  mutationsMap.put(row, null);
-              }
-          }
-      }
-      // early exit if it turns out we don't have any edits
-      if (mutationsMap.isEmpty()) {
-          return null;
-      }
-      // If we're copying the mutations
-      Collection<Mutation> originalMutations;
-      Collection<? extends Mutation> mutations;
-      if (copyMutations) {
-          originalMutations = null;
-          mutations = mutationsMap.values();
-      } else {
-          originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
-          mutations = originalMutations;
-      }
-
-      boolean resetTimeStamp = replayWrite == null;
+    /**
+     * This method is only used for local indexes
+     */
+    private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                          long now) throws IOException {
+        Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
+        boolean copyMutations = false;
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (this.builder.isEnabled(m)) {
+                // Track whether or not we need to copy the mutations
+                ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                if (mutationsMap.containsKey(row)) {
+                    copyMutations = true;
+                } else {
+                    mutationsMap.put(row, null);
+                }
+            }
+        }
+        // early exit if it turns out we don't have any edits
+        if (mutationsMap.isEmpty()) {
+            return null;
+        }
+        // If we're copying the mutations
+        Collection<Mutation> originalMutations;
+        Collection<? extends Mutation> mutations;
+        if (copyMutations) {
+            originalMutations = null;
+            mutations = mutationsMap.values();
+        } else {
+            originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
+            mutations = originalMutations;
+        }
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+            // should be indexed, which means we need to expose another method on the builder. Such is the
+            // way optimizations go, though.
+            if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+                // Unless we're replaying edits to rebuild the index, we update the time stamp
+                // of the data table to prevent overlapping time stamps (which prevents index
+                // inconsistencies as this case isn't handled correctly currently).
+                for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                    for (Cell cell : cells) {
+                        CellUtil.setTimestamp(cell, now);
+                    }
+                }
+                // Only copy mutations if we found duplicate rows
+                // which only occurs when we're partially rebuilding
+                // the index (since we'll potentially have both a
+                // Put and a Delete mutation for the same row).
+                if (copyMutations) {
+                    // Add the mutation to the batch set
+                    ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                    MultiMutation stored = mutationsMap.get(row);
+                    // we haven't seen this row before, so add it
+                    if (stored == null) {
+                        stored = new MultiMutation(row);
+                        mutationsMap.put(row, stored);
+                    }
+                    stored.addAll(m);
+                } else {
+                    originalMutations.add(m);
+                }
+            }
+        }
+        if (copyMutations) {
+            mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+        }
+        return mutations;
+    }
 
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          Mutation m = miniBatchOp.getOperation(i);
-          // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
-          // should be indexed, which means we need to expose another method on the builder. Such is the
-          // way optimization go though.
-          if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
-              if (resetTimeStamp) {
-                  // Unless we're replaying edits to rebuild the index, we update the time stamp
-                  // of the data table to prevent overlapping time stamps (which prevents index
-                  // inconsistencies as this case isn't handled correctly currently).
-                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
-                      for (Cell cell : cells) {
-                          CellUtil.setTimestamp(cell, now);
-                      }
-                  }
-              }
-              // No need to write the table mutations when we're rebuilding
-              // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY
-                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
-                  miniBatchOp.setOperationStatus(i, NOWRITE);
-              }
+    public static void setTimestamp(Mutation m, long ts) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                CellUtil.setTimestamp(cell, ts);
+            }
+        }
+    }
 
-              // Only copy mutations if we found duplicate rows
-              // which only occurs when we're partially rebuilding
-              // the index (since we'll potentially have both a
-              // Put and a Delete mutation for the same row).
-              if (copyMutations) {
-                  // Add the mutation to the batch set
-
-                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                  MultiMutation stored = mutationsMap.get(row);
-                  // we haven't seen this row before, so add it
-                  if (stored == null) {
-                      stored = new MultiMutation(row);
-                      mutationsMap.put(row, stored);
-                  }
-                  stored.addAll(m);
-              } else {
-                  originalMutations.add(m);
-              }
-          }
-      }
+    /**
+     * This method applies pending delete mutations on the next row states
+     */
+    private void applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                             BatchMutateContext context) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (!(m instanceof Delete)) {
+                continue;
+            }
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            Put nextDataRowState = dataRowState.getSecond();
+            if (nextDataRowState == null) {
+                continue;
+            }
+            for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                for (Cell cell : cells) {
+                    switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                        case DeleteFamily:
+                        case DeleteFamilyVersion:
+                            nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                            break;
+                        case DeleteColumn:
+                        case Delete:
+                            removeColumn(nextDataRowState, cell);
+                    }
+                }
+            }
+            if (nextDataRowState != null && nextDataRowState.getFamilyCellMap().size() == 0) {
+                dataRowState.setSecond(null);
+            }
+        }
+    }
 
-      if (copyMutations || replayWrite != null) {
-          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
-      }
-      return mutations;
-  }
+    /**
+     * This method applies the pending put mutations on the next row states.
+     * Before this method is called, the next row states are set to the current row states.
+     */
+    private void applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context, long now) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            // Unless we're replaying edits to rebuild the index, we update the time stamp
+            // of the data table to prevent overlapping time stamps (which prevents index
+            // inconsistencies as this case isn't handled correctly currently).
+            setTimestamp(m, now);
+            if (m instanceof Put) {
+                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+                Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+                if (dataRowState == null) {
+                    dataRowState = new Pair<Put, Put>(null, null);
+                    context.dataRowStates.put(rowKeyPtr, dataRowState);
+                }
+                Put nextDataRowState = dataRowState.getSecond();
+                dataRowState.setSecond((nextDataRowState != null) ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+            }
+        }
+    }
 
-  public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
-      List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
-      if (cellList == null) {
-          return;
-      }
-      Iterator<Cell> cellIterator = cellList.iterator();
-      while (cellIterator.hasNext()) {
-          Cell cell = cellIterator.next();
-          if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                  emptyCQ, 0, emptyCQ.length) == 0) {
-              cellIterator.remove();
-              return;
-          }
-      }
-  }
+    /**
+     * Prepares the current and next data row states for the pending mutations
+     */
+    private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                      MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                      BatchMutateContext context,
+                                      long now) throws IOException {
+        if (context.rowsToLock.size() == 0) {
+            return;
+        }
+        // Retrieve the current row states from the data table
+        getCurrentRowStates(c, context);
+        applyPendingPutMutations(miniBatchOp, context, now);
+        applyPendingDeleteMutations(miniBatchOp, context);
+    }
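
The bookkeeping assumed by the three methods above can be summarized as a sketch (context and rowKeyPtr as in the surrounding code):

    Pair<Put, Put> state = context.dataRowStates.get(rowKeyPtr);
    Put current = state.getFirst();   // data row before this batch, null if the row did not exist
    Put next = state.getSecond();     // data row after the pending mutations, null once fully deleted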
 
-  private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
-                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                       ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
-      byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
-      HTableInterfaceReference hTableInterfaceReference =
-                          new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
-      List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
-      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
-          return;
-      }
-      List<Mutation> localUpdates = new ArrayList<Mutation>();
-      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
-      while (indexUpdatesItr.hasNext()) {
-          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-          localUpdates.add(next.getFirst());
-      }
-      if (!localUpdates.isEmpty()) {
-          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
-      }
-  }
+    public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+        List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+        if (cellList == null) {
+            return;
+        }
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    emptyCQ, 0, emptyCQ.length) == 0) {
+                cellIterator.remove();
+                return;
+            }
+        }
+    }
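
A short usage sketch for the helper above, with emptyCF, emptyCQ and ts as assumed locals: callers strip whatever status cell the index codec left in the empty column before stamping their own, so the Put ends up carrying exactly one status cell.

    // emptyCF/emptyCQ/ts are hypothetical locals; VERIFIED_BYTES/UNVERIFIED_BYTES
    // are the status markers this class defines
    Put indexPut = new Put(Bytes.toBytes("idx-row-key"));
    indexPut.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);   // stale status from the codec
    removeEmptyColumn(indexPut, emptyCF, emptyCQ);              // strip it
    indexPut.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES); // exactly one status cell remains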
 
-  private void prepareIndexMutations(
-          ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp,
-          BatchMutateContext context,
-          Collection<? extends Mutation> mutations,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // get the current span, or just use a null-span to avoid a bunch of if statements
-      try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
-          Span current = scope.getSpan();
-          if (current == null) {
-              current = NullSpan.INSTANCE;
-          }
-          // get the index updates for all elements in this batch
-          context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
-          this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData);
-          current.addTimelineAnnotation("Built index updates, doing preStep");
-          handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
-          context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-          int updateCount = 0;
-          for (IndexMaintainer indexMaintainer : maintainers) {
-              updateCount++;
-              byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-              HTableInterfaceReference hTableInterfaceReference =
-                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-              List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
-              for (Pair<Mutation, byte[]> update : updates) {
-                  // add the VERIFIED cell, which is the empty cell
-                  Mutation m = update.getFirst();
-                  if (context.rebuild) {
-                      if (m instanceof Put) {
-                          long ts = getMaxTimestamp(m);
-                          // Remove the empty column prepared by Index codec as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
-                      }
-                      context.preIndexUpdates.put(hTableInterfaceReference, m);
-                  } else {
-                      if (m instanceof Put) {
-                          // Remove the empty column prepared by Index codec as we need to change its value
-                          removeEmptyColumn(m, emptyCF, emptyCQ);
-                          // Set the status of the index row to "unverified"
-                          ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
-                          // This will be done before the data table row is updated (i.e., in the first write phase)
-                          context.preIndexUpdates.put(hTableInterfaceReference, m);
-                      }
-                      else {
-                          // Set the status of the index row to "unverified"
-                          Put unverifiedPut = new Put(m.getRow());
-                          unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
-                          // This will be done before the data table row is updated (i.e., in the first write phase)
-                          context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
-                      }
-                  }
-              }
-          }
-          TracingUtils.addAnnotation(current, "index update count", updateCount);
-      }
-  }
+    /**
+     * The index update generation for local indexes uses the existing index update generation code (i.e.,
+     * the {@link IndexBuilder} implementation).
+     */
+    private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                         Collection<? extends Mutation> pendingMutations,
+                                         PhoenixIndexMetaData indexMetaData) throws Throwable {
+        ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+        this.builder.getIndexUpdates(indexUpdates, miniBatchOp, pendingMutations, indexMetaData);
+        byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+        List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+        if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+            return;
+        }
+        List<Mutation> localUpdates = new ArrayList<Mutation>();
+        Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+        while (indexUpdatesItr.hasNext()) {
+            Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+            localUpdates.add(next.getFirst());
+        }
+        if (!localUpdates.isEmpty()) {
+            miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+        }
+    }
+
+    /**
+     * Retrieves the last committed data row state. This method is called only for regular data mutations, since
+     * rebuild (i.e., index replay) mutations include all row versions.
+     */
+    private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     BatchMutateContext context) throws IOException {
+        Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+        }
+        Scan scan = new Scan();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        scan.setFilter(skipScanFilter);
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
+        try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+            boolean more = true;
+            while(more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+                Put put = new Put(rowKey);
+                for (Cell cell : cells) {
+                    put.add(cell);
+                }
+                context.dataRowStates.put(new ImmutableBytesPtr(rowKey), new Pair<Put, Put>(put, new Put(put)));
+            }
+        }
+    }
 
-  protected PhoenixIndexMetaData getPhoenixIndexMetaData(
-          ObserverContext<RegionCoprocessorEnvironment> observerContext,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
-      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
-          throw new DoNotRetryIOException(
-                  "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
-                          ", current table is:" +
-                          observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      return (PhoenixIndexMetaData)indexMetaData;
-  }
+    /**
+     * Generates the index updates for a data row from the mutations obtained by merging the previous data row
+     * state with the pending row mutations.
+     */
+    private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintainer> maintainers, long ts)
+            throws IOException {
+        List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(maintainers.size());
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            if (indexMaintainer.isLocalIndex()) {
+                continue;
+            }
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
+        }
+        for (Map.Entry<ImmutableBytesPtr, Pair<Put, Put>> entry : context.dataRowStates.entrySet()) {
+            ImmutableBytesPtr rowKeyPtr = entry.getKey();
+            Pair<Put, Put> dataRowState =  entry.getValue();
+            Put currentDataRowState = dataRowState.getFirst();
+            Put nextDataRowState = dataRowState.getSecond();
+            if (currentDataRowState == null && nextDataRowState == null) {
+                continue;
+            }
+            for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+                IndexMaintainer indexMaintainer = pair.getFirst();
+                HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+                if (nextDataRowState != null) {
+                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                    Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                            nextDataRowVG, rowKeyPtr, ts, null, null);
+                    if (indexPut == null) {
+                        // No covered column. Just prepare an index row with the empty column
+                        byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        indexPut = new Put(indexRowKey);
+                    }
+                    indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                            indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
+                    // Delete the current index row if the new index key is different than the current one
+                    if (currentDataRowState != null) {
+                        ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                        byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                                null, null, HConstants.LATEST_TIMESTAMP);
+                        if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            context.indexUpdates.put(hTableInterfaceReference,
+                                    new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                        }
+                    }
+                } else if (currentDataRowState != null) {
+                    ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+                            null, null, HConstants.LATEST_TIMESTAMP);
+                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                    context.indexUpdates.put(hTableInterfaceReference,
+                            new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                }
+            }
+        }
+    }
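
Reduced to row keys, the per-index decision the loop above makes looks roughly like this sketch; planIndexUpdates() and its parameters are invented for illustration, and the plain Delete stands in for the typed delete built by IndexMaintainer.buildRowDeleteMutation().

    import java.util.List;

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexUpdatePlanSketch {
        static void planIndexUpdates(byte[] nextIndexKey, byte[] currentIndexKey, long ts,
                                     byte[] emptyCF, byte[] emptyCQ, byte[] unverified,
                                     List<Mutation> out) {
            if (nextIndexKey != null) {
                Put put = new Put(nextIndexKey);
                put.addColumn(emptyCF, emptyCQ, ts, unverified);  // new index row starts unverified
                out.add(put);
                if (currentIndexKey != null && Bytes.compareTo(nextIndexKey, currentIndexKey) != 0) {
                    out.add(new Delete(currentIndexKey, ts));     // index key changed: drop the old row
                }
            } else if (currentIndexKey != null) {
                out.add(new Delete(currentIndexKey, ts));         // data row deleted: drop its index row
            }
        }
    }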
 
-  private void preparePostIndexMutations(
-          BatchMutateContext context,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
-      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-      // Check if we need to skip post index update for any of the rows
-      for (IndexMaintainer indexMaintainer : maintainers) {
-          byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-          byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-          HTableInterfaceReference hTableInterfaceReference =
-                  new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-          List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
-          for (Pair<Mutation, byte[]> update : updates) {
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              if (!pendingRow.isConcurrent()) {
-                  Mutation m = update.getFirst();
-                  if (m instanceof Put) {
-                      Put verifiedPut = new Put(m.getRow());
-                      // Set the status of the index row to "verified"
-                      verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                      context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                  }
-                  else {
-                      context.postIndexUpdates.put(hTableInterfaceReference, m);
-                  }
+    /**
+     * This method prepares the unverified index mutations, which are applied to index tables before the data table is
+     * updated. In the three-phase update approach, phase 1 sets the status of the existing index rows to "unverified"
+     * (these rows will be deleted from the index table in phase 3) and/or adds new put mutations with the
+     * unverified status. In phase 2, the data table mutations are applied. In phase 3, the status of an index table row
+     * is either set to "verified" or the row is deleted.
+     */
+    private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                          BatchMutateContext context,
+                                          Collection<? extends Mutation> pendingMutations,
+                                          long now,
+                                          PhoenixIndexMetaData indexMetaData) throws Throwable {
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
+            current.addTimelineAnnotation("Built index updates, doing preStep");
+            // Handle local index updates
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                if (indexMaintainer.isLocalIndex()) {
+                    handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData);
+                    break;
+                }
+            }
+            // The rest of this method is for handling global index updates
+            context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+            prepareIndexMutations(context, maintainers, now);
+
+            context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+            int updateCount = 0;
+            for (IndexMaintainer indexMaintainer : maintainers) {
+                updateCount++;
+                byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                HTableInterfaceReference hTableInterfaceReference =
+                        new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+                for (Pair<Mutation, byte[]> update : updates) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, m);
+                    } else {
+                        // Set the status of the index row to "unverified"
+                        Put unverifiedPut = new Put(m.getRow());
+                        unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                        // This will be done before the data table row is updated (i.e., in the first write phase)
+                        context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
+                    }
+                }
+            }
+            TracingUtils.addAnnotation(current, "index update count", updateCount);
+        }
+    }
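
As a compact illustration of the three phases, here is a hedged sketch of the happy path for one upsert; the method and all of its parameters are invented for this example, and Phoenix actually drives the phases from coprocessor hooks rather than client-style code like this.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public final class ThreePhaseSketch {
        public static void upsertWithIndex(Table indexTable, Table dataTable,
                                           Put dataPut, Put indexPut,
                                           byte[] emptyCF, byte[] emptyCQ,
                                           byte[] unverified, byte[] verified,
                                           long ts) throws IOException {
            // Phase 1: write the index row first, flagged unverified, so a reader that
            // sees it before the data row commits knows it may need repair
            indexPut.addColumn(emptyCF, emptyCQ, ts, unverified);
            indexTable.put(indexPut);
            // Phase 2: apply the data table mutation
            dataTable.put(dataPut);
            // Phase 3: promote the index row to verified (failure paths delete it instead)
            Put promote = new Put(indexPut.getRow());
            promote.addColumn(emptyCF, emptyCQ, ts, verified);
            indexTable.put(promote);
        }
    }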
 
-              }
-          }
-      }
-      // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-      // the collection of pending rows
-      removePendingRows(context);
-      context.indexUpdates.clear();
-  }
+    protected PhoenixIndexMetaData getPhoenixIndexMetaData(ObserverContext<RegionCoprocessorEnvironment> observerContext,
+                                                           MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+        if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+            throw new DoNotRetryIOException(
+                    "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
+                            ", current table is:" +
+                            observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        return (PhoenixIndexMetaData)indexMetaData;
+    }
 
-  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
-      ignoreAtomicOperations(miniBatchOp);
-      PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
-      BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
-      setBatchMutateContext(c, context);
-      Mutation firstMutation = miniBatchOp.getOperation(0);
-      ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-      context.rebuild = replayWrite != null;
-      /*
-       * Exclusively lock all rows so we get a consistent read
-       * while determining the index updates
-       */
-      long now;
-      if (!context.rebuild) {
-          populateRowsToLock(miniBatchOp, context);
-          lockRows(context);
-          now = EnvironmentEdgeManager.currentTimeMillis();
-          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-          // concurrent updates
-          populatePendingRows(context);
-      }
-      else {
-          now = EnvironmentEdgeManager.currentTimeMillis();
-      }
-      // First group all the updates for a single row into a single update to be processed
-      Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
-      // early exit if it turns out we don't have any edits
-      if (mutations == null) {
-          return;
-      }
+    private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData)
+            throws Throwable {
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+        // Check if we need to skip post index update for any of the rows
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+            for (Pair<Mutation, byte[]> update : updates) {
+                // Are there concurrent updates on the data table row? If so, skip the post index updates
+                // and let read repair resolve the conflicts
+                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                PendingRow pendingRow = pendingRows.get(rowKey);
+                if (!pendingRow.isConcurrent()) {
+                    Mutation m = update.getFirst();
+                    if (m instanceof Put) {
+                        Put verifiedPut = new Put(m.getRow());
+                        // Set the status of the index row to "verified"
+                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+                    }
+                    else {
+                        context.postIndexUpdates.put(hTableInterfaceReference, m);
+                    }
+                }
+            }
+        }
+        // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+        // the collection of pending rows
+        removePendingRows(context);
+        context.indexUpdates.clear();
+    }
 
-      long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
-      metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
-
-      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
-      // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
-      // can be prepared in less than one millisecond
-      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
-          Thread.sleep(1);
-          LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      // Release the locks before making RPC calls for index updates
-      for (RowLock rowLock : context.rowLocks) {
-          rowLock.release();
-      }
-      // Do the first phase index updates
-      doPre(c, context, miniBatchOp);
-      if (!context.rebuild) {
-          // Acquire the locks again before letting the region proceed with data table updates
-          List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
-          for (RowLock rowLock : context.rowLocks) {
-              rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
-          }
-          context.rowLocks.clear();
-          context.rowLocks = rowLocks;
-          preparePostIndexMutations(context, now, indexMetaData);
-      }
-      if (failDataTableUpdatesForTesting) {
-          throw new DoNotRetryIOException("Simulating the data table write failure");
-      }
-  }
+    /**
+     * There are at most two rebuild mutations for every row, one put and one delete. {@link IndexRebuildRegionScanner}
+     * lists them next to each other in indexMutations such that the put comes before the delete. This method is
+     * called only for global indexes.
+     */
+    private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
+                                                        MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        BatchMutateContext context,
+                                                        IndexMaintainer indexMaintainer) throws Throwable {
+        Put put = null;
+        List <Mutation> indexMutations = new ArrayList<>();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            if (m instanceof Put) {
+                if (put != null) {
+                    indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+                }
+                put = (Put)m;
+            } else {
+                indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m));
+                put = null;
+            }
+            miniBatchOp.setOperationStatus(i, NOWRITE);
+        }
+        if (put != null) {
+            indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+        }
+        HTableInterfaceReference hTableInterfaceReference =
+                new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+        for (Mutation m : indexMutations) {
+            context.preIndexUpdates.put(hTableInterfaceReference, m);
+        }
+        doPre(c, context, miniBatchOp);
+        // For rebuild updates, no post index update is prepared. Just create an empty list.
+        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+    }
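
The pairing performed by the loop above can be pictured with this sketch, where rebuildMutations and emitRebuildUpdates() are invented stand-ins for the miniBatchOp iteration and prepareIndexMutationsForRebuild():

    // Rebuild batches carry at most one Put and one Delete per row version,
    // sorted Put-first, so a Delete always pairs with the Put just before it
    Put pending = null;
    for (Mutation m : rebuildMutations) {
        if (m instanceof Put) {
            if (pending != null) {
                emitRebuildUpdates(pending, null);       // a Put with no paired Delete
            }
            pending = (Put) m;
        } else {
            emitRebuildUpdates(pending, (Delete) m);     // Delete pairs with the preceding Put
            pending = null;
        }
    }
    if (pending != null) {
        emitRebuildUpdates(pending, null);
    }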
 
+    public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+                                             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+        ignoreAtomicOperations(miniBatchOp);
+        PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+        BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
+        setBatchMutateContext(c, context);
+        Mutation firstMutation = miniBatchOp.getOperation(0);
+        ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+        context.rebuild = replayWrite != null;
+        if (context.rebuild) {
+            preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
+            return;
+        }
+        /*
+         * Exclusively lock all rows so we get a consistent read
+         * while determining the index updates
+         */
+        long now;
+        populateRowsToLock(miniBatchOp, context);
+        lockRows(context);
+        now = EnvironmentEdgeManager.currentTimeMillis();
+        // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+        // concurrent updates
+        populatePendingRows(context);
+        // Prepare current and next data rows states for pending mutations (for global indexes)
+        prepareDataRowStates(c, miniBatchOp, context, now);
+        // Group all the updates for a single row into a single update to be processed (for local indexes)
+        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now);
+        // early exit if it turns out we don't have any edits
+        if (mutations == null || mutations.isEmpty()) {
+            return;
+        }
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
+        metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+        // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+        // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+        // can be prepared in less than one millisecond
+        if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+            Thread.sleep(1);
+            LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        // Release the locks before making RPC calls for index updates
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        // Do the first phase index updates
+        doPre(c, context, miniBatchOp);
+        // Acquire the locks again before letting the region proceed with data table updates
+        List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+        for (RowLock rowLock : context.rowLocks) {
+            rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+        }
+        context.rowLocks.clear();
+        context.rowLocks = rowLocks;
+        preparePostIndexMutations(context, now, indexMetaData);
+        if (failDataTableUpdatesForTesting) {
+            throw new DoNotRetryIOException("Simulating the data table write failure");
+        }
+    }
+
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
-  
+
   private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       return this.batchMutateContext.get();
   }
-  
+
   private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
       this.batchMutateContext.remove();
   }
@@ -822,14 +1053,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
   }
 
   /**
-   * Exposed for testing!
-   * @return the currently instantiated index builder
-   */
-  public IndexBuilder getBuilderForTesting() {
-    return this.builder.getBuilderForTesting();
-  }
-
-  /**
    * Enable indexing on the given table
    * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
    * @param builder class to use when building the index for this table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index dba165b..3723388 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1593,7 +1593,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     private void initCachedState() {
         byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
-        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier);
         this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
         // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
         this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());


[phoenix] 02/02: PHOENIX-5709 Addendum (unverified rows should fail all verification types except ONLY)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit bb13f547868b13e113a6aee90a7d9284b4095631
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Feb 29 18:58:04 2020 -0800

    PHOENIX-5709 Addendum (unverified rows should fail all verification types except ONLY)
---
 .../coprocessor/IndexRebuildRegionScanner.java     | 179 ++++++++++++++-------
 .../phoenix/hbase/index/IndexRegionObserver.java   |   7 +
 2 files changed, 126 insertions(+), 60 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index a52bddf..6cb1145 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor;
 
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
@@ -424,7 +425,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
                 indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-		outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
                 indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
@@ -689,7 +690,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return null;
     }
 
-    private boolean isMatchingMutation(Mutation expected, Mutation actual, int version) throws IOException {
+    private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
         if (getTimestamp(expected) != getTimestamp(actual)) {
             String errorMsg = "Not matching timestamp";
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
@@ -708,23 +709,20 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 if (actualCell == null ||
                         !CellUtil.matchingType(expectedCell, actualCell)) {
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
-                    String errorMsg = "Missing cell (in version " + version + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
                     return false;
                 }
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
-                    if (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
-                            Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) {
-                        if (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
-                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
-                            // We will not flag this as mismatch but will log it
-                            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
-                            String errorMsg = "Unverified index row (in version " + version + ")";
-                            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
-                            continue;
-                        }
+                    if (verifyType == IndexTool.IndexVerifyType.ONLY &&
+                            (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
+                            Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) &&
+                            (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
+                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0)) {
+                        // We do not flag this as a mismatch since we can have unverified but still valid rows
+                        continue;
                     }
-                    String errorMsg = "Not matching value (in version " + version + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+                    String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
@@ -963,14 +961,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
             if (matchingCount > 0) {
-                // We do not consider this as a verification issue but log it for further information.
-                // This may happen due to compaction
-                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
-                String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
-                        + actualMutationList.size();
-                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
-                        getTimestamp(expectedMutationList.get(0)),
-                        getTimestamp(actualMutationList.get(0)), errorMsg);
+                if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+                    // We do not consider this a verification issue, but we log it for reference.
+                    // This may happen due to compaction
+                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                    String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+                            + actualMutationList.size();
+                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                            getTimestamp(expectedMutationList.get(0)),
+                            getTimestamp(actualMutationList.get(0)), errorMsg);
+                }
             } else {
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
                 String errorMsg = "Not matching index row";
@@ -1235,7 +1235,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
     /**
      * This is to reorder the mutations in ascending order by the tuple of timestamp and mutation type where
-     * delete comes before put
+     * put comes before delete
      */
     public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
         @Override
@@ -1248,10 +1248,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             if (ts1 > ts2) {
                 return 1;
             }
-            if (o1 instanceof Delete && o2 instanceof Put) {
+            if (o1 instanceof Put && o2 instanceof Delete) {
                 return -1;
             }
-            if (o1 instanceof Put && o2 instanceof Delete) {
+            if (o1 instanceof Delete && o2 instanceof Put) {
                 return 1;
             }
             return 0;
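
A brief usage sketch for the reordered comparator, with dataMutations assumed: after sorting, versions come out oldest-first and each Put lands ahead of its same-timestamp Delete, which is the ordering the rebuild loop below depends on when it folds a delete into the put it pairs with.

    List<Mutation> sorted = new ArrayList<>(dataMutations);
    Collections.sort(sorted, MUTATION_TS_COMPARATOR);
    // sorted: ts ascending; at equal ts, Put before Delete
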
@@ -1283,6 +1283,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
                     null, null, HConstants.LATEST_TIMESTAMP);
             indexPut = new Put(indexRowKey);
+        } else {
+            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    indexMaintainer.getEmptyKeyValueQualifier());
         }
         indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
                 indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
@@ -1324,6 +1327,25 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return next;
     }
 
+    private static void applyDeleteOnPut(Delete del, Put put) throws IOException {
+        for (List<Cell> cells : del.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) {
+                    case DeleteFamily:
+                        put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                        break;
+                    case DeleteColumn:
+                        removeColumn(put, cell);
+                        break;
+                    default:
+                        // We do not expect this to happen
+                        throw new DoNotRetryIOException("Single version delete marker in data mutation " +
+                                del);
+                }
+            }
+        }
+    }
+
     /**
      * Generate the index update for a data row from the mutation that are obtained by merging the previous data row
      * state with the pending row mutation for index rebuild. This method is called only for global indexes.
@@ -1339,53 +1361,90 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) :
                 new ImmutableBytesPtr(dataDel.getRow());
         // Start with empty data table row
-        Put currentDataRow = null;
+        Put currentDataRowState = null;
         // The index row key corresponding to the current data row
         byte[] indexRowKeyForCurrentDataRow = null;
-        for (Mutation mutation : dataMutations) {
+        int dataMutationListSize = dataMutations.size();
+        for (int i = 0; i < dataMutationListSize; i++) {
+            Mutation mutation = dataMutations.get(i);
             long ts = getTimestamp(mutation);
             if (mutation instanceof Put) {
-                // Add this put on top of the current data row state to get the next data row state
-                Put nextDataRow = (currentDataRow == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRow);
-                ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
-                Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
-                indexMutations.add(indexPut);
-                // Delete the current index row if the new index key is different than the current one
-                if (indexRowKeyForCurrentDataRow != null) {
-                    if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                if (i < dataMutationListSize - 1) {
+                    // If a put and a delete mutation form a pair with the same timestamp, apply the delete
+                    // mutation on the put. If the delete mutation deletes all the cells in the put mutation, the family
+                    // cell map of the put mutation becomes empty and the mutation is ignored later
+                    Mutation nextMutation = dataMutations.get(i + 1);
+                    if (getTimestamp(nextMutation) == ts && nextMutation instanceof Delete) {
+                        applyDeleteOnPut((Delete) nextMutation, (Put) mutation);
+                        // Apply the delete mutation on the current data row state too
+                        if (currentDataRowState != null) {
+                            applyDeleteOnPut((Delete) nextMutation, currentDataRowState);
+                            if (currentDataRowState.getFamilyCellMap().size() == 0) {
+                                currentDataRowState = null;
+                                indexRowKeyForCurrentDataRow = null;
+                            }
+                        }
+                        // This increment is to skip the next (delete) mutation as we have already processed it
+                        i++;
+                    }
+                }
+                if (mutation.getFamilyCellMap().size() != 0) {
+                    // Add this put on top of the current data row state to get the next data row state
+                    Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
+                    ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+                    Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                    indexMutations.add(indexPut);
+                    // Delete the current index row if the new index key is different than the current one
+                    if (currentDataRowState != null) {
+                        if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            indexMutations.add(del);
+                        }
+                    }
+                    // For the next iteration of the for loop
+                    currentDataRowState = nextDataRow;
+                    indexRowKeyForCurrentDataRow = indexPut.getRow();
+                } else {
+                    if (currentDataRowState != null) {
                         Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                                 IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                         indexMutations.add(del);
+                        // For the next iteration of the for loop
+                        currentDataRowState = null;
+                        indexRowKeyForCurrentDataRow = null;
                     }
                 }
-                // For the next iteration
-                currentDataRow = nextDataRow;
-                indexRowKeyForCurrentDataRow = indexPut.getRow();
-            } else if (currentDataRow != null) {
-                // We apply delete column mutations only on the current data row state. For the index table,
-                // we are only interested in if the current data row is deleted or not. There is no need to apply
-                // column deletes to index rows since index rows are always full rows and all the cells in an index
-                // row have the same timestamp value. Because of this index rows versions do not share cells.
-                for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
-                    for (Cell cell : cells) {
-                        switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) {
-                            case DeleteFamily:
-                            case DeleteFamilyVersion:
-                                currentDataRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
-                                break;
-                            case DeleteColumn:
-                            case Delete:
-                                removeColumn(currentDataRow, cell);
+            } else { // mutation instanceof Delete
+                if (currentDataRowState != null) {
+                    // We apply delete mutations only on the current data row state to obtain the next data row state.
+                    // For the index table, we are only interested in whether the index row should be deleted or not.
+                    // There is no need to apply column deletes to index rows since index rows are always full rows
+                    // and all the cells in an index row have the same timestamp value. Because of this, index row
+                    // versions do not share cells.
+                    applyDeleteOnPut((Delete) mutation, currentDataRowState);
+                    Put nextDataRowState = currentDataRowState;
+                    if (nextDataRowState.getFamilyCellMap().size() == 0) {
+                        Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                        currentDataRowState = null;
+                        indexRowKeyForCurrentDataRow = null;
+                    } else {
+                        ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+                        Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                        indexMutations.add(indexPut);
+                        // Delete the current index row if the new index key is different than the current one
+                        if (indexRowKeyForCurrentDataRow != null) {
+                            if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+                                Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                        IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                                indexMutations.add(del);
+                            }
                         }
+                        indexRowKeyForCurrentDataRow = indexPut.getRow();
                     }
                 }
-                if (currentDataRow.getFamilyCellMap().size() == 0) {
-                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
-                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
-                    indexMutations.add(del);
-                    currentDataRow = null;
-                    indexRowKeyForCurrentDataRow = null;
-                }
             }
         }
         return indexMutations;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 0092c5c..e540572 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -511,6 +511,10 @@ public class IndexRegionObserver extends BaseRegionObserver {
             }
             ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
             Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            if (dataRowState == null) {
+                dataRowState = new Pair<Put, Put>(null, null);
+                context.dataRowStates.put(rowKeyPtr, dataRowState);
+            }
             Put nextDataRowState = dataRowState.getSecond();
             if (nextDataRowState == null) {
                 continue;
@@ -694,6 +698,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
                         byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
                                 null, null, HConstants.LATEST_TIMESTAMP);
                         indexPut = new Put(indexRowKey);
+                    } else {
+                        removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                                indexMaintainer.getEmptyKeyValueQualifier());
                     }
                     indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
                             indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES);