You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/23 06:09:23 UTC
[phoenix] branch master updated: Revert "PHOENIX-5494 Batched,
mutable Index updates are unnecessarily run one-by-one"
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 1ef37d9 Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one"
1ef37d9 is described below
commit 1ef37d98ac0fab7ae004e8d39dd438a2fc639fe3
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Nov 23 14:08:37 2019 +0800
Revert "PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one"
This reverts commit a48aa8766f8a448b6eb6bd8e7d3708ea95799130.
---
.../hbase/index/builder/BaseIndexBuilder.java | 4 -
.../hbase/index/builder/IndexBuildManager.java | 20 +--
.../phoenix/hbase/index/builder/IndexBuilder.java | 3 +-
.../hbase/index/covered/LocalTableState.java | 2 +-
.../hbase/index/covered/NonTxIndexBuilder.java | 8 +-
.../hbase/index/covered/data/CachedLocalTable.java | 72 ---------
.../hbase/index/covered/data/LocalHBaseState.java | 3 +-
.../hbase/index/covered/data/LocalTable.java | 98 ++++++++++++
.../hbase/index/util/IndexManagementUtil.java | 165 ++++-----------------
.../hbase/index/covered/LocalTableStateTest.java | 122 +++++++++++----
.../hbase/index/covered/NonTxIndexBuilderTest.java | 32 +---
.../index/covered/TestCoveredColumnIndexCodec.java | 4 +-
.../hbase/index/covered/data/TestLocalTable.java | 63 ++++++++
13 files changed, 304 insertions(+), 292 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 5571512..a909cf2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -133,8 +133,4 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
public ReplayWrite getReplayWrite(Mutation m) {
return null;
}
-
- public RegionCoprocessorEnvironment getEnv() {
- return this.env;
- }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 2dcd6c6..165f0c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import com.google.common.collect.ListMultimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -36,8 +37,6 @@ import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +49,6 @@ public class IndexBuildManager implements Stoppable {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class);
private final IndexBuilder delegate;
private boolean stopped;
- private RegionCoprocessorEnvironment regionCoprocessorEnvironment;
/**
* @param env environment in which <tt>this</tt> is running. Used to setup the
@@ -61,7 +59,6 @@ public class IndexBuildManager implements Stoppable {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
this.delegate = getIndexBuilder(env);
- this.regionCoprocessorEnvironment = env;
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -91,14 +88,10 @@ public class IndexBuildManager implements Stoppable {
IndexMetaData indexMetaData) throws Throwable {
// notify the delegate that we have started processing a batch
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- CachedLocalTable cachedLocalTable =
- IndexManagementUtil.preScanAllRequiredRows(
- mutations,
- (PhoenixIndexMetaData)indexMetaData,
- this.regionCoprocessorEnvironment.getRegion());
+
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
for (Mutation m : mutations) {
- Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
for (Pair<Mutation, byte[]> update : updates) {
indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
}
@@ -112,15 +105,10 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- CachedLocalTable cachedLocalTable =
- IndexManagementUtil.preScanAllRequiredRows(
- mutations,
- (PhoenixIndexMetaData)indexMetaData,
- this.regionCoprocessorEnvironment.getRegion());
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
- Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
for (Pair<Mutation, byte[]> update : updates) {
update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index 7bce22e..f561e79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
/**
* Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -73,7 +72,7 @@ public interface IndexBuilder extends Stoppable {
* @return a Map of the mutations to make -> target index table name
* @throws IOException on failure
*/
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException;
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
/**
* Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index bf592ba..54d7f87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -161,7 +161,7 @@ public class LocalTableState implements TableState {
// needing to lookup the prior row values.
if (requiresPriorRowState) {
// add the current state of the row. Uses listCells() to avoid a new array creation.
- this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 63d2751..645f2c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
import org.slf4j.Logger;
@@ -37,15 +38,18 @@ import org.slf4j.LoggerFactory;
public class NonTxIndexBuilder extends BaseIndexBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
+ protected LocalHBaseState localTable;
+
@Override
public void setup(RegionCoprocessorEnvironment env) throws IOException {
super.setup(env);
+ this.localTable = new LocalTable(env);
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
// create a state manager, so we can manage each batch
- LocalTableState state = new LocalTableState(localHBaseState, mutation);
+ LocalTableState state = new LocalTableState(localTable, mutation);
// build the index updates for each group
IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
deleted file mode 100644
index f0fbfe6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-import java.util.HashMap;
-
-public class CachedLocalTable implements LocalHBaseState {
-
- private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells;
-
- public CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
- this.rowKeyPtrToCells = rowKeyPtrToCells;
- }
-
- @Override
- public List<Cell> getCurrentRowState(
- Mutation mutation,
- Collection<? extends ColumnReference> columnReferences,
- boolean ignoreNewerMutations) throws IOException {
- byte[] rowKey = mutation.getRow();
- List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey));
-
- if(cells == null || cells.isEmpty()) {
- return cells;
- }
-
- if(!ignoreNewerMutations) {
- return cells;
- }
- /**
- * because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
- * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
- * all cells in the mutation have the same rowKey and timestamp.
- */
- long timestamp =
- IndexManagementUtil.getMutationTimestampWhenAllCellTimestampIsSame(mutation);
- List<Cell> newCells = new ArrayList<Cell>();
- for(Cell cell : cells) {
- if(cell.getTimestamp() < timestamp ) {
- newCells.add(cell);
- }
- }
- return newCells;
- }
-
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index aae0763..5b06910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.data;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
@@ -44,7 +43,7 @@ public interface LocalHBaseState {
* {@link Result} with no stored {@link Cell}s.
* @throws IOException if there is an issue reading the row
*/
- public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
throws IOException;
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
new file mode 100644
index 0000000..402620f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+
+/**
+ * Wrapper around a lazily instantiated, local HTable.
+ * <p>
+ * Previously, we had used various row and batch caches. However, this ends up being very
+ * complicated when attempting manage updating and invalidating the cache with no real gain as any
+ * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching
+ * we are doing here. In the end, its simpler and about as efficient to just get the current state
+ * of the row from HBase and let HBase manage caching the row from disk on its own.
+ */
+public class LocalTable implements LocalHBaseState {
+
+ private RegionCoprocessorEnvironment env;
+
+ public LocalTable(RegionCoprocessorEnvironment env) {
+ this.env = env;
+ }
+
+ @Override
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
+ throws IOException {
+ byte[] row = m.getRow();
+ // need to use a scan here so we can get raw state, which Get doesn't provide.
+ Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
+ s.setStartRow(row);
+ s.setStopRow(row);
+ if (ignoreNewerMutations) {
+ // Provides a means of client indicating that newer cells should not be considered,
+ // enabling mutations to be replayed to partially rebuild the index when a write fails.
+ // When replaying mutations we want the oldest timestamp (as anything newer we be replayed)
+ long ts = getOldestTimestamp(m.getFamilyCellMap().values());
+ s.setTimeRange(0,ts);
+ }
+ Region region = this.env.getRegion();
+ try (RegionScanner scanner = region.getScanner(s)) {
+ List<Cell> kvs = new ArrayList<Cell>(1);
+ boolean more = scanner.next(kvs);
+ assert !more : "Got more than one result when scanning"
+ + " a single row in the primary table!";
+
+ Result r = Result.create(kvs);
+ return r;
+ }
+ }
+
+ // Returns the smallest timestamp in the given cell lists.
+ // It is assumed that the lists have cells ordered from largest to smallest timestamp
+ protected long getOldestTimestamp(Collection<List<Cell>> cellLists) {
+ Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() {
+ @Override
+ public int compare(List<Cell> left, List<Cell> right) {
+ // compare the last element of each list, since that is the smallest in that list
+ return Longs.compare(Iterables.getLast(left).getTimestamp(),
+ Iterables.getLast(right).getTimestamp());
+ }
+ };
+ List<Cell> minList = cellListOrdering.min(cellLists);
+ return Iterables.getLast(minList).getTimestamp();
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 7ed7a2a..a027f54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -23,10 +23,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -38,30 +36,18 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexMetaData;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PVarbinary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
-import org.apache.phoenix.hbase.index.Indexer;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
/**
@@ -273,124 +259,35 @@ public class IndexManagementUtil {
}
public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? extends Mutation> mutations) {
- List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
- for (Mutation m : mutations) {
- byte[] row = m.getRow();
- Collection<Batch> batches = createTimestampBatchesFromMutation(m);
- for (Batch batch : batches) {
- Mutation mWithSameTS;
- Cell firstCell = batch.getKvs().get(0);
- if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
- mWithSameTS = new Put(row);
- } else {
- mWithSameTS = new Delete(row);
- }
- if (m.getAttributesMap() != null) {
- for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
- mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
- }
- }
- for (Cell cell : batch.getKvs()) {
- byte[] fam = CellUtil.cloneFamily(cell);
- List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
- if (famCells == null) {
- famCells = Lists.newArrayList();
- mWithSameTS.getFamilyCellMap().put(fam, famCells);
- }
- famCells.add(cell);
- }
- flattenedMutations.add(mWithSameTS);
- }
- }
- return flattenedMutations;
- }
-
- /**
- * Pre-scan all the required rows before we building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp
- * parameter.
- * Note: When we calling this method, for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp
- * parameter, all cells in the mutation have the same rowKey and timestamp.
- * @param dataTableMutationsWithSameRowKeyAndTimestamp
- * @param indexMetaData
- * @param region
- * @throws IOException
- */
- public static CachedLocalTable preScanAllRequiredRows(
- Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
- final PhoenixIndexMetaData indexMetaData,
- Region region) throws IOException {
- List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers();
- Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size());
- for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow()));
- }
-
- Set<ColumnReference> getterColumnReferences = Sets.newHashSet();
- for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) {
- getterColumnReferences.addAll(
- indexTableMaintainer.getAllColumns());
- }
-
- getterColumnReferences.add(new ColumnReference(
- indexTableMaintainers.get(0).getDataEmptyKeyValueCF(),
- indexTableMaintainers.get(0).getEmptyKeyValueQualifier()));
-
- Scan scan = IndexManagementUtil.newLocalStateScan(
- Collections.singletonList(getterColumnReferences));
- ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
- scanRanges.initializeScan(scan);
- SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-
- if(indexMetaData.getReplayWrite() != null) {
- /**
- * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
- * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
- * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation
- * have the same rowKey and timestamp.
- */
- long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
- scan.setTimeRange(0, timestamp);
- scan.setFilter(new SkipScanFilter(skipScanFilter, true));
- } else {
- assert scan.isRaw();
- scan.setMaxVersions(1);
- scan.setFilter(skipScanFilter);
- }
-
- HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
- new HashMap<ImmutableBytesPtr, List<Cell>>();
- try (RegionScanner scanner = region.getScanner(scan)) {
- boolean more = true;
- while(more) {
- List<Cell> cells = new ArrayList<Cell>();
- more = scanner.next(cells);
- if (cells.isEmpty()) {
- continue;
- }
- Cell cell = cells.get(0);
- byte[] rowKey = CellUtil.cloneRow(cell);
- rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells);
- }
- }
-
- return new CachedLocalTable(rowKeyPtrToCells);
- }
-
- private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) {
- long maxTimestamp = Long.MIN_VALUE;
- for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
- /**
- * all the cells in this mutation have the same timestamp.
- */
- long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
- if(timestamp > maxTimestamp) {
- maxTimestamp = timestamp;
- }
- }
- return maxTimestamp;
- }
-
- public static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) {
- return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
- }
+ List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
+ for (Mutation m : mutations) {
+ byte[] row = m.getRow();
+ Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+ for (Batch batch : batches) {
+ Mutation mWithSameTS;
+ Cell firstCell = batch.getKvs().get(0);
+ if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
+ mWithSameTS = new Put(row);
+ } else {
+ mWithSameTS = new Delete(row);
+ }
+ if (m.getAttributesMap() != null) {
+ for (Map.Entry<String,byte[]> entry : m.getAttributesMap().entrySet()) {
+ mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
+ }
+ }
+ for (Cell cell : batch.getKvs()) {
+ byte[] fam = CellUtil.cloneFamily(cell);
+ List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
+ if (famCells == null) {
+ famCells = Lists.newArrayList();
+ mWithSameTS.getFamilyCellMap().put(fam, famCells);
+ }
+ famCells.add(cell);
+ }
+ flattenedMutations.add(mWithSameTS);
+ }
+ }
+ return flattenedMutations;
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 272bb26..f07caf7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -20,12 +20,9 @@ package org.apache.phoenix.hbase.index.covered;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -38,12 +35,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ScanUtil;
import org.junit.Test;
import org.mockito.Mockito;
@@ -91,18 +88,25 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
+ RegionScanner scanner = Mockito.mock(RegionScanner.class);
+ Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final byte[] stored = Bytes.toBytes("stored-value");
+ Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+ KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+ kv.setSequenceId(0);
+ list.add(kv);
+ return false;
+ }
+ });
- KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
- kv.setSequenceId(0);
- HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
- new HashMap<ImmutableBytesPtr, List<Cell>>();
- rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv));
- CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
- LocalTableState table = new LocalTableState(cachedLocalTable, m);
+ LocalHBaseState state = new LocalTable(env);
+ LocalTableState table = new LocalTableState(state, m);
//add the kvs from the mutation
- table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+ table.addPendingUpdates(m.get(fam, qual));
// setup the lookup
ColumnReference col = new ColumnReference(fam, qual);
@@ -118,6 +122,48 @@ public class LocalTableStateTest {
super(msg);
}
}
+
+ @Test(expected = ScannerCreatedException.class)
+ public void testScannerForMutableRows() throws Exception {
+ IndexMetaData indexMetaData = new IndexMetaData() {
+
+ @Override
+ public ReplayWrite getReplayWrite() {
+ return null;
+ }
+
+ @Override
+ public boolean requiresPriorRowState(Mutation m) {
+ return true;
+ }
+
+ @Override
+ public int getClientVersion() {
+ return ScanUtil.UNKNOWN_CLIENT_VERSION;
+ }
+
+ };
+ Put m = new Put(row);
+ m.addColumn(fam, qual, ts, val);
+ // setup mocks
+ Configuration conf = new Configuration(false);
+ RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+ Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+ Region region = Mockito.mock(Region.class);
+ Mockito.when(env.getRegion()).thenReturn(region);
+ Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
+
+ LocalHBaseState state = new LocalTable(env);
+ LocalTableState table = new LocalTableState(state, m);
+ //add the kvs from the mutation
+ table.addPendingUpdates(m.get(fam, qual));
+
+ // setup the lookup
+ ColumnReference col = new ColumnReference(fam, qual);
+ table.setCurrentTimestamp(ts);
+ table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+ }
@Test
public void testNoScannerForImmutableRows() throws Exception {
@@ -148,11 +194,12 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
+ Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
- CachedLocalTable cachedLocalTable = new CachedLocalTable(null);
- LocalTableState table = new LocalTableState(cachedLocalTable, m);
+ LocalHBaseState state = new LocalTable(env);
+ LocalTableState table = new LocalTableState(state, m);
//add the kvs from the mutation
- table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+ table.addPendingUpdates(m.get(fam, qual));
// setup the lookup
ColumnReference col = new ColumnReference(fam, qual);
@@ -177,18 +224,24 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
+ RegionScanner scanner = Mockito.mock(RegionScanner.class);
+ Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final byte[] stored = Bytes.toBytes("stored-value");
final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
storedKv.setSequenceId(2);
+ Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
- HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
- new HashMap<ImmutableBytesPtr, List<Cell>>();
- rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
- CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
- LocalTableState table = new LocalTableState(cachedLocalTable, m);
-
+ list.add(storedKv);
+ return false;
+ }
+ });
+ LocalHBaseState state = new LocalTable(env);
+ LocalTableState table = new LocalTableState(state, m);
// add the kvs from the mutation
- KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
+ KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(m.get(fam, qual).get(0));
kv.setSequenceId(0);
table.addPendingUpdates(kv);
@@ -205,6 +258,8 @@ public class LocalTableStateTest {
p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
s = p.getFirst();
assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
+ Mockito.verify(env, Mockito.times(1)).getRegion();
+ Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
}
@SuppressWarnings("unchecked")
@@ -215,19 +270,24 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
+ RegionScanner scanner = Mockito.mock(RegionScanner.class);
+ Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final KeyValue storedKv =
new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
storedKv.setSequenceId(2);
+ Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
-
+ list.add(storedKv);
+ return false;
+ }
+ });
+ LocalHBaseState state = new LocalTable(env);
Put pendingUpdate = new Put(row);
pendingUpdate.addColumn(fam, qual, ts, val);
- HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
- new HashMap<ImmutableBytesPtr, List<Cell>>();
- rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
- CachedLocalTable cachedLocalTable = new CachedLocalTable(rowKeyPtrToCells);
- LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate);
-
+ LocalTableState table = new LocalTableState(state, pendingUpdate);
// do the lookup for the given column
ColumnReference col = new ColumnReference(fam, qual);
@@ -243,6 +303,8 @@ public class LocalTableStateTest {
p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
s = p.getFirst();
assertEquals("Lost already loaded update!", storedKv, s.next());
+ Mockito.verify(env, Mockito.times(1)).getRegion();
+ Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
}
// TODO add test here for making sure multiple column references with the same column family don't
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index e6785bf..f9c6798 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -149,7 +149,6 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
- Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
Mockito.when(mockIndexMetaData.getIndexMaintainers())
.thenReturn(Collections.singletonList(getTestIndexMaintainer()));
@@ -213,13 +212,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
mutation.addAll(put);
- CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
- Collections.singletonList(mutation),
- this.mockIndexMetaData,
- this.indexBuilder.getEnv().getRegion());
-
Collection<Pair<Mutation, byte[]>> indexUpdates =
- indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable);
+ indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
assertEquals(2, indexUpdates.size());
assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
new byte[0] /* qual not needed */, 2);
@@ -260,16 +254,8 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
mutation.addAll(put);
Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
- Collection<? extends Mutation> mutations =
- IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
-
- CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
- mutations,
- this.mockIndexMetaData,
- this.indexBuilder.getEnv().getRegion());
-
- for (Mutation m : mutations) {
- indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
+ for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
+ indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
}
// 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
// rows for VALUE_2, VALUE_3)
@@ -301,17 +287,9 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
MultiMutation mutation = getMultipleVersionMutation(200);
currentRowCells = mutation.getFamilyCellMap().get(FAM);
- Collection<? extends Mutation> mutations =
- IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
-
- CachedLocalTable cachedLocalTable = IndexManagementUtil.preScanAllRequiredRows(
- mutations,
- this.mockIndexMetaData,
- this.indexBuilder.getEnv().getRegion());
-
Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
- indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
+ indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
}
assertNotEquals(0, indexUpdates.size());
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index 8ed61e8..571ed85 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec {
}
@Override
- public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
throws IOException {
- return r.listCells();
+ return r;
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
new file mode 100644
index 0000000..b11ac8d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+public class TestLocalTable {
+ private static final byte[] ROW = Bytes.toBytes("test_row");
+
+ @Test
+ public void testGetOldestTimestamp() {
+ LocalTable localTable = new LocalTable(null);
+
+ List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L));
+ assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1)));
+
+ List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L));
+ List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2));
+ assertEquals(2L, localTable.getOldestTimestamp(set1));
+
+ List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L));
+ set1.add(cellList3);
+ assertEquals(1L, localTable.getOldestTimestamp(set1));
+
+ List<Cell> cellList4 =
+ getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L));
+ set1.add(cellList4);
+ assertEquals(0L, localTable.getOldestTimestamp(set1));
+ }
+
+ private List<Cell> getCellList(KeyValue... kvs) {
+ List<Cell> cellList = new ArrayList<>();
+ for (KeyValue kv : kvs) {
+ cellList.add(kv);
+ }
+ return cellList;
+ }
+}