You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/04/07 03:52:18 UTC

[phoenix] branch 4.x updated: PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 339a22d  PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age
339a22d is described below

commit 339a22d29b3c5f268935c8c013aaa1e1f4bbbf69
Author: Weiming Wang <wa...@live.cn>
AuthorDate: Wed Apr 1 11:09:09 2020 -0700

    PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age
    
    Signed-off-by: Kadir <ko...@salesforce.com>
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  51 ++++--
 .../hadoop/hbase/regionserver/ScanInfoUtil.java    |   2 +-
 .../coprocessor/IndexRebuildRegionScanner.java     | 173 +++++++++++++++------
 .../coprocessor/IndexToolVerificationResult.java   |  78 ++++++++--
 .../apache/phoenix/mapreduce/index/IndexTool.java  |   8 +
 .../index/PhoenixIndexImportDirectReducer.java     |   8 +
 .../index/PhoenixIndexToolJobCounters.java         |   6 +-
 .../phoenix/index/VerifySingleIndexRowTest.java    | 168 +++++++++++++++-----
 8 files changed, 376 insertions(+), 118 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 119c806..71cb530 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -81,19 +82,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
 import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -251,6 +254,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
             dropIndexToolTables(conn);
         }
     }
@@ -356,6 +361,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
         } finally {
             conn.close();
         }
@@ -412,7 +419,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(NROWS, actualRowCount);
 
@@ -429,10 +438,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
             assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
             actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
             assertEquals(2 * NROWS, actualRowCount);
             dropIndexToolTables(conn);
@@ -593,9 +606,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                     null, -1, IndexTool.IndexVerifyType.AFTER);
             // The index tool output table should report that there is a missing index row
             Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
-            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex){
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
             IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
             dropIndexToolTables(conn);
         }
@@ -627,9 +644,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, -1, IndexTool.IndexVerifyType.ONLY);
             Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
-            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex) {
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
             // Delete the output table for the next test
             dropIndexToolTables(conn);
             // Run the index tool to populate the index while verifying rows
@@ -668,9 +689,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             Cell cell =
                     getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
                         indexTableFullName);
-            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
-            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
-                cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            try {
+                String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+                String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+            } catch(Exception ex) {
+                Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+            }
 
             // Run the index tool to populate the index while verifying rows
             runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index e70ffc7..7d54228 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -121,7 +121,7 @@ public class ScanInfoUtil {
             DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
     }
 
-    private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+    public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
         return maxLookbackTime > 0L;
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 0bd8c7c..2bdaf1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,21 +17,10 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.*;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -48,7 +37,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -72,8 +60,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
@@ -109,6 +98,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
+    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
     public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
@@ -154,6 +144,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private int  singleRowRebuildReturnCode;
     private Map<byte[], NavigableSet<byte[]>> familyMap;
     private byte[][] viewConstants;
+    private long maxLookBackInMills;
 
     @VisibleForTesting
     public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
@@ -219,6 +210,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
             }
         }
+
+        maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
     }
 
     private void setReturnCodeForSingleRowRebuild() throws IOException {
@@ -302,6 +295,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                     scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
             put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
                     scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount)));
         }
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
             put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
@@ -312,6 +309,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                     scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
             put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
                     scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount)));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount)));
         }
         resultHTable.put(put);
     }
@@ -370,6 +371,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return 0;
     }
 
+    @VisibleForTesting
+    public long setMaxLookBackInMills(long maxLookBackInMills) {
+        this.maxLookBackInMills = maxLookBackInMills;
+        return 0;
+    }
+
     public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
@@ -516,13 +523,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return null;
     }
 
-    private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
+    private void logMismatch(Mutation expected, Mutation actual, int iteration) throws IOException {
         if (getTimestamp(expected) != getTimestamp(actual)) {
             String errorMsg = "Not matching timestamp";
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
             logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                     errorMsg, null, null);
-            return false;
+            return;
         }
         int expectedCellCount = 0;
         for (List<Cell> cells : expected.getFamilyCellMap().values()) {
@@ -539,14 +546,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
-                    return false;
+                    return;
                 }
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
                     String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                             errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
-                    return false;
+                    return;
                 }
             }
         }
@@ -562,6 +569,40 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
             logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
                     errorMsg);
+        }
+    }
+
+    private boolean isMatchingMutation(Mutation expected, Mutation actual) throws IOException {
+        if (getTimestamp(expected) != getTimestamp(actual)) {
+            return false;
+        }
+        int expectedCellCount = 0;
+        for (List<Cell> cells : expected.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell expectedCell : cells) {
+                expectedCellCount++;
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = getCell(actual, family, qualifier);
+                if (actualCell == null ||
+                        !CellUtil.matchingType(expectedCell, actualCell)) {
+                    return false;
+                }
+                if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+                    return false;
+                }
+            }
+        }
+        int actualCellCount = 0;
+        for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            actualCellCount += cells.size();
+        }
+        if (expectedCellCount != actualCellCount) {
             return false;
         }
         return true;
@@ -698,7 +739,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         while (iterator.hasNext()) {
             Mutation mutation = iterator.next();
             if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
-                    (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) {
+                    (mutation instanceof Delete && !isDeleteFamily(mutation))) {
                 iterator.remove();
             } else {
                 if (previous != null && getTimestamp(previous) == getTimestamp(mutation) &&
@@ -796,15 +837,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
             repairActualMutationList(actualMutationList, expectedMutationList);
         }
         cleanUpActualMutationList(actualMutationList);
-        long currentTime = EnvironmentEdgeManager.currentTime();
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
         int actualIndex = 0;
         int expectedIndex = 0;
-        int matchingCount = 0;
         int expectedSize = expectedMutationList.size();
         int actualSize = actualMutationList.size();
         Mutation expected = null;
         Mutation previousExpected;
-        Mutation actual;
+        Mutation actual = null;
         while (expectedIndex < expectedSize && actualIndex <actualSize) {
             previousExpected = expected;
             expected = expectedMutationList.get(expectedIndex);
@@ -822,7 +862,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 if (previousExpected instanceof Delete) {
                     // Between an expected delete and put, there can be one or more deletes due to
                     // concurrent mutations or data table write failures. Skip all of them if any
-                    while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
+                    // There cannot be any actual delete mutation between two expected put mutations.
+                    while (getTimestamp(actual) >= getTimestamp(expected) && actual instanceof Delete) {
                         actualIndex++;
                         if (actualIndex == actualSize) {
                             break;
@@ -836,10 +877,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 if (actual instanceof Delete) {
                     break;
                 }
-                if (isMatchingMutation(expected, actual, expectedIndex)) {
+                if (isMatchingMutation(expected, actual)) {
                     expectedIndex++;
                     actualIndex++;
-                    matchingCount++;
                     continue;
                 }
             } else { // expected instanceof Delete
@@ -859,39 +899,54 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                         (actual instanceof Delete && isDeleteFamily(actual))) {
                     expectedIndex++;
                     actualIndex++;
-                    matchingCount++;
                     continue;
                 }
             }
-            if (matchingCount > 0) {
-                break;
+            break;
+        }
+
+        if (expectedIndex == expectedSize ){
+            // every expected mutation has its matching one in the actual list.
+            verificationPhaseResult.validIndexRowCount++;
+            return true;
+        }
+
+        if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
+            if (expectedIndex > 0) {
+                // If the current expected index mutation is beyond the max lookback window, we only need to make sure its latest
+                // mutation is a matching one, as an SCN query is required.
+                verificationPhaseResult.validIndexRowCount++;
+                return true;
             }
-            verificationPhaseResult.invalidIndexRowCount++;
+
+            // All expected mutations are beyond the maxLookBack window, and none of them has a matching one in the actual list.
+            // This may be caused by a real bug or by compaction on the index table.
+            // We report it as a failure, so that the "before" option can trigger the index rebuild for this row.
+            // This repair is required when there is only one index row for a given data table row and the timestamp of that row
+            // can be beyond maxLookBack.
+            verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++;
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+            String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)",
+                    expectedSize,
+                    actualSize);
+            logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                    getTimestamp(expectedMutationList.get(expectedIndex)),
+                    0, errorMsg);
             return false;
         }
-        if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
-            if (matchingCount > 0) {
-                if (verifyType != IndexTool.IndexVerifyType.ONLY) {
-                    // We do not consider this as a verification issue but log it for further information.
-                    // This may happen due to compaction
-                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
-                    String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
-                            + actualMutationList.size();
-                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
-                            getTimestamp(expectedMutationList.get(0)),
-                            getTimestamp(actualMutationList.get(0)), errorMsg);
-                }
-            } else {
+        else {
+            if (actualIndex < actualSize && actual instanceof Put  &&  expected instanceof Put){
+                logMismatch(expected, actual, expectedIndex);
+            }
+            else {
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
                 String errorMsg = "Not matching index row";
                 logToIndexToolOutputTable(dataKey, indexRow.getRow(),
                         getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
-                verificationPhaseResult.invalidIndexRowCount++;
-                return false;
             }
+            verificationPhaseResult.invalidIndexRowCount++;
+            return false;
         }
-        verificationPhaseResult.validIndexRowCount++;
-        return true;
     }
 
     private static long getMaxTimestamp(Pair<Put, Delete> pair) {
@@ -937,7 +992,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         // TODO: metrics for expired rows
         if (!keys.isEmpty()) {
             Iterator<KeyRange> itr = keys.iterator();
-            long currentTime = EnvironmentEdgeManager.currentTime();
+            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
             while(itr.hasNext()) {
                 KeyRange keyRange = itr.next();
                 byte[] key = keyRange.getLowerRange();
@@ -950,20 +1005,30 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
         if (keys.size() > 0) {
             for (KeyRange keyRange : keys) {
-                String errorMsg = "Missing index row";
                 byte[] key = keyRange.getLowerRange();
                 List<Mutation> mutationList = indexKeyToMutationMap.get(key);
-                if (mutationList.get(mutationList.size() - 1) instanceof Delete) {
+                Mutation mutation = mutationList.get(mutationList.size() - 1);
+                if (mutation instanceof Delete) {
                     continue;
                 }
+                long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+                String errorMsg;
+                if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
+                    errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+                    verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++;
+                }
+                else {
+                    errorMsg = "Missing index row";
+                    verificationPhaseResult.missingIndexRowCount++;
+                }
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
                 logToIndexToolOutputTable(dataKey,
                         keyRange.getLowerRange(),
                         getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
-                        getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
-                verificationPhaseResult.missingIndexRowCount++;
+                        getTimestamp(mutation), errorMsg);
             }
         }
+
         keys.addAll(invalidKeys);
     }
 
@@ -974,6 +1039,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
     }
 
+    private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){
+        // With max lookback disabled, every timestamp is reported as beyond the window.
+        boolean lookBackEnabled = ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills);
+        return !lookBackEnabled || tsToCheck < (currentTime - maxLookBackInMills);
+    }
+
     private void addVerifyTask(final List<KeyRange> keys,
                                final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
         tasks.add(new Task<Boolean>() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index ed92fad..1fbb866 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -30,17 +30,7 @@ import org.apache.phoenix.mapreduce.index.IndexTool;
 import java.io.IOException;
 import java.util.Arrays;
 
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.*;
 
 public class IndexToolVerificationResult {
     public static class PhaseResult {
@@ -48,26 +38,33 @@ public class IndexToolVerificationResult {
         long expiredIndexRowCount = 0;
         long missingIndexRowCount = 0;
         long invalidIndexRowCount = 0;
+        long beyondMaxLookBackMissingIndexRowCount = 0;
+        long beyondMaxLookBackInvalidIndexRowCount = 0;
 
         public void add(PhaseResult phaseResult) {
             validIndexRowCount += phaseResult.validIndexRowCount;
             expiredIndexRowCount += phaseResult.expiredIndexRowCount;
             missingIndexRowCount += phaseResult.missingIndexRowCount;
             invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+            beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount;
+            beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount;
         }
 
         public PhaseResult(){}
 
         public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
-                long missingIndexRowCount, long invalidIndexRowCount) {
+                long missingIndexRowCount, long invalidIndexRowCount,
+                long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) {
             this.validIndexRowCount = validIndexRowCount;
             this.expiredIndexRowCount = expiredIndexRowCount;
             this.missingIndexRowCount = missingIndexRowCount;
             this.invalidIndexRowCount = invalidIndexRowCount;
+            this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
+            this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
         }
 
         public long getTotalCount() {
-            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount;
         }
 
         @Override
@@ -77,7 +74,8 @@ public class IndexToolVerificationResult {
                     ", expiredIndexRowCount=" + expiredIndexRowCount +
                     ", missingIndexRowCount=" + missingIndexRowCount +
                     ", invalidIndexRowCount=" + invalidIndexRowCount +
-                    '}';
+                    ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount +
+                    ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount + '}';
         }
 
         @Override
@@ -92,7 +90,9 @@ public class IndexToolVerificationResult {
             return this.expiredIndexRowCount == pr.expiredIndexRowCount
                     && this.validIndexRowCount == pr.validIndexRowCount
                     && this.invalidIndexRowCount == pr.invalidIndexRowCount
-                    && this.missingIndexRowCount == pr.missingIndexRowCount;
+                    && this.missingIndexRowCount == pr.missingIndexRowCount
+                    && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount
+                    && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount;
         }
 
         @Override
@@ -102,6 +102,8 @@ public class IndexToolVerificationResult {
             result = 31 * result + validIndexRowCount;
             result = 31 * result + missingIndexRowCount;
             result = 31 * result + invalidIndexRowCount;
+            result = 31 * result + beyondMaxLookBackMissingIndexRowCount;
+            result = 31 * result + beyondMaxLookBackInvalidIndexRowCount;
             return (int)result;
         }
     }
@@ -141,6 +143,14 @@ public class IndexToolVerificationResult {
         return before.invalidIndexRowCount;
     }
 
+    public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() {
+        return before.beyondMaxLookBackMissingIndexRowCount;
+    }
+
+    public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() {
+        return before.beyondMaxLookBackInvalidIndexRowCount;
+    }
+
     public long getBeforeRebuildMissingIndexRowCount() {
         return before.missingIndexRowCount;
     }
@@ -161,6 +171,14 @@ public class IndexToolVerificationResult {
         return after.missingIndexRowCount;
     }
 
+    public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() {
+        return after.beyondMaxLookBackMissingIndexRowCount;
+    }
+
+    public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() {
+        return after.beyondMaxLookBackInvalidIndexRowCount;
+    }
+
     private void addScannedDataRowCount(long count) {
         this.scannedDataRowCount += count;
     }
@@ -185,6 +203,14 @@ public class IndexToolVerificationResult {
         before.invalidIndexRowCount += count;
     }
 
+    private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
+        before.beyondMaxLookBackMissingIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
+        before.beyondMaxLookBackInvalidIndexRowCount += count;
+    }
+
     private void addAfterRebuildValidIndexRowCount(long count) {
         after.validIndexRowCount += count;
     }
@@ -201,6 +227,14 @@ public class IndexToolVerificationResult {
         after.invalidIndexRowCount += count;
     }
 
+    private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
+        after.beyondMaxLookBackMissingIndexRowCount += count;
+    }
+
+    private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
+        after.beyondMaxLookBackInvalidIndexRowCount += count;
+    }
+
     private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
         if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                 AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
@@ -229,6 +263,10 @@ public class IndexToolVerificationResult {
             addBeforeRebuildMissingIndexRowCount(getValue(cell));
         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
             addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell));
         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
             addAfterRebuildValidIndexRowCount(getValue(cell));
         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
@@ -237,6 +275,10 @@ public class IndexToolVerificationResult {
             addAfterRebuildMissingIndexRowCount(getValue(cell));
         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
             addAfterRebuildInvalidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell));
         }
     }
 
@@ -284,11 +326,13 @@ public class IndexToolVerificationResult {
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
             return false;
         } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-            if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
+            if (before.invalidIndexRowCount + before.missingIndexRowCount
+                    + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) {
                 return true;
             }
         } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
-            if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+            if (after.invalidIndexRowCount + after.missingIndexRowCount
+                    + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) {
                 return true;
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 0347811..95703a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -187,6 +187,10 @@ public class IndexTool extends Configured implements Tool {
     public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
     public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
     public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+    public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
     public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount";
     public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
     public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
@@ -195,6 +199,10 @@ public class IndexTool extends Configured implements Tool {
     public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
     public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
     public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
+    public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+    public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
+    public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
     public static String  VERIFICATION_PHASE = "Phase";
     public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 8d1b4db..a24e3ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -80,6 +80,10 @@ public class PhoenixIndexImportDirectReducer extends
                         setValue(verificationResult.getBeforeRebuildMissingIndexRowCount());
                 context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).
                         setValue(verificationResult.getBeforeRebuildInvalidIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount());
             }
             if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
                 context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT).
@@ -90,6 +94,10 @@ public class PhoenixIndexImportDirectReducer extends
                         setValue(verificationResult.getAfterRebuildMissingIndexRowCount());
                 context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).
                         setValue(verificationResult.getAfterRebuildInvalidIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildBeyondMaxLookBackMissingIndexRowCount());
+                context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).
+                        setValue(verificationResult.getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount());
             }
             if (verificationResult.isVerificationFailed(verifyType)) {
                 throw new IOException("Index verification failed! " + verificationResult);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
index c10694d..b736787 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -28,8 +28,12 @@ public enum PhoenixIndexToolJobCounters {
     BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT,
     BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT,
     BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
+    BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT,
     AFTER_REBUILD_VALID_INDEX_ROW_COUNT,
     AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT,
     AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
-    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+    AFTER_REBUILD_INVALID_INDEX_ROW_COUNT,
+    AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
+    AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 30961e3..5df041d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -32,16 +32,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.EnvironmentEdge;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.*;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -55,17 +53,13 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Properties;
+import java.util.*;
 
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.when;
 
@@ -265,9 +259,12 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
         when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
         when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+        when(rebuildScanner.setMaxLookBackInMills(Matchers.anyLong())).thenCallRealMethod();
         rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
         indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
         rebuildScanner.setIndexMaintainer(indexMaintainer);
+        // set the maxLookBack to infinite to avoid the compaction
+        rebuildScanner.setMaxLookBackInMills(Long.MAX_VALUE);
     }
 
     private void initializeGlobalMockitoSetup() throws IOException {
@@ -354,7 +351,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
     @Test
     public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
         IndexToolVerificationResult.PhaseResult
-                expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+                expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0);
         for (Map.Entry<byte[], List<Mutation>>
                 entry : indexKeyToMutationMapLocal.entrySet()) {
             initializeLocalMockitoSetup(entry, TestType.EXPIRED);
@@ -365,7 +362,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
             assertTrue(actualPR.equals(expectedPR));
         }
     }
-    @Ignore
+
     @Test
     public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
         IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -379,7 +376,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         }
     }
 
-    @Ignore
     @Test
     public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
         IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -406,7 +402,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         }
     }
 
-    @Ignore
     @Test
     public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
         IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -428,33 +423,136 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
     }
 
+
+    // Test major compaction on the index table only.
+    // At least one expected mutation within maxLookBack has a matching mutation in the actual list.
+    // However, some expected mutations fall outside maxLookBack, and their matching mutations in the actual list may have been compacted away.
+    // Such a row is reported as a valid row.
     @Test
-    public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
-        byte [] validRowKey = getValidRowKey();
-        when(indexRow.getRow()).thenReturn(validRowKey);
-        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
-        exceptionRule.expect(DoNotRetryIOException.class);
-        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
-        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    public void testVerifySingleIndexRow_compactionOnIndexTable_atLeastOneExpectedMutationWithinMaxLookBack() throws Exception {
+        String dataRowKey = "k1";
+        byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1");
+        ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(1);
+        EnvironmentEdgeManager.injectEdge(injectEdge); // NOTE(review): the injected edge is not restored inside this method - confirm a teardown resets it
+
+        List<Mutation> expectedMutations = new ArrayList<>();
+        List<Mutation> actualMutations = new ArrayList<>();
+        // Shrink maxLookBack from infinite to a finite interval so that a mutation can fall outside the maxLookBack window.
+        long maxLookbackInMills = 10 * 1000;
+        rebuildScanner.setMaxLookBackInMills(maxLookbackInMills);
+
+        Put put = new Put(indexRowKey1Bytes);
+        Cell cell = CellUtil.createCell(indexRowKey1Bytes,
+                                        QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                        QueryConstants.EMPTY_COLUMN_BYTES,
+                                        EnvironmentEdgeManager.currentTimeMillis(),
+                                        KeyValue.Type.Put.getCode(),
+                                        IndexRegionObserver.VERIFIED_BYTES);
+        put.add(cell);
+        // This mutation ends up beyond maxLookBack, so it is added to expectedMutations only.
+        expectedMutations.add(put);
+
+        // Advance the clock by maxLookBack so that the previous mutation falls outside the
+        // maxLookBack window while the next mutation is within it.
+        injectEdge.incrementValue(maxLookbackInMills);
+        put =  new Put(indexRowKey1Bytes);
+        cell = CellUtil.createCell(indexRowKey1Bytes,
+                        QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                        QueryConstants.EMPTY_COLUMN_BYTES,
+                        EnvironmentEdgeManager.currentTimeMillis(),
+                        KeyValue.Type.Put.getCode(),
+                        IndexRegionObserver.VERIFIED_BYTES);
+        put.add(cell);
+        // This mutation is in both expectedMutations and actualMutations: it is within maxLookBack, so it cannot have been compacted away.
+        expectedMutations.add(put);
+        actualMutations.add(put);
+        Result actualMutationsScanResult = Result.create(Arrays.asList(cell));
+
+        Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+        indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+        when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations);
+
+        injectEdge.incrementValue(1); // presumably makes the first put strictly older than (now - maxLookBack)
+        IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
+        // The row is expected to verify successfully.
+        assertTrue(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR));
+        // Only validIndexRowCount (the first constructor argument) should be 1.
+        IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0);
+        assertTrue(actualPR.equals(expectedPR));
     }
 
+    // Test major compaction on the index table only.
+    // All expected mutations are beyond maxLookBack, and none has a matching mutation in the actual list because of major compaction.
+    // Such a row is reported as an invalid row beyond maxLookBack.
     @Test
-    public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
-        byte [] validRowKey = getValidRowKey();
-        when(indexRow.getRow()).thenReturn(validRowKey);
-        actualMutationList = new ArrayList<>();
-        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
-        exceptionRule.expect(DoNotRetryIOException.class);
-        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
-        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    public void testVerifySingleIndexRow_compactionOnIndexTable_noExpectedMutationWithinMaxLookBack() throws Exception {
+        String dataRowKey = "k1";
+        byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1");
+        List<Mutation> expectedMutations = new ArrayList<>();
+        List<Mutation> actualMutations = new ArrayList<>();
+        // Shrink maxLookBack from infinite to a finite interval so that a mutation can fall outside the maxLookBack window.
+        long maxLookbackInMills = 10 * 1000;
+        rebuildScanner.setMaxLookBackInMills(maxLookbackInMills);
+
+        ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(1);
+        EnvironmentEdgeManager.injectEdge(injectEdge); // NOTE(review): the injected edge is not restored inside this method - confirm a teardown resets it
+
+        Put put = new Put(indexRowKey1Bytes);
+        Cell cell = CellUtil.createCell(indexRowKey1Bytes,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                QueryConstants.EMPTY_COLUMN_BYTES,
+                EnvironmentEdgeManager.currentTimeMillis(),
+                KeyValue.Type.Put.getCode(),
+                VERIFIED_BYTES);
+        put.add(cell);
+        // This mutation ends up beyond maxLookBack, so it is added to expectedMutations only.
+        expectedMutations.add(put);
+
+        injectEdge.incrementValue(maxLookbackInMills); // advance the clock by maxLookBack before creating the actual mutation
+        put =  new Put(indexRowKey1Bytes);
+        cell = CellUtil.createCell(indexRowKey1Bytes,
+                QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                QueryConstants.EMPTY_COLUMN_BYTES,
+                EnvironmentEdgeManager.currentTimeMillis(),
+                KeyValue.Type.Put.getCode(),
+                UNVERIFIED_BYTES);
+        put.add(cell);
+        // This mutation is in actualMutations only, as it is an unverified put.
+        actualMutations.add(put);
+        Result actualMutationsScanResult = Result.create(Arrays.asList(cell));
+
+        Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+        indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations);
+        rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+        when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations);
+
+        injectEdge.incrementValue(1); // presumably makes the expected put strictly older than (now - maxLookBack)
+        IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
+        // The row is expected to fail verification.
+        assertFalse(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR));
+        // Only beyondMaxLookBackInvalidIndexRowCount (the last constructor argument) should be 1.
+        IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1);
+        assertTrue(actualPR.equals(expectedPR));
+    }
+
+    private static byte[] generateIndexRowKey(String dataRowKey, String dataVal){
+        // Key layout: bytes of dataVal (omitted when null or empty), then the
+        // separator byte, then the bytes of dataRowKey.
+        byte[] valuePart = (dataVal == null || dataVal.isEmpty())
+                ? new byte[0] : Bytes.toBytes(dataVal);
+        byte[] separatorPart = new byte[] { QueryConstants.SEPARATOR_BYTE };
+        return Bytes.add(valuePart, separatorPart, Bytes.toBytes(dataRowKey));
     }
 
     private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
-        return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+        return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0);
     }
 
     private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
-        return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+        return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0);
     }
 
     private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
@@ -537,7 +635,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
             newCell =
                     CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
                             Bytes.toBytes(UNEXPECTED_COLUMN),
-                            EnvironmentEdgeManager.currentTimeMillis(),
+                            c.getTimestamp(),
                             KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
             newCellList.add(newCell);
             newCellList.add(c);
@@ -575,7 +673,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
     private Cell getVerifiedEmptyCell(Cell c) {
         return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
                 indexMaintainer.getEmptyKeyValueQualifier(),
-                EnvironmentEdgeManager.currentTimeMillis(),
+                c.getTimestamp(),
                 KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
     }