You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/23 06:09:23 UTC

[phoenix] branch master updated: Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one"

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ef37d9  Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one"
1ef37d9 is described below

commit 1ef37d98ac0fab7ae004e8d39dd438a2fc639fe3
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Nov 23 14:08:37 2019 +0800

    Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one"
    
    This reverts commit a48aa8766f8a448b6eb6bd8e7d3708ea95799130.
---
 .../hbase/index/builder/BaseIndexBuilder.java      |   4 -
 .../hbase/index/builder/IndexBuildManager.java     |  20 +--
 .../phoenix/hbase/index/builder/IndexBuilder.java  |   3 +-
 .../hbase/index/covered/LocalTableState.java       |   2 +-
 .../hbase/index/covered/NonTxIndexBuilder.java     |   8 +-
 .../hbase/index/covered/data/CachedLocalTable.java |  72 ---------
 .../hbase/index/covered/data/LocalHBaseState.java  |   3 +-
 .../hbase/index/covered/data/LocalTable.java       |  98 ++++++++++++
 .../hbase/index/util/IndexManagementUtil.java      | 165 ++++-----------------
 .../hbase/index/covered/LocalTableStateTest.java   | 122 +++++++++++----
 .../hbase/index/covered/NonTxIndexBuilderTest.java |  32 +---
 .../index/covered/TestCoveredColumnIndexCodec.java |   4 +-
 .../hbase/index/covered/data/TestLocalTable.java   |  63 ++++++++
 13 files changed, 304 insertions(+), 292 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 5571512..a909cf2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -133,8 +133,4 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
     public ReplayWrite getReplayWrite(Mutation m) {
         return null;
     }
-
-    public RegionCoprocessorEnvironment getEnv() {
-        return this.env;
-    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 2dcd6c6..165f0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+
 import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -36,8 +37,6 @@ import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ public class IndexBuildManager implements Stoppable {
   private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class);
   private final IndexBuilder delegate;
   private boolean stopped;
-  private RegionCoprocessorEnvironment regionCoprocessorEnvironment;
 
   /**
    * @param env environment in which <tt>this</tt> is running. Used to setup the
@@ -61,7 +59,6 @@ public class IndexBuildManager implements Stoppable {
     // Prevent deadlock by using single thread for all reads so that we know
     // we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
     this.delegate = getIndexBuilder(env);
-    this.regionCoprocessorEnvironment = env;
   }
   
   private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -91,14 +88,10 @@ public class IndexBuildManager implements Stoppable {
       IndexMetaData indexMetaData) throws Throwable {
     // notify the delegate that we have started processing a batch
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
-    CachedLocalTable cachedLocalTable =
-            IndexManagementUtil.preScanAllRequiredRows(
-                    mutations,
-                    (PhoenixIndexMetaData)indexMetaData,
-                    this.regionCoprocessorEnvironment.getRegion());
+
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
       for (Pair<Mutation, byte[]> update : updates) {
         indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
@@ -112,15 +105,10 @@ public class IndexBuildManager implements Stoppable {
     final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
-    CachedLocalTable cachedLocalTable =
-            IndexManagementUtil.preScanAllRequiredRows(
-                    mutations,
-                    (PhoenixIndexMetaData)indexMetaData,
-                    this.regionCoprocessorEnvironment.getRegion());
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
-      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
       if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
         for (Pair<Mutation, byte[]> update : updates) {
           update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 7bce22e..f561e79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 
 /**
  * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -73,7 +72,7 @@ public interface IndexBuilder extends Stoppable {
    * @return a Map of the mutations to make -> target index table name
    * @throws IOException on failure
    */
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException;
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
 
     /**
      * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index bf592ba..54d7f87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -161,7 +161,7 @@ public class LocalTableState implements TableState {
             // needing to lookup the prior row values.
             if (requiresPriorRowState) {
                 // add the current state of the row. Uses listCells() to avoid a new array creation.
-                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
+                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
             }
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 63d2751..645f2c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 import org.slf4j.Logger;
@@ -37,15 +38,18 @@ import org.slf4j.LoggerFactory;
 public class NonTxIndexBuilder extends BaseIndexBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
 
+    protected LocalHBaseState localTable;
+
     @Override
     public void setup(RegionCoprocessorEnvironment env) throws IOException {
         super.setup(env);
+        this.localTable = new LocalTable(env);
     }
 
     @Override
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
     	// create a state manager, so we can manage each batch
-        LocalTableState state = new LocalTableState(localHBaseState, mutation);
+        LocalTableState state = new LocalTableState(localTable, mutation);
         // build the index updates for each group
         IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
deleted file mode 100644
index f0fbfe6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-import java.util.HashMap;
-
-public class CachedLocalTable implements LocalHBaseState {
-
-    private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells;
-
-    public CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
-        this.rowKeyPtrToCells = rowKeyPtrToCells;
-    }
-
-    @Override
-    public List<Cell> getCurrentRowState(
-            Mutation mutation,
-            Collection<? extends ColumnReference> columnReferences,
-            boolean ignoreNewerMutations) throws IOException {
-        byte[] rowKey = mutation.getRow();
-        List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey));
-
-        if(cells == null || cells.isEmpty()) {
-            return cells;
-        }
-
-        if(!ignoreNewerMutations) {
-            return cells;
-        }
-        /**
-         * because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
-         * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
-         * all cells in the mutation have the same rowKey and timestamp.
-         */
-        long timestamp =
-                IndexManagementUtil.getMutationTimestampWhenAllCellTimestampIsSame(mutation);
-        List<Cell> newCells = new ArrayList<Cell>();
-        for(Cell cell : cells) {
-            if(cell.getTimestamp() < timestamp ) {
-                newCells.add(cell);
-            }
-        }
-        return newCells;
-    }
-
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index aae0763..5b06910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.data;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -44,7 +43,7 @@ public interface LocalHBaseState {
    *         {@link Result} with no stored {@link Cell}s.
    * @throws IOException if there is an issue reading the row
    */
-  public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
       throws IOException;
 
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
new file mode 100644
index 0000000..402620f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+
+/**
+ * Wrapper around a lazily instantiated, local HTable.
+ * <p>
+ * Previously, we had used various row and batch caches. However, this ends up being very
+ * complicated when attempting to manage updating and invalidating the cache with no real gain as any
+ * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching
+ * we are doing here. In the end, it's simpler and about as efficient to just get the current state
+ * of the row from HBase and let HBase manage caching the row from disk on its own.
+ */
+public class LocalTable implements LocalHBaseState {
+
+  private RegionCoprocessorEnvironment env;
+
+  public LocalTable(RegionCoprocessorEnvironment env) {
+    this.env = env;
+  }
+
+  @Override
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
+      throws IOException {
+    byte[] row = m.getRow();
+    // need to use a scan here so we can get raw state, which Get doesn't provide.
+    Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
+    s.setStartRow(row);
+    s.setStopRow(row);
+    if (ignoreNewerMutations) {
+        // Provides a means of client indicating that newer cells should not be considered,
+        // enabling mutations to be replayed to partially rebuild the index when a write fails.
+        // When replaying mutations we want the oldest timestamp (as anything newer will be replayed)
+        long ts = getOldestTimestamp(m.getFamilyCellMap().values());
+        s.setTimeRange(0,ts);
+    }
+    Region region = this.env.getRegion();
+    try (RegionScanner scanner = region.getScanner(s)) {
+      List<Cell> kvs = new ArrayList<Cell>(1);
+      boolean more = scanner.next(kvs);
+      assert !more : "Got more than one result when scanning"
+          + " a single row in the primary table!";
+
+      Result r = Result.create(kvs);
+      return r;
+    }
+  }
+
+    // Returns the smallest timestamp in the given cell lists.
+    // It is assumed that the lists have cells ordered from largest to smallest timestamp
+    protected long getOldestTimestamp(Collection<List<Cell>> cellLists) {
+        Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() {
+            @Override
+            public int compare(List<Cell> left, List<Cell> right) {
+                // compare the last element of each list, since that is the smallest in that list
+                return Longs.compare(Iterables.getLast(left).getTimestamp(),
+                    Iterables.getLast(right).getTimestamp());
+            }
+        };
+        List<Cell> minList = cellListOrdering.min(cellLists);
+        return Iterables.getLast(minList).getTimestamp();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 7ed7a2a..a027f54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -23,10 +23,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -38,30 +36,18 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexMetaData;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
-import org.apache.phoenix.hbase.index.Indexer;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 
 /**
@@ -273,124 +259,35 @@ public class IndexManagementUtil {
     }
 
     public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? extends Mutation> mutations) {
-        List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
-        for (Mutation m : mutations) {
-            byte[] row = m.getRow();
-            Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-            for (Batch batch : batches) {
-                Mutation mWithSameTS;
-                Cell firstCell = batch.getKvs().get(0);
-                if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
-                    mWithSameTS = new Put(row);
-                } else {
-                    mWithSameTS = new Delete(row);
-                }
-                if (m.getAttributesMap() != null) {
-                    for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
-                        mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
-                    }
-                }
-                for (Cell cell : batch.getKvs()) {
-                    byte[] fam = CellUtil.cloneFamily(cell);
-                    List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
-                    if (famCells == null) {
-                        famCells = Lists.newArrayList();
-                        mWithSameTS.getFamilyCellMap().put(fam, famCells);
-                    }
-                    famCells.add(cell);
-                }
-                flattenedMutations.add(mWithSameTS);
-            }
-        }
-        return flattenedMutations;
-    }
-
-    /**
-     * Pre-scan all the required rows before we building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp
-     * parameter.
-     * Note: When we calling this method, for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp
-     * parameter, all cells in the mutation have the same rowKey and timestamp.
-     * @param dataTableMutationsWithSameRowKeyAndTimestamp
-     * @param indexMetaData
-     * @param region
-     * @throws IOException
-     */
-    public static CachedLocalTable preScanAllRequiredRows(
-            Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
-            final PhoenixIndexMetaData indexMetaData,
-            Region region) throws IOException {
-        List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers();
-        Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size());
-        for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow()));
-        }
-
-        Set<ColumnReference> getterColumnReferences = Sets.newHashSet();
-        for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) {
-            getterColumnReferences.addAll(
-                    indexTableMaintainer.getAllColumns());
-        }
-
-        getterColumnReferences.add(new ColumnReference(
-                indexTableMaintainers.get(0).getDataEmptyKeyValueCF(),
-                indexTableMaintainers.get(0).getEmptyKeyValueQualifier()));
-
-        Scan scan = IndexManagementUtil.newLocalStateScan(
-                Collections.singletonList(getterColumnReferences));
-        ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
-        scanRanges.initializeScan(scan);
-        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-
-        if(indexMetaData.getReplayWrite() != null) {
-            /**
-             * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
-             * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
-             * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation
-             * have the same rowKey and timestamp.
-             */
-            long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
-            scan.setTimeRange(0, timestamp);
-            scan.setFilter(new SkipScanFilter(skipScanFilter, true));
-        } else {
-            assert scan.isRaw();
-            scan.setMaxVersions(1);
-            scan.setFilter(skipScanFilter);
-        }
-
-        HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
-                new HashMap<ImmutableBytesPtr, List<Cell>>();
-        try (RegionScanner scanner = region.getScanner(scan)) {
-            boolean more = true;
-            while(more) {
-                List<Cell> cells = new ArrayList<Cell>();
-                more = scanner.next(cells);
-                if (cells.isEmpty()) {
-                    continue;
-                }
-                Cell cell = cells.get(0);
-                byte[] rowKey = CellUtil.cloneRow(cell);
-                rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells);
-            }
-        }
-
-        return new CachedLocalTable(rowKeyPtrToCells);
-    }
-
-    private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) {
-        long maxTimestamp = Long.MIN_VALUE;
-        for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
-            /**
-             * all the cells in this mutation have the same timestamp.
-             */
-            long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
-            if(timestamp > maxTimestamp) {
-                maxTimestamp = timestamp;
-            }
-        }
-        return maxTimestamp;
-    }
-
-    public static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) {
-        return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
-    }
+          List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
+          for (Mutation m : mutations) {
+              byte[] row = m.getRow();
+              Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+              for (Batch batch : batches) {
+                  Mutation mWithSameTS;
+                  Cell firstCell = batch.getKvs().get(0);
+                  if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
+                      mWithSameTS = new Put(row);
+                  } else {
+                      mWithSameTS = new Delete(row);
+                  }
+                  if (m.getAttributesMap() != null) {
+                      for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
+                          mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
+                      }
+                  }
+                  for (Cell cell : batch.getKvs()) {
+                      byte[] fam = CellUtil.cloneFamily(cell);
+                      List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
+                      if (famCells == null) {
+                          famCells = Lists.newArrayList();
+                          mWithSameTS.getFamilyCellMap().put(fam, famCells);
+                      }
+                      famCells.add(cell);
+                  }
+                  flattenedMutations.add(mWithSameTS);
+              }
+          }
+          return flattenedMutations;
+      }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 272bb26..f07caf7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -20,12 +20,9 @@ package org.apache.phoenix.hbase.index.covered;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -38,12 +35,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -91,18 +88,25 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+        KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+        kv.setSequenceId(0);
+        list.add(kv);
+        return false;
+      }
+    });
 
 
-    KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
-    kv.setSequenceId(0);
-    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
-            new  HashMap<ImmutableBytesPtr, List<Cell>>();
-    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv));
-    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
-    LocalTableState table = new LocalTableState(cachedLocalTable, m);
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(state, m);
     //add the kvs from the mutation
-    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+    table.addPendingUpdates(m.get(fam, qual));
 
     // setup the lookup
     ColumnReference col = new ColumnReference(fam, qual);
@@ -118,6 +122,48 @@ public class LocalTableStateTest {
           super(msg);
       }
   }
+  
+  /** Expects the indexed-column lookup to open a region scanner when prior row state is required. */
+  @Test(expected = ScannerCreatedException.class)
+  public void testScannerForMutableRows() throws Exception {
+    // index metadata that always asks for the prior row state
+    IndexMetaData indexMetaData = new IndexMetaData() {
+      @Override
+      public ReplayWrite getReplayWrite() {
+        return null;
+      }
+
+      @Override
+      public boolean requiresPriorRowState(Mutation m) {
+        return true;
+      }
+
+      @Override
+      public int getClientVersion() {
+        return ScanUtil.UNKNOWN_CLIENT_VERSION;
+      }
+    };
+    Put m = new Put(row);
+    m.addColumn(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    // opening the scanner is the expected outcome here, so the stub throws the marker exception
+    Mockito.when(region.getScanner(Mockito.any(Scan.class)))
+        .thenThrow(new ScannerCreatedException("Scanner opened, as expected for mutable rows"));
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(state, m);
+    // add the kvs from the mutation
+    table.addPendingUpdates(m.get(fam, qual));
+
+    // setup the lookup; this call is expected to reach region.getScanner and throw
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+  }
 
   @Test
   public void testNoScannerForImmutableRows() throws Exception {
@@ -148,11 +194,12 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
 
-    CachedLocalTable cachedLocalTable = new CachedLocalTable(null);
-    LocalTableState table = new LocalTableState(cachedLocalTable, m);
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(state, m);
     //add the kvs from the mutation
-    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+    table.addPendingUpdates(m.get(fam, qual));
 
     // setup the lookup
     ColumnReference col = new ColumnReference(fam, qual);
@@ -177,18 +224,24 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final byte[] stored = Bytes.toBytes("stored-value");
     final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
     storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
-            new  HashMap<ImmutableBytesPtr, List<Cell>>();
-    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
-    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
-    LocalTableState table = new LocalTableState(cachedLocalTable, m);
-
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(state, m);
     // add the kvs from the mutation
-    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
+    KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(m.get(fam, qual).get(0));
     kv.setSequenceId(0);
     table.addPendingUpdates(kv);
 
@@ -205,6 +258,8 @@ public class LocalTableStateTest {
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   @SuppressWarnings("unchecked")
@@ -215,19 +270,24 @@ public class LocalTableStateTest {
 
     Region region = Mockito.mock(Region.class);
     Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
     final KeyValue storedKv =
         new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
     storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
 
-
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
     Put pendingUpdate = new Put(row);
     pendingUpdate.addColumn(fam, qual, ts, val);
-    HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
-            new  HashMap<ImmutableBytesPtr, List<Cell>>();
-    rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
-    CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
-    LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate);
-
+    LocalTableState table = new LocalTableState(state, pendingUpdate);
 
     // do the lookup for the given column
     ColumnReference col = new ColumnReference(fam, qual);
@@ -243,6 +303,8 @@ public class LocalTableStateTest {
     p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
     s = p.getFirst();
     assertEquals("Lost already loaded update!", storedKv, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
   }
 
   // TODO add test here for making sure multiple column references with the same column family don't
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index e6785bf..f9c6798 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -149,7 +149,6 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
 
         mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
         Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
-        Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
         Mockito.when(mockIndexMetaData.getIndexMaintainers())
                 .thenReturn(Collections.singletonList(getTestIndexMaintainer()));
 
@@ -213,13 +212,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
         mutation.addAll(put);
 
-        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
-                Collections.singletonList(mutation),
-                this.mockIndexMetaData,
-                this.indexBuilder.getEnv().getRegion());
-
         Collection<Pair<Mutation, byte[]>> indexUpdates =
-                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable);
+                indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
         assertEquals(2, indexUpdates.size());
         assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
             new byte[0] /* qual not needed */, 2);
@@ -260,16 +254,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         mutation.addAll(put);
 
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
-        Collection<? extends Mutation> mutations =
-                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
-
-        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
-                mutations,
-                this.mockIndexMetaData,
-                this.indexBuilder.getEnv().getRegion());
-
-        for (Mutation m : mutations) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
+        for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
         }
         // 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
         // rows for VALUE_2, VALUE_3)
@@ -301,17 +287,9 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         MultiMutation mutation = getMultipleVersionMutation(200);
         currentRowCells = mutation.getFamilyCellMap().get(FAM);
 
-        Collection<? extends Mutation> mutations =
-                IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
-
-        CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
-                mutations,
-                this.mockIndexMetaData,
-                this.indexBuilder.getEnv().getRegion());
-
         Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
         for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
-            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
+            indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
         }
         assertNotEquals(0, indexUpdates.size());
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index 8ed61e8..571ed85 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec {
     }
 
     @Override
-    public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
         throws IOException {
-      return r.listCells();
+      return r;
     }
 
   }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
new file mode 100644
index 0000000..b11ac8d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+public class TestLocalTable {
+    private static final byte[] ROW = Bytes.toBytes("test_row");
+
+    /**
+     * Feeds progressively more cell lists and expects the smallest timestamp each time.
+     */
+    @Test
+    public void testGetOldestTimestamp() {
+        LocalTable table = new LocalTable(null);
+        // one list: the older of its two cells is expected
+        List<Cell> first = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L));
+        assertEquals(4L, table.getOldestTimestamp(Collections.singletonList(first)));
+        // two lists: the minimum is expected across both
+        List<Cell> second = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L));
+        List<List<Cell>> cellLists = new ArrayList<>(Arrays.asList(first, second));
+        assertEquals(2L, table.getOldestTimestamp(cellLists));
+        // a single-cell list carrying an older timestamp
+        cellLists.add(getCellList(new KeyValue(ROW, 1L)));
+        assertEquals(1L, table.getOldestTimestamp(cellLists));
+        // a three-cell list whose last cell has timestamp 0
+        cellLists.add(
+                getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L)));
+        assertEquals(0L, table.getOldestTimestamp(cellLists));
+    }
+
+    /** Copies the given key-values, in order, into a new mutable list of cells. */
+    private List<Cell> getCellList(KeyValue... kvs) {
+        List<Cell> cells = new ArrayList<>(kvs.length);
+        Collections.addAll(cells, kvs);
+        return cells;
+    }
+}