You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/03/17 19:24:45 UTC

[phoenix] branch PHOENIX-5748-4.x-HBase-1.5 updated: PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push:
     new d6f939b  PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)
d6f939b is described below

commit d6f939bc903480675825bea6c3b1b2be04bca599
Author: Swaroopa Kadam <sw...@gmail.com>
AuthorDate: Tue Mar 17 12:24:38 2020 -0700

    PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)
    
    PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebuildRegionScanner
---
 .../coprocessor/IndexRebuildRegionScanner.java     | 304 ++--------
 .../coprocessor/IndexToolVerificationResult.java   | 304 ++++++++++
 .../index/PhoenixIndexImportDirectReducer.java     |   5 +-
 .../phoenix/index/VerifySingleIndexRowTest.java    | 637 +++++++++++++++++++++
 4 files changed, 998 insertions(+), 252 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6cb1145..ad549e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -42,15 +42,16 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -109,236 +110,14 @@ import com.google.common.collect.Maps;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
-    public static class VerificationResult {
-        public static class PhaseResult {
-            private long validIndexRowCount = 0;
-            private long expiredIndexRowCount = 0;
-            private long missingIndexRowCount = 0;
-            private long invalidIndexRowCount = 0;
-
-            public void add(PhaseResult phaseResult) {
-                validIndexRowCount += phaseResult.validIndexRowCount;
-                expiredIndexRowCount += phaseResult.expiredIndexRowCount;
-                missingIndexRowCount += phaseResult.missingIndexRowCount;
-                invalidIndexRowCount += phaseResult.invalidIndexRowCount;
-            }
-
-            public long getTotalCount() {
-                return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
-            }
-
-            @Override
-            public String toString() {
-                return "PhaseResult{" +
-                        "validIndexRowCount=" + validIndexRowCount +
-                        ", expiredIndexRowCount=" + expiredIndexRowCount +
-                        ", missingIndexRowCount=" + missingIndexRowCount +
-                        ", invalidIndexRowCount=" + invalidIndexRowCount +
-                        '}';
-            }
-        }
-
-        private long scannedDataRowCount = 0;
-        private long rebuiltIndexRowCount = 0;
-        private PhaseResult before = new PhaseResult();
-        private PhaseResult after = new PhaseResult();
-
-        @Override
-        public String toString() {
-            return "VerificationResult{" +
-                    "scannedDataRowCount=" + scannedDataRowCount +
-                    ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
-                    ", before=" + before +
-                    ", after=" + after +
-                    '}';
-        }
-
-        public long getScannedDataRowCount() {
-            return scannedDataRowCount;
-        }
-
-        public long getRebuiltIndexRowCount() {
-            return rebuiltIndexRowCount;
-        }
-
-        public long getBeforeRebuildValidIndexRowCount() {
-            return before.validIndexRowCount;
-        }
-
-        public long getBeforeRebuildExpiredIndexRowCount() {
-            return before.expiredIndexRowCount;
-        }
-
-        public long getBeforeRebuildInvalidIndexRowCount() {
-            return before.invalidIndexRowCount;
-        }
-
-        public long getBeforeRebuildMissingIndexRowCount() {
-            return before.missingIndexRowCount;
-        }
-
-        public long getAfterRebuildValidIndexRowCount() {
-            return after.validIndexRowCount;
-        }
-
-        public long getAfterRebuildExpiredIndexRowCount() {
-            return after.expiredIndexRowCount;
-        }
-
-        public long getAfterRebuildInvalidIndexRowCount() {
-            return after.invalidIndexRowCount;
-        }
-
-        public long getAfterRebuildMissingIndexRowCount() {
-            return after.missingIndexRowCount;
-        }
-
-        private void addScannedDataRowCount(long count) {
-            this.scannedDataRowCount += count;
-        }
-
-        private void addRebuiltIndexRowCount(long count) {
-            this.rebuiltIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildValidIndexRowCount(long count) {
-            before.validIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildExpiredIndexRowCount(long count) {
-            before.expiredIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildMissingIndexRowCount(long count) {
-            before.missingIndexRowCount += count;
-        }
-
-        private void addBeforeRebuildInvalidIndexRowCount(long count) {
-            before.invalidIndexRowCount += count;
-        }
-
-        private void addAfterRebuildValidIndexRowCount(long count) {
-            after.validIndexRowCount += count;
-        }
-
-        private void addAfterRebuildExpiredIndexRowCount(long count) {
-            after.expiredIndexRowCount += count;
-        }
-
-        private void addAfterRebuildMissingIndexRowCount(long count) {
-            after.missingIndexRowCount += count;
-        }
-
-        private void addAfterRebuildInvalidIndexRowCount(long count) {
-            after.invalidIndexRowCount += count;
-        }
-
-        private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
-            if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
-                    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
-                return true;
-            }
-            return false;
-        }
-
-        private long getValue(Cell cell) {
-            return Long.parseLong(Bytes.toString(cell.getValueArray(),
-                    cell.getValueOffset(), cell.getValueLength()));
-        }
-
-        private void update(Cell cell) {
-            if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
-                addScannedDataRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
-                addRebuiltIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildValidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildExpiredIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildMissingIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
-                addBeforeRebuildInvalidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildValidIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildExpiredIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildMissingIndexRowCount(getValue(cell));
-            } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
-                addAfterRebuildInvalidIndexRowCount(getValue(cell));
-            }
-        }
-
-        public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
-            // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
-            // Search for the place where the trailing 0xFFs start
-            int offset = rowKeyPrefix.length;
-            while (offset > 0) {
-                if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
-                    break;
-                }
-                offset--;
-            }
-            if (offset == 0) {
-                // We got an 0xFFFF... (only FFs) stopRow value which is
-                // the last possible prefix before the end of the table.
-                // So set it to stop at the 'end of the table'
-                return HConstants.EMPTY_END_ROW;
-            }
-            // Copy the right length of the original
-            byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
-            // And increment the last one
-            newStopRow[newStopRow.length - 1]++;
-            return newStopRow;
-        }
-
-        public static VerificationResult getVerificationResult(Table hTable, long ts)
-                throws IOException {
-            VerificationResult verificationResult = new VerificationResult();
-            byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
-            byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
-            Scan scan = new Scan();
-            scan.setStartRow(startRowKey);
-            scan.setStopRow(stopRowKey);
-            ResultScanner scanner = hTable.getScanner(scan);
-            for (Result result = scanner.next(); result != null; result = scanner.next()) {
-                for (Cell cell : result.rawCells()) {
-                    verificationResult.update(cell);
-                }
-            }
-            return verificationResult;
-        }
-
-        public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
-            if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
-                return false;
-            } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-                if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
-                    return true;
-                }
-            } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
-                if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        public void add(VerificationResult verificationResult) {
-            scannedDataRowCount += verificationResult.scannedDataRowCount;
-            rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
-            before.add(verificationResult.before);
-            after.add(verificationResult.after);
-        }
-    }
-
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
     public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
     private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
     public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
     private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+    public static final String
+            ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
     private long pageSizeInRows = Long.MAX_VALUE;
     private int rowCountPerTask;
     private boolean hasMore;
@@ -367,15 +146,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
     private RegionCoprocessorEnvironment env;
     private HTableFactory hTableFactory;
-    private int indexTableTTL;
-    private VerificationResult verificationResult;
+    private int indexTableTTL = 0;
+    private IndexToolVerificationResult verificationResult;
     private boolean isBeforeRebuilt = true;
     private boolean partialRebuild = false;
     private int  singleRowRebuildReturnCode;
     private Map<byte[], NavigableSet<byte[]>> familyMap;
     private byte[][] viewConstants;
 
-    IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+    @VisibleForTesting
+    public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
                               final RegionCoprocessorEnvironment env,
                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
         super(innerScanner);
@@ -416,7 +196,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
-            verificationResult = new VerificationResult();
+            verificationResult = new IndexToolVerificationResult();
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
@@ -552,6 +332,24 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return uuidValue;
     }
 
+    @VisibleForTesting
+    public int setIndexTableTTL(int ttl) {
+        indexTableTTL = ttl;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+        this.indexMaintainer = indexMaintainer;
+        return 0;
+    }
+
+    @VisibleForTesting
+    public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) {
+        this.indexKeyToMutationMap = newTreeMap;
+        return 0;
+    }
+
     public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
@@ -578,7 +376,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
     }
 
-    private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+    public byte[] getIndexRowKey(final Put dataRow) throws IOException {
         ValueGetter valueGetter = new SimpleValueGetter(dataRow);
         byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
                 null, null, HConstants.LATEST_TIMESTAMP);
@@ -594,14 +392,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return true;
     }
 
-    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
                                            String errorMsg) throws IOException {
         logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
                 errorMsg, null, null);
 
     }
 
-    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
                                            String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
         final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
         final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
@@ -794,7 +594,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return false;
     }
 
-    private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+    @VisibleForTesting
+    public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
         Put put = null;
         Delete del = null;
         for (Cell cell : indexRow.rawCells()) {
@@ -845,15 +646,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
      * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be
      * higher than the timestamp of index row to be deleted.
      */
-    private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult)
+    @VisibleForTesting
+    public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
             throws IOException {
         List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
         if (expectedMutationList == null) {
-            throw new DoNotRetryIOException("No expected mutation");
+            throw new DoNotRetryIOException(NO_EXPECTED_MUTATION);
         }
         List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
         if (actualMutationList == null || actualMutationList.isEmpty()) {
-            throw new DoNotRetryIOException("actualMutationList is null or empty");
+            throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
         }
         Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
         Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
@@ -989,18 +791,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         Put put = pair.getFirst();
         long ts1 = 0;
         if (put != null) {
-            ts1 = getMaxTimestamp((Mutation)put);
+            ts1 = getMaxTimestamp(put);
         }
         Delete del = pair.getSecond();
         long ts2 = 0;
         if (del != null) {
-            ts1 = getMaxTimestamp((Mutation)del);
+            ts1 = getMaxTimestamp(del);
         }
         return (ts1 > ts2) ? ts1 : ts2;
     }
 
     private void verifyIndexRows(List<KeyRange> keys,
-                                 VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
         List<KeyRange> invalidKeys = new ArrayList<>();
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         Scan indexScan = new Scan();
@@ -1063,7 +865,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void addVerifyTask(final List<KeyRange> keys,
-                               final VerificationResult.PhaseResult verificationPhaseResult) {
+                               final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
             @Override
             public Boolean call() throws Exception {
@@ -1081,14 +883,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         });
     }
 
-    private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+    private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
         int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
         tasks = new TaskBatch<>(taskCount);
         List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
-        List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+        List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
         List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
         listOfKeyRangeList.add(keys);
-        VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+        IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
         verificationPhaseResultList.add(perTaskVerificationPhaseResult);
         for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
             keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
@@ -1096,7 +898,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 addVerifyTask(keys, perTaskVerificationPhaseResult);
                 keys = new ArrayList<>(rowCountPerTask);
                 listOfKeyRangeList.add(keys);
-                perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+                perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
                 verificationPhaseResultList.add(perTaskVerificationPhaseResult);
             }
         }
@@ -1118,7 +920,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 throw new IOException(exceptionMessage);
             }
         }
-        for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
+        for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
             verificationPhaseResult.add(result);
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
@@ -1162,7 +964,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     }
 
     private void verifyAndOrRebuildIndex() throws IOException {
-        VerificationResult nextVerificationResult = new VerificationResult();
+        IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
         nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
@@ -1175,7 +977,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
                 verifyType == IndexTool.IndexVerifyType.ONLY) {
-            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
             nextVerificationResult.before.add(verificationPhaseResult);
@@ -1200,7 +1002,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
 
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // We have rebuilt index row and now we need to verify them
-            VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
             indexKeyToMutationMap.clear();
             for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
                 prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
@@ -1450,7 +1252,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return indexMutations;
     }
 
-    private void prepareIndexMutations(Put put, Delete del) throws IOException{
+    @VisibleForTesting
+    public int prepareIndexMutations(Put put, Delete del) throws IOException {
         List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
         for (Mutation mutation : indexMutations) {
             byte[] indexRowKey = mutation.getRow();
@@ -1463,6 +1266,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 mutationList.add(mutation);
             }
         }
+        return 0;
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
new file mode 100644
index 0000000..ed92fad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+
+public class IndexToolVerificationResult {
+    public static class PhaseResult {
+        long validIndexRowCount = 0;
+        long expiredIndexRowCount = 0;
+        long missingIndexRowCount = 0;
+        long invalidIndexRowCount = 0;
+
+        public void add(PhaseResult phaseResult) {
+            validIndexRowCount += phaseResult.validIndexRowCount;
+            expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+            missingIndexRowCount += phaseResult.missingIndexRowCount;
+            invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+        }
+
+        public PhaseResult(){}
+
+        public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
+                long missingIndexRowCount, long invalidIndexRowCount) {
+            this.validIndexRowCount = validIndexRowCount;
+            this.expiredIndexRowCount = expiredIndexRowCount;
+            this.missingIndexRowCount = missingIndexRowCount;
+            this.invalidIndexRowCount = invalidIndexRowCount;
+        }
+
+        public long getTotalCount() {
+            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+        }
+
+        @Override
+        public String toString() {
+            return "PhaseResult{" +
+                    "validIndexRowCount=" + validIndexRowCount +
+                    ", expiredIndexRowCount=" + expiredIndexRowCount +
+                    ", missingIndexRowCount=" + missingIndexRowCount +
+                    ", invalidIndexRowCount=" + invalidIndexRowCount +
+                    '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null) {
+                return false;
+            }
+            if (!(o instanceof PhaseResult)) {
+                return false;
+            }
+            PhaseResult pr = (PhaseResult) o;
+            return this.expiredIndexRowCount == pr.expiredIndexRowCount
+                    && this.validIndexRowCount == pr.validIndexRowCount
+                    && this.invalidIndexRowCount == pr.invalidIndexRowCount
+                    && this.missingIndexRowCount == pr.missingIndexRowCount;
+        }
+
+        @Override
+        public int hashCode() {
+            long result = 17;
+            result = 31 * result + expiredIndexRowCount;
+            result = 31 * result + validIndexRowCount;
+            result = 31 * result + missingIndexRowCount;
+            result = 31 * result + invalidIndexRowCount;
+            return (int)result;
+        }
+    }
+
+    long scannedDataRowCount = 0;
+    long rebuiltIndexRowCount = 0;
+    PhaseResult before = new PhaseResult();
+    PhaseResult after = new PhaseResult();
+
+    @Override
+    public String toString() {
+        return "VerificationResult{" +
+                "scannedDataRowCount=" + scannedDataRowCount +
+                ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+                ", before=" + before +
+                ", after=" + after +
+                '}';
+    }
+
+    public long getScannedDataRowCount() {
+        return scannedDataRowCount;
+    }
+
+    public long getRebuiltIndexRowCount() {
+        return rebuiltIndexRowCount;
+    }
+
+    public long getBeforeRebuildValidIndexRowCount() {
+        return before.validIndexRowCount;
+    }
+
+    public long getBeforeRebuildExpiredIndexRowCount() {
+        return before.expiredIndexRowCount;
+    }
+
+    public long getBeforeRebuildInvalidIndexRowCount() {
+        return before.invalidIndexRowCount;
+    }
+
+    public long getBeforeRebuildMissingIndexRowCount() {
+        return before.missingIndexRowCount;
+    }
+
+    public long getAfterRebuildValidIndexRowCount() {
+        return after.validIndexRowCount;
+    }
+
+    public long getAfterRebuildExpiredIndexRowCount() {
+        return after.expiredIndexRowCount;
+    }
+
+    public long getAfterRebuildInvalidIndexRowCount() {
+        return after.invalidIndexRowCount;
+    }
+
+    public long getAfterRebuildMissingIndexRowCount() {
+        return after.missingIndexRowCount;
+    }
+
+    private void addScannedDataRowCount(long count) {
+        this.scannedDataRowCount += count;
+    }
+
+    private void addRebuiltIndexRowCount(long count) {
+        this.rebuiltIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildValidIndexRowCount(long count) {
+        before.validIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildExpiredIndexRowCount(long count) {
+        before.expiredIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildMissingIndexRowCount(long count) {
+        before.missingIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildInvalidIndexRowCount(long count) {
+        before.invalidIndexRowCount += count;
+    }
+
+    private void addAfterRebuildValidIndexRowCount(long count) {
+        after.validIndexRowCount += count;
+    }
+
+    private void addAfterRebuildExpiredIndexRowCount(long count) {
+        after.expiredIndexRowCount += count;
+    }
+
+    private void addAfterRebuildMissingIndexRowCount(long count) {
+        after.missingIndexRowCount += count;
+    }
+
+    private void addAfterRebuildInvalidIndexRowCount(long count) {
+        after.invalidIndexRowCount += count;
+    }
+
+    private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
+        if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
+                AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
+            return true;
+        }
+        return false;
+    }
+
+    private long getValue(Cell cell) {
+        return Long.parseLong(Bytes.toString(cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    private void update(Cell cell) {
+        if (CellUtil
+                .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+            addScannedDataRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+            addRebuiltIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildInvalidIndexRowCount(getValue(cell));
+        }
+    }
+
+    public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+        // Search for the place where the trailing 0xFFs start
+        int offset = rowKeyPrefix.length;
+        while (offset > 0) {
+            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                break;
+            }
+            offset--;
+        }
+        if (offset == 0) {
+            // We got an 0xFFFF... (only FFs) stopRow value which is
+            // the last possible prefix before the end of the table.
+            // So set it to stop at the 'end of the table'
+            return HConstants.EMPTY_END_ROW;
+        }
+        // Copy the right length of the original
+        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+        // And increment the last one
+        newStopRow[newStopRow.length - 1]++;
+        return newStopRow;
+    }
+
+    public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
+            throws IOException {
+        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
+        byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+        byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+        Scan scan = new Scan();
+        scan.setStartRow(startRowKey);
+        scan.setStopRow(stopRowKey);
+        ResultScanner scanner = hTable.getScanner(scan);
+        for (Result result = scanner.next(); result != null; result = scanner.next()) {
+            for (Cell cell : result.rawCells()) {
+                verificationResult.update(cell);
+            }
+        }
+        return verificationResult;
+    }
+
+    public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+            return false;
+        } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
+                return true;
+            }
+        } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+            if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void add(IndexToolVerificationResult verificationResult) {
+        scannedDataRowCount += verificationResult.scannedDataRowCount;
+        rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+        before.add(verificationResult.before);
+        after.add(verificationResult.after);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 98000f7..8d1b4db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -63,8 +64,8 @@ public class PhoenixIndexImportDirectReducer extends
             long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
             Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
                     .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
-            IndexRebuildRegionScanner.VerificationResult verificationResult =
-                    IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+            IndexToolVerificationResult verificationResult =
+                    IndexToolVerificationResult.getVerificationResult(hTable, ts);
             context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
                     setValue(verificationResult.getScannedDataRowCount());
             context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
new file mode 100644
index 0000000..2506609
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
+
+    private static final int INDEX_TABLE_EXPIRY_SEC = 1;
+    private static final String UNEXPECTED_COLUMN = "0:UNEXPECTED_COLUMN";
+    public static final String FIRST_ID = "FIRST_ID";
+    public static final String SECOND_ID = "SECOND_ID";
+    public static final String FIRST_VALUE = "FIRST_VALUE";
+    public static final String SECOND_VALUE = "SECOND_VALUE";
+    public static final String
+            CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (FIRST_ID BIGINT NOT NULL, "
+                        + "SECOND_ID BIGINT NOT NULL, FIRST_VALUE VARCHAR(20), "
+                        + "SECOND_VALUE INTEGER "
+                        + "CONSTRAINT PK PRIMARY KEY(FIRST_ID, SECOND_ID)) COLUMN_ENCODED_BYTES=0";
+
+    public static final String
+            CREATE_INDEX_DDL = "CREATE INDEX %s ON %s (SECOND_VALUE) INCLUDE (FIRST_VALUE)";
+    public static final String COMPLETE_ROW_UPSERT = "UPSERT INTO %s VALUES (?,?,?,?)";
+    public static final String PARTIAL_ROW_UPSERT = "UPSERT INTO %s (%s, %s, %s) VALUES (?,?,?)";
+    public static final String DELETE_ROW_DML = "DELETE FROM %s WHERE %s = ?  AND %s = ?";
+    public static final String INCLUDED_COLUMN = "0:FIRST_VALUE";
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    private enum TestType {
+        //set of mutations matching expected mutations
+        VALID_EXACT_MATCH,
+        //mix of delete and put mutations
+        VALID_MIX_MUTATIONS,
+        //only incoming unverified mutations
+        VALID_NEW_UNVERIFIED_MUTATIONS,
+        //extra mutations mimicking incoming mutations
+        VALID_MORE_MUTATIONS,
+        EXPIRED,
+        INVALID_EXTRA_CELL,
+        INVALID_EMPTY_CELL,
+        INVALID_CELL_VALUE,
+        INVALID_COLUMN
+    }
+
+    public static class UnitTestClock extends EnvironmentEdge {
+        long initialTime;
+        long delta;
+
+        public UnitTestClock(long delta) {
+            initialTime = System.currentTimeMillis() + delta;
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+    }
+
+    @Mock
+    Result indexRow;
+    @Mock
+    IndexRebuildRegionScanner rebuildScanner;
+    List<Mutation> actualMutationList;
+    String schema, table, dataTableFullName, index, indexTableFullName;
+    PTable pIndexTable, pDataTable;
+    Put put = null;
+    Delete delete = null;
+    PhoenixConnection pconn;
+    IndexToolVerificationResult.PhaseResult actualPR;
+    public Map<byte[], List<Mutation>> indexKeyToMutationMapLocal;
+    private IndexMaintainer indexMaintainer;
+
+    @Before
+    public void setup() throws SQLException, IOException {
+        MockitoAnnotations.initMocks(this);
+        createDBObject();
+        createMutationsWithUpserts();
+        initializeRebuildScannerAttributes();
+        initializeGlobalMockitoSetup();
+    }
+
+    public void createDBObject() throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            schema = generateUniqueName();
+            table = generateUniqueName();
+            index = generateUniqueName();
+            dataTableFullName = SchemaUtil.getQualifiedTableName(schema, table);
+            indexTableFullName = SchemaUtil.getQualifiedTableName(schema, index);
+
+            conn.createStatement().execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+            conn.createStatement().execute(String.format(CREATE_INDEX_DDL, index, dataTableFullName));
+            conn.commit();
+
+            pconn = conn.unwrap(PhoenixConnection.class);
+            pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), indexTableFullName));
+            pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), dataTableFullName));
+        }
+    }
+
+    private void createMutationsWithUpserts() throws SQLException, IOException {
+        deleteRow(2, 3);
+        upsertPartialRow(2, 3, "abc");
+        upsertCompleteRow(2, 3, "hik", 8);
+        upsertPartialRow(2, 3, 10);
+        upsertPartialRow(2,3,4);
+        deleteRow(2, 3);
+        upsertPartialRow(2,3, "def");
+        upsertCompleteRow(2, 3, null, 20);
+        upsertPartialRow(2,3, "wert");
+    }
+
+    private void deleteRow(int key1, int key2) throws SQLException, IOException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(DELETE_ROW_DML, dataTableFullName, FIRST_ID, SECOND_ID));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, String val1)
+            throws SQLException, IOException {
+
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                                    FIRST_VALUE));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertPartialRow(int key1, int key2, int value1)
+            throws SQLException, IOException {
+
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+            PreparedStatement
+                    ps =
+                    conn.prepareStatement(
+                            String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                                    SECOND_VALUE));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setInt(3, value1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void upsertCompleteRow(int key1, int key2, String val1
+    , int val2) throws SQLException, IOException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            PreparedStatement
+                    ps = conn.prepareStatement(String.format(COMPLETE_ROW_UPSERT, dataTableFullName));
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.setInt(4, val2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    private void convertUpsertToMutations(Connection conn) throws SQLException, IOException {
+        Iterator<Pair<byte[],List<KeyValue>>>
+                dataTableNameAndMutationKeyValuesIter = PhoenixRuntime.getUncommittedDataIterator(conn);
+        Pair<byte[], List<KeyValue>> elem = dataTableNameAndMutationKeyValuesIter.next();
+        byte[] key = elem.getSecond().get(0).getRow();
+        long mutationTS = EnvironmentEdgeManager.currentTimeMillis();
+
+        for (KeyValue kv : elem.getSecond()) {
+            Cell cell =
+                    CellUtil.createCell(kv.getRow(), kv.getFamily(), kv.getQualifier(),
+                            mutationTS, kv.getType(), kv.getValue());
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                if (put == null ) {
+                    put = new Put(key);
+                }
+                put.add(cell);
+            } else {
+                if (delete == null) {
+                    delete = new Delete(key);
+                }
+                delete.addDeleteMarker(cell);
+            }
+        }
+    }
+
+    private void initializeRebuildScannerAttributes() {
+        when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
+        when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
+        when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+        rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
+        indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+        rebuildScanner.setIndexMaintainer(indexMaintainer);
+    }
+
+    private void initializeGlobalMockitoSetup() throws IOException {
+        //setup
+        when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+        when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
+        when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
+                Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+                        Matchers.<byte[]>any(), Matchers.<byte[]>any());
+        doNothing().when(rebuildScanner)
+                .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+                Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString());
+
+        //populate the local map to use to create actual mutations
+        indexKeyToMutationMapLocal = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMapLocal);
+        rebuildScanner.prepareIndexMutations(put, delete);
+
+        //populate map to use in test code
+        Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+        rebuildScanner.prepareIndexMutations(put, delete);
+    }
+
+    private byte[] getValidRowKey() {
+        return indexKeyToMutationMapLocal.entrySet().iterator().next().getKey();
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_moreActual() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allMix() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_allUnverified() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult
+                expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.EXPIRED);
+            expireThisRow();
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_diffColumn() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        for (Map.Entry<byte[], List<Mutation>>
+                entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EXTRA_CELL);
+            //test code
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+            assertTrue(actualPR.equals(expectedPR));
+        }
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_expectedMutations_null() throws IOException {
+        when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
+        byte [] validRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(validRowKey);
+        actualMutationList = new ArrayList<>();
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+    }
+
+    private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
+        return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+    }
+
+    private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
+            TestType testType)
+            throws IOException {
+        actualPR = new IndexToolVerificationResult.PhaseResult();
+        byte[] indexKey = entry.getKey();
+        when(indexRow.getRow()).thenReturn(indexKey);
+        actualMutationList = buildActualIndexMutationsList(testType);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+    }
+
+    private List<Mutation> buildActualIndexMutationsList(TestType testType) {
+        List<Mutation> actualMutations = new ArrayList<>();
+        actualMutations.addAll(indexKeyToMutationMapLocal.get(indexRow.getRow()));
+        if(testType.equals(TestType.EXPIRED)) {
+            return actualMutations;
+        }
+        if(testType.toString().startsWith("VALID")) {
+            return getValidActualMutations(testType, actualMutations);
+        }
+        if(testType.toString().startsWith("INVALID")) {
+            return getInvalidActualMutations(testType, actualMutations);
+        }
+        return null;
+    }
+
+    private List <Mutation> getValidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        if(testType.equals(TestType.VALID_EXACT_MATCH)) {
+            return actualMutations;
+        }
+        if (testType.equals(TestType.VALID_MIX_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), new Long(1)));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+        }
+        if (testType.equals(TestType.VALID_NEW_UNVERIFIED_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), new Long(1)));
+        }
+        newActualMutations.addAll(actualMutations);
+        if(testType.equals(TestType.VALID_MORE_MUTATIONS)) {
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), null));
+            newActualMutations.add(getDeleteMutation(actualMutations.get(0), new Long(1)));
+            newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), new Long(1)));
+        }
+        return newActualMutations;
+    }
+
+    private List <Mutation> getInvalidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        List <Mutation> newActualMutations = new ArrayList<>();
+        newActualMutations.addAll(actualMutations);
+        for (Mutation m : actualMutations) {
+            newActualMutations.remove(m);
+            NavigableMap<byte[], List<Cell>> familyCellMap = m.getFamilyCellMap();
+            List<Cell> cellList = familyCellMap.firstEntry().getValue();
+            List<Cell> newCellList = new ArrayList<>();
+            byte[] fam = CellUtil.cloneFamily(cellList.get(0));
+            for (Cell c : cellList) {
+                infiltrateCell(c, newCellList, testType);
+            }
+            familyCellMap.put(fam, newCellList);
+            m.setFamilyCellMap(familyCellMap);
+            newActualMutations.add(m);
+        }
+        return newActualMutations;
+    }
+
+    private void infiltrateCell(Cell c, List<Cell> newCellList, TestType e) {
+        Cell newCell;
+        Cell emptyCell;
+        switch(e) {
+        case INVALID_COLUMN:
+            newCell =
+                    CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                            Bytes.toBytes(UNEXPECTED_COLUMN),
+                            EnvironmentEdgeManager.currentTimeMillis(),
+                            KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
+            newCellList.add(newCell);
+            newCellList.add(c);
+            break;
+        case INVALID_CELL_VALUE:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell = getCellWithPut(c);
+                emptyCell = getUnverifiedEmptyCell(c);
+                newCellList.add(newCell);
+                newCellList.add(emptyCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EMPTY_CELL:
+            if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                newCell =
+                        CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                                CellUtil.cloneQualifier(c), c.getTimestamp(),
+                                KeyValue.Type.Delete.getCode(), VERIFIED_BYTES);
+                newCellList.add(newCell);
+            } else {
+                newCellList.add(c);
+            }
+            break;
+        case INVALID_EXTRA_CELL:
+            newCell = getCellWithPut(c);
+            emptyCell = getUnverifiedEmptyCell(c);
+            newCellList.add(newCell);
+            newCellList.add(emptyCell);
+            newCellList.add(c);
+        }
+    }
+
+    private Cell getUnverifiedEmptyCell(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                EnvironmentEdgeManager.currentTimeMillis(),
+                KeyValue.Type.Put.getCode(), UNVERIFIED_BYTES);
+    }
+
+    private Cell getCellWithPut(Cell c) {
+        return CellUtil.createCell(CellUtil.cloneRow(c),
+                CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
+                EnvironmentEdgeManager.currentTimeMillis(), KeyValue.Type.Put.getCode(),
+                Bytes.toBytes("zxcv"));
+    }
+
+    private void expireThisRow() {
+        rebuildScanner.setIndexTableTTL(INDEX_TABLE_EXPIRY_SEC);
+        UnitTestClock expiryClock = new UnitTestClock(5000);
+        EnvironmentEdgeManager.injectEdge(expiryClock);
+    }
+
+    private Mutation getDeleteMutation(Mutation orig, Long ts) {
+        Mutation m = new Delete(orig.getRow());
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.DeleteFamilyVersion);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, true);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    private Mutation getUnverifiedPutMutation(Mutation orig, Long ts) {
+        Mutation m = new Put(orig.getRow());
+        if (orig.getAttributesMap() != null) {
+            for (Map.Entry<String,byte[]> entry : orig.getAttributesMap().entrySet()) {
+                m.setAttribute(entry.getKey(), entry.getValue());
+            }
+        }
+        List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+        ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+        Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.Put);
+        Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, false);
+        byte[] fam = CellUtil.cloneFamily(origList.get(0));
+        List<Cell> famCells = Lists.newArrayList();
+        m.getFamilyCellMap().put(fam, famCells);
+        famCells.add(c);
+        famCells.add(empty);
+        return m;
+    }
+
+    private Cell getEmptyCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type,
+            boolean verified) {
+        return CellUtil.createCell(orig.getRow(), CellUtil.cloneFamily(origList.get(0)),
+                indexMaintainer.getEmptyKeyValueQualifier(),
+                ts, type.getCode(), verified ? VERIFIED_BYTES : UNVERIFIED_BYTES);
+    }
+
+    private Cell getNewPutCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type) {
+        return CellUtil.createCell(orig.getRow(),
+                CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN),
+                ts, type.getCode(), Bytes.toBytes("asdfg"));
+    }
+}