You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/23 05:40:46 UTC

[phoenix] branch master updated: PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a48aa87  PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one
a48aa87 is described below

commit a48aa8766f8a448b6eb6bd8e7d3708ea95799130
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Nov 23 13:39:45 2019 +0800

    PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one
    
    Co-authored-by: kadirozde <kadir@apache.org>
---
 .../hbase/index/builder/BaseIndexBuilder.java      |   4 +
 .../hbase/index/builder/IndexBuildManager.java     |  20 ++-
 .../phoenix/hbase/index/builder/IndexBuilder.java  |   3 +-
 .../hbase/index/covered/LocalTableState.java       |   2 +-
 .../hbase/index/covered/NonTxIndexBuilder.java     |   8 +-
 .../hbase/index/covered/data/CachedLocalTable.java |  72 +++++++++
 .../hbase/index/covered/data/LocalHBaseState.java  |   3 +-
 .../hbase/index/covered/data/LocalTable.java       |  98 ------------
 .../hbase/index/util/IndexManagementUtil.java      | 165 +++++++++++++++++----
 .../hbase/index/covered/LocalTableStateTest.java   | 122 ++++-----------
 .../hbase/index/covered/NonTxIndexBuilderTest.java |  32 +++-
 .../index/covered/TestCoveredColumnIndexCodec.java |   4 +-
 .../hbase/index/covered/data/TestLocalTable.java   |  63 --------
 13 files changed, 292 insertions(+), 304 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index a909cf2..5571512 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -133,4 +133,8 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
     public ReplayWrite getReplayWrite(Mutation m) {
         return null;
     }
+
+    public RegionCoprocessorEnvironment getEnv() {
+        return this.env;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 165f0c9..2dcd6c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
 import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -37,6 +36,8 @@ import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +50,7 @@ public class IndexBuildManager implements Stoppable {
   private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class);
   private final IndexBuilder delegate;
   private boolean stopped;
+  private RegionCoprocessorEnvironment regionCoprocessorEnvironment;
 
   /**
    * @param env environment in which <tt>this</tt> is running. Used to setup the
@@ -59,6 +61,7 @@ public class IndexBuildManager implements Stoppable {
     // Prevent deadlock by using single thread for all reads so that we know
     // we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
     this.delegate = getIndexBuilder(env);
+    this.regionCoprocessorEnvironment = env;
   }
   
   private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -88,10 +91,14 @@ public class IndexBuildManager implements Stoppable {
       IndexMetaData indexMetaData) throws Throwable {
     // notify the delegate that we have started processing a batch
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
-
+    CachedLocalTable cachedLocalTable =
+            IndexManagementUtil.preScanAllRequiredRows(
+                    mutations,
+                    (PhoenixIndexMetaData)indexMetaData,
+                    this.regionCoprocessorEnvironment.getRegion());
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
       for (Pair<Mutation, byte[]> update : updates) {
         indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
@@ -105,10 +112,15 @@ public class IndexBuildManager implements Stoppable {
     final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
+    CachedLocalTable cachedLocalTable =
+            IndexManagementUtil.preScanAllRequiredRows(
+                    mutations,
+                    (PhoenixIndexMetaData)indexMetaData,
+                    this.regionCoprocessorEnvironment.getRegion());
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
       if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
         for (Pair<Mutation, byte[]> update : updates) {
           update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index f561e79..7bce22e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 
 /**
  * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -72,7 +73,7 @@ public interface IndexBuilder extends Stoppable {
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException;
 
     /**
      * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 54d7f87..bf592ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -161,7 +161,7 @@ public class LocalTableState implements TableState {
             // needing to lookup the prior row values.
             if (requiresPriorRowState) {
                 // add the current state of the row. Uses listCells() to avoid a new array creation.
-                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
+                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
             }
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 645f2c4..63d2751 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 import org.slf4j.Logger;
@@ -38,18 +37,15 @@ import org.slf4j.LoggerFactory;
 public class NonTxIndexBuilder extends BaseIndexBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
 
-    protected LocalHBaseState localTable;
-
     @Override
     public void setup(RegionCoprocessorEnvironment env) throws IOException {
         super.setup(env);
-        this.localTable = new LocalTable(env);
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
     	// create a state manager, so we can manage each batch
-        LocalTableState state = new LocalTableState(localTable, mutation);
+        LocalTableState state = new LocalTableState(localHBaseState, mutation);
         // build the index updates for each group
         IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
new file mode 100644
index 0000000..f0fbfe6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import java.util.HashMap;
+
+public class CachedLocalTable implements LocalHBaseState {
+
+    private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells;
+
+    public CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
+        this.rowKeyPtrToCells = rowKeyPtrToCells;
+    }
+
+    @Override
+    public List<Cell> getCurrentRowState(
+            Mutation mutation,
+            Collection<? extends ColumnReference> columnReferences,
+            boolean ignoreNewerMutations) throws IOException {
+        byte[] rowKey = mutation.getRow();
+        List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey));
+
+        if(cells == null || cells.isEmpty()) {
+            return cells;
+        }
+
+        if(!ignoreNewerMutations) {
+            return cells;
+        }
+        /**
+         * Because of the earlier {@link IndexManagementUtil#flattenMutationsByTimestamp} call (which is made
+         * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
+         * all cells in the mutation have the same rowKey and timestamp.
+         */
+        long timestamp =
+                IndexManagementUtil.getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+        List<Cell> newCells = new ArrayList<Cell>();
+        for(Cell cell : cells) {
+            if(cell.getTimestamp() < timestamp ) {
+                newCells.add(cell);
+            }
+        }
+        return newCells;
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 5b06910..aae0763 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.hbase.index.covered.data;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -43,7 +44,7 @@ public interface LocalHBaseState {
    *         {@link Result} with no stored {@link Cell}s.
    * @throws IOException if there is an issue reading the row
    */
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
+  public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
       throws IOException;
 
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
deleted file mode 100644
index 402620f..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.primitives.Longs;
-
-/**
- * Wrapper around a lazily instantiated, local HTable.
- * <p>
- * Previously, we had used various row and batch caches. However, this ends up being very
- * complicated when attempting manage updating and invalidating the cache with no real gain as any
- * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching
- * we are doing here. In the end, its simpler and about as efficient to just get the current state
- * of the row from HBase and let HBase manage caching the row from disk on its own.
- */
-public class LocalTable implements LocalHBaseState {
-
-  private RegionCoprocessorEnvironment env;
-
-  public LocalTable(RegionCoprocessorEnvironment env) {
-    this.env = env;
-  }
-
-  @Override
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
-      throws IOException {
-    byte[] row = m.getRow();
-    // need to use a scan here so we can get raw state, which Get doesn't provide.
-    Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
-    s.setStartRow(row);
-    s.setStopRow(row);
-    if (ignoreNewerMutations) {
-        // Provides a means of client indicating that newer cells should not be considered,
-        // enabling mutations to be replayed to partially rebuild the index when a write fails.
-        // When replaying mutations we want the oldest timestamp (as anything newer we be replayed)
-        long ts = getOldestTimestamp(m.getFamilyCellMap().values());
-        s.setTimeRange(0,ts);
-    }
-    Region region = this.env.getRegion();
-    try (RegionScanner scanner = region.getScanner(s)) {
-      List<Cell> kvs = new ArrayList<Cell>(1);
-      boolean more = scanner.next(kvs);
-      assert !more : "Got more than one result when scanning"
-          + " a single row in the primary table!";
-
-      Result r = Result.create(kvs);
-      return r;
-    }
-  }
-
-    // Returns the smallest timestamp in the given cell lists.
-    // It is assumed that the lists have cells ordered from largest to smallest timestamp
-    protected long getOldestTimestamp(Collection<List<Cell>> cellLists) {
-        Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() {
-            @Override
-            public int compare(List<Cell> left, List<Cell> right) {
-                // compare the last element of each list, since that is the smallest in that list
-                return Longs.compare(Iterables.getLast(left).getTimestamp(),
-                    Iterables.getLast(right).getTimestamp());
-            }
-        };
-        List<Cell> minList = cellListOrdering.min(cellLists);
-        return Iterables.getLast(minList).getTimestamp();
-    }
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index a027f54..7ed7a2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -23,8 +23,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -36,18 +38,30 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.covered.Batch;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
 /**
@@ -259,35 +273,124 @@ public class IndexManagementUtil {
     }
 
     public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? extends Mutation> mutations) {
-          List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
-          for (Mutation m : mutations) {
-              byte[] row = m.getRow();
-              Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-              for (Batch batch : batches) {
-                  Mutation mWithSameTS;
-                  Cell firstCell = batch.getKvs().get(0);
-                  if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
-                      mWithSameTS = new Put(row);
-                  } else {
-                      mWithSameTS = new Delete(row);
-                  }
-                  if (m.getAttributesMap() != null) {
-                      for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
-                          mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
-                      }
-                  }
-                  for (Cell cell : batch.getKvs()) {
-                      byte[] fam = CellUtil.cloneFamily(cell);
-                      List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
-                      if (famCells == null) {
-                          famCells = Lists.newArrayList();
-                          mWithSameTS.getFamilyCellMap().put(fam, famCells);
-                      }
-                      famCells.add(cell);
-                  }
-                  flattenedMutations.add(mWithSameTS);
-              }
-          }
-          return flattenedMutations;
-      }
+        List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
+        for (Mutation m : mutations) {
+            byte[] row = m.getRow();
+            Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+            for (Batch batch : batches) {
+                Mutation mWithSameTS;
+                Cell firstCell = batch.getKvs().get(0);
+                if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
+                    mWithSameTS = new Put(row);
+                } else {
+                    mWithSameTS = new Delete(row);
+                }
+                if (m.getAttributesMap() != null) {
+                    for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
+                        mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
+                    }
+                }
+                for (Cell cell : batch.getKvs()) {
+                    byte[] fam = CellUtil.cloneFamily(cell);
+                    List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
+                    if (famCells == null) {
+                        famCells = Lists.newArrayList();
+                        mWithSameTS.getFamilyCellMap().put(fam, famCells);
+                    }
+                    famCells.add(cell);
+                }
+                flattenedMutations.add(mWithSameTS);
+            }
+        }
+        return flattenedMutations;
+    }
+
+    /**
+     * Pre-scans all the required rows before building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp
+     * parameter.
+     * Note: when this method is called, for every single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp
+     * parameter, all cells in the mutation have the same rowKey and timestamp.
+     * @param dataTableMutationsWithSameRowKeyAndTimestamp
+     * @param indexMetaData
+     * @param region
+     * @throws IOException
+     */
+    public static CachedLocalTable preScanAllRequiredRows(
+            Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
+            final PhoenixIndexMetaData indexMetaData,
+            Region region) throws IOException {
+        List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers();
+        Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size());
+        for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow()));
+        }
+
+        Set<ColumnReference> getterColumnReferences = Sets.newHashSet();
+        for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) {
+            getterColumnReferences.addAll(
+                    indexTableMaintainer.getAllColumns());
+        }
+
+        getterColumnReferences.add(new ColumnReference(
+                indexTableMaintainers.get(0).getDataEmptyKeyValueCF(),
+                indexTableMaintainers.get(0).getEmptyKeyValueQualifier()));
+
+        Scan scan = IndexManagementUtil.newLocalStateScan(
+                Collections.singletonList(getterColumnReferences));
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+
+        if(indexMetaData.getReplayWrite() != null) {
+            /**
+             * Because of the earlier {@link IndexManagementUtil#flattenMutationsByTimestamp} call (which is made
+             * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
+             * for every single mutation in dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation
+             * have the same rowKey and timestamp.
+             */
+            long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
+            scan.setTimeRange(0, timestamp);
+            scan.setFilter(new SkipScanFilter(skipScanFilter, true));
+        } else {
+            assert scan.isRaw();
+            scan.setMaxVersions(1);
+            scan.setFilter(skipScanFilter);
+        }
+
+        HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+                new HashMap<ImmutableBytesPtr, List<Cell>>();
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            boolean more = true;
+            while(more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                Cell cell = cells.get(0);
+                byte[] rowKey = CellUtil.cloneRow(cell);
+                rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells);
+            }
+        }
+
+        return new CachedLocalTable(rowKeyPtrToCells);
+    }
+
+    private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) {
+        long maxTimestamp = Long.MIN_VALUE;
+        for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+            /**
+             * all the cells in this mutation have the same timestamp.
+             */
+            long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+            if(timestamp > maxTimestamp) {
+                maxTimestamp = timestamp;
+            }
+        }
+        return maxTimestamp;
+    }
+
+    public static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) {
+        return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index f07caf7..272bb26 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -20,9 +20,12 @@ package org.apache.phoenix.hbase.index.covered;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -35,12 +38,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.ScanUtil;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -88,25 +91,18 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
-        KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
-        kv.setSequenceId(0);
-        list.add(kv);
-        return false;
-      }
-    });
 
 
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+    kv.setSequenceId(0);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv));
+    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
     //add the kvs from the mutation
-    table.addPendingUpdates(m.get(fam, qual));
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
 
     // setup the lookup
     ColumnReference col = new ColumnReference(fam, qual);
@@ -122,48 +118,6 @@ public class LocalTableStateTest {
           super(msg);
       }
   }
-  
-  @Test(expected = ScannerCreatedException.class)
-  public void testScannerForMutableRows() throws Exception {
-      IndexMetaData indexMetaData = new IndexMetaData() {
-
-          @Override
-          public ReplayWrite getReplayWrite() {
-              return null;
-          }
-
-        @Override
-        public boolean requiresPriorRowState(Mutation m) {
-            return true;
-        }
-            
-        @Override
-        public int getClientVersion() {
-            return ScanUtil.UNKNOWN_CLIENT_VERSION;
-        }
-
-    };
-    Put m = new Put(row);
-    m.addColumn(fam, qual, ts, val);
-    // setup mocks
-    Configuration conf = new Configuration(false);
-    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
-    Mockito.when(env.getConfiguration()).thenReturn(conf);
-
-    Region region = Mockito.mock(Region.class);
-    Mockito.when(env.getRegion()).thenReturn(region);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
-
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
-    //add the kvs from the mutation
-    table.addPendingUpdates(m.get(fam, qual));
-
-    // setup the lookup
-    ColumnReference col = new ColumnReference(fam, qual);
-    table.setCurrentTimestamp(ts);
-    table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
-  }
 
   @Test
   public void testNoScannerForImmutableRows() throws Exception {
@@ -194,12 +148,11 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
 
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    CachedLocalTable cachedLocalTable = new CachedLocalTable(null);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
     //add the kvs from the mutation
-    table.addPendingUpdates(m.get(fam, qual));
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
 
     // setup the lookup
     ColumnReference col = new ColumnReference(fam, qual);
@@ -224,24 +177,18 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
     final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
     storedKv.setSequenceId(2);
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-        list.add(storedKv);
-        return false;
-      }
-    });
-    LocalHBaseState state = new LocalTable(env);
-    LocalTableState table = new LocalTableState(state, m);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, m);
+
     // add the kvs from the mutation
-    KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(m.get(fam, qual).get(0));
+    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
     kv.setSequenceId(0);
     table.addPendingUpdates(kv);
 
@@ -258,8 +205,6 @@ public class LocalTableStateTest {
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
-    Mockito.verify(env, Mockito.times(1)).getRegion();
-    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   @SuppressWarnings("unchecked")
@@ -270,24 +215,19 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
-    RegionScanner scanner = Mockito.mock(RegionScanner.class);
-    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final KeyValue storedKv =
         new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
     storedKv.setSequenceId(2);
-    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-        list.add(storedKv);
-        return false;
-      }
-    });
-    LocalHBaseState state = new LocalTable(env);
+
     Put pendingUpdate = new Put(row);
     pendingUpdate.addColumn(fam, qual, ts, val);
-    LocalTableState table = new LocalTableState(state, pendingUpdate);
+    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+            new  HashMap<ImmutableBytesPtr, List<Cell>>();
+    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
+    LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate);
+
 
     // do the lookup for the given column
     ColumnReference col = new ColumnReference(fam, qual);
@@ -303,8 +243,6 @@ public class LocalTableStateTest {
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Lost already loaded update!", storedKv, s.next());
-    Mockito.verify(env, Mockito.times(1)).getRegion();
-    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   // TODO add test here for making sure multiple column references with the same column family don't
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index f9c6798..e6785bf 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -149,6 +149,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
 
         mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
         Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
+        Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
         Mockito.when(mockIndexMetaData.getIndexMaintainers())
                 .thenReturn(Collections.singletonList(getTestIndexMaintainer()));
 
@@ -212,8 +213,13 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
         mutation.addAll(put);
 
+        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
+                Collections.singletonList(mutation),
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
         Collection<Pair<Mutation, byte[]>> indexUpdates =
-                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
+                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable);
         assertEquals(2, indexUpdates.size());
         assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
             new byte[0] /* qual not needed */, 2);
@@ -254,8 +260,16 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         mutation.addAll(put);
 
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
-        for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+        Collection<? extends Mutation> mutations =
+                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
+                mutations,
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
+        for (Mutation m : mutations) {
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
         }
         // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
         // rows for VALUE_2, VALUE_3)
@@ -287,9 +301,17 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         MultiMutation mutation = getMultipleVersionMutation(200);
         currentRowCells = mutation.getFamilyCellMap().get(FAM);
 
+        Collection<? extends Mutation> mutations =
+                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
+                mutations,
+                this.mockIndexMetaData,
+                this.indexBuilder.getEnv().getRegion());
+
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
         for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
         }
         assertNotEquals(0, indexUpdates.size());
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index 571ed85..8ed61e8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec {
     }
 
     @Override
-    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+    public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
         throws IOException {
-      return r;
+      return r.listCells();
     }
 
   }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
deleted file mode 100644
index b11ac8d..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-public class TestLocalTable {
-    private static final byte[] ROW = Bytes.toBytes("test_row");
-
-    @Test
-    public void testGetOldestTimestamp() {
-        LocalTable localTable = new LocalTable(null);
-
-        List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L));
-        assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1)));
-
-        List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L));
-        List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2));
-        assertEquals(2L, localTable.getOldestTimestamp(set1));
-
-        List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L));
-        set1.add(cellList3);
-        assertEquals(1L, localTable.getOldestTimestamp(set1));
-
-        List<Cell> cellList4 =
-                getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L));
-        set1.add(cellList4);
-        assertEquals(0L, localTable.getOldestTimestamp(set1));
-    }
-
-    private List<Cell> getCellList(KeyValue... kvs) {
-        List<Cell> cellList = new ArrayList<>();
-        for (KeyValue kv : kvs) {
-            cellList.add(kv);
-        }
-        return cellList;
-    }
-}