You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/12/01 10:17:59 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5494 Batched,
mutable Index updates are unnecessarily run one-by-one
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new aba120d PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one
aba120d is described below
commit aba120da44156e46c72ceec08e3694fd9d792623
Author: chenglei <ch...@apache.org>
AuthorDate: Sun Dec 1 18:16:59 2019 +0800
PHOENIX-5494 Batched, mutable Index updates are unnecessarily run one-by-one
---
.../hbase/index/builder/BaseIndexBuilder.java | 4 +
.../hbase/index/builder/IndexBuildManager.java | 19 ++-
.../phoenix/hbase/index/builder/IndexBuilder.java | 3 +-
.../hbase/index/covered/LocalTableState.java | 2 +-
.../hbase/index/covered/NonTxIndexBuilder.java | 8 +-
.../hbase/index/covered/data/CachedLocalTable.java | 188 +++++++++++++++++++++
.../hbase/index/covered/data/LocalHBaseState.java | 4 +-
.../hbase/index/covered/data/LocalTable.java | 98 -----------
.../hbase/index/covered/LocalTableStateTest.java | 124 +++-----------
.../hbase/index/covered/NonTxIndexBuilderTest.java | 32 +++-
.../index/covered/TestCoveredColumnIndexCodec.java | 4 +-
.../hbase/index/covered/data/TestLocalTable.java | 63 -------
12 files changed, 272 insertions(+), 277 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f96bf9d..e3fdc1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -133,4 +133,8 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
public ReplayWrite getReplayWrite(Mutation m) {
return null;
}
+
+ public RegionCoprocessorEnvironment getEnv() {
+ return this.env;
+ }
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 90d28b8..9683507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
import com.google.common.collect.ListMultimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
@@ -37,6 +36,7 @@ import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,7 @@ public class IndexBuildManager implements Stoppable {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexBuildManager.class);
private final IndexBuilder delegate;
private boolean stopped;
+ private RegionCoprocessorEnvironment regionCoprocessorEnvironment;
/**
* @param env environment in which <tt>this</tt> is running. Used to setup the
@@ -59,6 +60,7 @@ public class IndexBuildManager implements Stoppable {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
this.delegate = getIndexBuilder(env);
+ this.regionCoprocessorEnvironment = env;
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -88,10 +90,14 @@ public class IndexBuildManager implements Stoppable {
IndexMetaData indexMetaData) throws Throwable {
// notify the delegate that we have started processing a batch
this.delegate.batchStarted(miniBatchOp, indexMetaData);
-
+ CachedLocalTable cachedLocalTable =
+ CachedLocalTable.build(
+ mutations,
+ (PhoenixIndexMetaData)indexMetaData,
+ this.regionCoprocessorEnvironment.getRegion());
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
for (Mutation m : mutations) {
- Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
for (Pair<Mutation, byte[]> update : updates) {
indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
}
@@ -105,10 +111,15 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
+ CachedLocalTable cachedLocalTable =
+ CachedLocalTable.build(
+ mutations,
+ (PhoenixIndexMetaData)indexMetaData,
+ this.regionCoprocessorEnvironment.getRegion());
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
- Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData, cachedLocalTable);
if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
for (Pair<Mutation, byte[]> update : updates) {
update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index d3b9d6b..18e361a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
/**
* Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
@@ -72,7 +73,7 @@ public interface IndexBuilder extends Stoppable {
* @return a Map of the mutations to make -> target index table name
* @throws IOException on failure
*/
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context, LocalHBaseState localHBaseState) throws IOException;
/**
* Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 8a069f8..be18cbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -172,7 +172,7 @@ public class LocalTableState implements TableState {
// needing to lookup the prior row values.
if (requiresPriorRowState) {
// add the current state of the row. Uses listCells() to avoid a new array creation.
- this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations), false);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 19eda4d..a294ad7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
import org.slf4j.Logger;
@@ -40,18 +39,15 @@ import org.slf4j.LoggerFactory;
public class NonTxIndexBuilder extends BaseIndexBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(NonTxIndexBuilder.class);
- protected LocalHBaseState localTable;
-
@Override
public void setup(RegionCoprocessorEnvironment env) throws IOException {
super.setup(env);
- this.localTable = new LocalTable(env);
}
@Override
- public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
+ public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData, LocalHBaseState localHBaseState) throws IOException {
// create a state manager, so we can manage each batch
- LocalTableState state = new LocalTableState(localTable, mutation);
+ LocalTableState state = new LocalTableState(localHBaseState, mutation);
// build the index updates for each group
IndexUpdateManager manager = new IndexUpdateManager(indexMetaData);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
new file mode 100644
index 0000000..7091178
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import java.util.HashMap;
+
+public class CachedLocalTable implements LocalHBaseState {
+
+ private final HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells;
+
+ private CachedLocalTable(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
+ this.rowKeyPtrToCells = rowKeyPtrToCells;
+ }
+
+ @Override
+ public List<Cell> getCurrentRowState(
+ Mutation mutation,
+ Collection<? extends ColumnReference> columnReferences,
+ boolean ignoreNewerMutations) throws IOException {
+ byte[] rowKey = mutation.getRow();
+ List<Cell> cells = this.rowKeyPtrToCells.get(new ImmutableBytesPtr(rowKey));
+
+ if(cells == null || cells.isEmpty()) {
+ return cells;
+ }
+
+ if(!ignoreNewerMutations) {
+ return cells;
+ }
+ /**
+ * because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
+ * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
+ * all cells in the mutation have the same rowKey and timestamp.
+ */
+ long timestamp =
+ getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+ List<Cell> newCells = new ArrayList<Cell>();
+ for(Cell cell : cells) {
+ if(cell.getTimestamp() < timestamp ) {
+ newCells.add(cell);
+ }
+ }
+ return newCells;
+ }
+
+ @VisibleForTesting
+ public static CachedLocalTable build(HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells) {
+ return new CachedLocalTable(rowKeyPtrToCells);
+ }
+
+ public static CachedLocalTable build(
+ Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
+ PhoenixIndexMetaData indexMetaData,
+ Region region) throws IOException {
+ return preScanAllRequiredRows(dataTableMutationsWithSameRowKeyAndTimestamp, indexMetaData, region);
+ }
+
+ /**
+ * Pre-scan all the required rows before we building the indexes for the dataTableMutationsWithSameRowKeyAndTimestamp
+ * parameter.
+ * Note: When we calling this method, for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp
+ * parameter, all cells in the mutation have the same rowKey and timestamp.
+ * @param dataTableMutationsWithSameRowKeyAndTimestamp
+ * @param indexMetaData
+ * @param region
+ * @throws IOException
+ */
+ public static CachedLocalTable preScanAllRequiredRows(
+ Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp,
+ PhoenixIndexMetaData indexMetaData,
+ Region region) throws IOException {
+ List<IndexMaintainer> indexTableMaintainers = indexMetaData.getIndexMaintainers();
+ Set<KeyRange> keys = new HashSet<KeyRange>(dataTableMutationsWithSameRowKeyAndTimestamp.size());
+ for (Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(mutation.getRow()));
+ }
+
+ Set<ColumnReference> getterColumnReferences = Sets.newHashSet();
+ for (IndexMaintainer indexTableMaintainer : indexTableMaintainers) {
+ getterColumnReferences.addAll(
+ indexTableMaintainer.getAllColumns());
+ }
+
+ getterColumnReferences.add(new ColumnReference(
+ indexTableMaintainers.get(0).getDataEmptyKeyValueCF(),
+ indexTableMaintainers.get(0).getEmptyKeyValueQualifier()));
+
+ Scan scan = IndexManagementUtil.newLocalStateScan(
+ Collections.singletonList(getterColumnReferences));
+ ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+ scanRanges.initializeScan(scan);
+ SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+
+ if(indexMetaData.getReplayWrite() != null) {
+ /**
+ * Because of previous {@link IndexManagementUtil#flattenMutationsByTimestamp}(which is called
+ * in {@link IndexRegionObserver#groupMutations} or {@link Indexer#preBatchMutateWithExceptions}),
+ * for single mutation in the dataTableMutationsWithSameRowKeyAndTimestamp, all cells in the mutation
+ * have the same rowKey and timestamp.
+ */
+ long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
+ scan.setTimeRange(0, timestamp);
+ scan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ } else {
+ assert scan.isRaw();
+ scan.setMaxVersions(1);
+ scan.setFilter(skipScanFilter);
+ }
+
+ HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+ new HashMap<ImmutableBytesPtr, List<Cell>>();
+ try (RegionScanner scanner = region.getScanner(scan)) {
+ boolean more = true;
+ while(more) {
+ List<Cell> cells = new ArrayList<Cell>();
+ more = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Cell cell = cells.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ rowKeyPtrToCells.put(new ImmutableBytesPtr(rowKey), cells);
+ }
+ }
+
+ return new CachedLocalTable(rowKeyPtrToCells);
+ }
+
+ private static long getMaxTimestamp(Collection<? extends Mutation> dataTableMutationsWithSameRowKeyAndTimestamp) {
+ long maxTimestamp = Long.MIN_VALUE;
+ for(Mutation mutation : dataTableMutationsWithSameRowKeyAndTimestamp) {
+ /**
+ * all the cells in this mutation have the same timestamp.
+ */
+ long timestamp = getMutationTimestampWhenAllCellTimestampIsSame(mutation);
+ if(timestamp > maxTimestamp) {
+ maxTimestamp = timestamp;
+ }
+ }
+ return maxTimestamp;
+ }
+
+ private static long getMutationTimestampWhenAllCellTimestampIsSame(Mutation mutation) {
+ return mutation.getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 9968627..583e7f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -19,7 +19,9 @@ package org.apache.phoenix.hbase.index.covered.data;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -43,7 +45,7 @@ public interface LocalHBaseState {
* {@link Result} with no stored {@link KeyValue}s.
* @throws IOException if there is an issue reading the row
*/
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
+ public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
throws IOException;
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
deleted file mode 100644
index 402620f..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.primitives.Longs;
-
-/**
- * Wrapper around a lazily instantiated, local HTable.
- * <p>
- * Previously, we had used various row and batch caches. However, this ends up being very
- * complicated when attempting manage updating and invalidating the cache with no real gain as any
- * row accessed multiple times will likely be in HBase's block cache, invalidating any extra caching
- * we are doing here. In the end, its simpler and about as efficient to just get the current state
- * of the row from HBase and let HBase manage caching the row from disk on its own.
- */
-public class LocalTable implements LocalHBaseState {
-
- private RegionCoprocessorEnvironment env;
-
- public LocalTable(RegionCoprocessorEnvironment env) {
- this.env = env;
- }
-
- @Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
- throws IOException {
- byte[] row = m.getRow();
- // need to use a scan here so we can get raw state, which Get doesn't provide.
- Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
- s.setStartRow(row);
- s.setStopRow(row);
- if (ignoreNewerMutations) {
- // Provides a means of client indicating that newer cells should not be considered,
- // enabling mutations to be replayed to partially rebuild the index when a write fails.
- // When replaying mutations we want the oldest timestamp (as anything newer we be replayed)
- long ts = getOldestTimestamp(m.getFamilyCellMap().values());
- s.setTimeRange(0,ts);
- }
- Region region = this.env.getRegion();
- try (RegionScanner scanner = region.getScanner(s)) {
- List<Cell> kvs = new ArrayList<Cell>(1);
- boolean more = scanner.next(kvs);
- assert !more : "Got more than one result when scanning"
- + " a single row in the primary table!";
-
- Result r = Result.create(kvs);
- return r;
- }
- }
-
- // Returns the smallest timestamp in the given cell lists.
- // It is assumed that the lists have cells ordered from largest to smallest timestamp
- protected long getOldestTimestamp(Collection<List<Cell>> cellLists) {
- Ordering<List<Cell>> cellListOrdering = new Ordering<List<Cell>>() {
- @Override
- public int compare(List<Cell> left, List<Cell> right) {
- // compare the last element of each list, since that is the smallest in that list
- return Longs.compare(Iterables.getLast(left).getTimestamp(),
- Iterables.getLast(right).getTimestamp());
- }
- };
- List<Cell> minList = cellListOrdering.min(cellLists);
- return Iterables.getLast(minList).getTimestamp();
- }
-}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 56ba1d6..416c16a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -20,35 +20,32 @@ package org.apache.phoenix.hbase.index.covered;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.util.ScanUtil;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-/**
- *
- */
+
public class LocalTableStateTest {
private static final byte[] row = Bytes.toBytes("row");
@@ -87,23 +84,16 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
- RegionScanner scanner = Mockito.mock(RegionScanner.class);
- Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final byte[] stored = Bytes.toBytes("stored-value");
- Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable {
- List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
- KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
- kv.setSequenceId(0);
- list.add(kv);
- return false;
- }
- });
- LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(state, m);
+ KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+ kv.setSequenceId(0);
+ HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+ new HashMap<ImmutableBytesPtr, List<Cell>>();
+ rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)kv));
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+ LocalTableState table = new LocalTableState(cachedLocalTable, m);
//add the kvs from the mutation
table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
@@ -121,48 +111,6 @@ public class LocalTableStateTest {
super(msg);
}
}
-
- @Test(expected = ScannerCreatedException.class)
- public void testScannerForMutableRows() throws Exception {
- IndexMetaData indexMetaData = new IndexMetaData() {
-
- @Override
- public ReplayWrite getReplayWrite() {
- return null;
- }
-
- @Override
- public boolean requiresPriorRowState(Mutation m) {
- return true;
- }
-
- @Override
- public int getClientVersion() {
- return ScanUtil.UNKNOWN_CLIENT_VERSION;
- }
-
- };
- Put m = new Put(row);
- m.addColumn(fam, qual, ts, val);
- // setup mocks
- Configuration conf = new Configuration(false);
- RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
- Mockito.when(env.getConfiguration()).thenReturn(conf);
-
- Region region = Mockito.mock(Region.class);
- Mockito.when(env.getRegion()).thenReturn(region);
- Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
-
- LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(state, m);
- //add the kvs from the mutation
- table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
-
- // setup the lookup
- ColumnReference col = new ColumnReference(fam, qual);
- table.setCurrentTimestamp(ts);
- table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
- }
@Test
public void testNoScannerForImmutableRows() throws Exception {
@@ -193,10 +141,9 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
- Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
- LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(state, m);
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(null);
+ LocalTableState table = new LocalTableState(cachedLocalTable, m);
//add the kvs from the mutation
table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
@@ -223,22 +170,16 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
- RegionScanner scanner = Mockito.mock(RegionScanner.class);
- Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final byte[] stored = Bytes.toBytes("stored-value");
final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
storedKv.setSequenceId(2);
- Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable {
- List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
- list.add(storedKv);
- return false;
- }
- });
- LocalHBaseState state = new LocalTable(env);
- LocalTableState table = new LocalTableState(state, m);
+ HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+ new HashMap<ImmutableBytesPtr, List<Cell>>();
+ rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+ LocalTableState table = new LocalTableState(cachedLocalTable, m);
+
// add the kvs from the mutation
KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
kv.setSequenceId(0);
@@ -257,8 +198,6 @@ public class LocalTableStateTest {
p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
s = p.getFirst();
assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
- Mockito.verify(env, Mockito.times(1)).getRegion();
- Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
}
@SuppressWarnings("unchecked")
@@ -269,24 +208,19 @@ public class LocalTableStateTest {
Region region = Mockito.mock(Region.class);
Mockito.when(env.getRegion()).thenReturn(region);
- RegionScanner scanner = Mockito.mock(RegionScanner.class);
- Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
final KeyValue storedKv =
new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
storedKv.setSequenceId(2);
- Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation) throws Throwable {
- List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
- list.add(storedKv);
- return false;
- }
- });
- LocalHBaseState state = new LocalTable(env);
+
Put pendingUpdate = new Put(row);
pendingUpdate.addColumn(fam, qual, ts, val);
- LocalTableState table = new LocalTableState(state, pendingUpdate);
+ HashMap<ImmutableBytesPtr, List<Cell>> rowKeyPtrToCells =
+ new HashMap<ImmutableBytesPtr, List<Cell>>();
+ rowKeyPtrToCells.put(new ImmutableBytesPtr(row), Collections.singletonList((Cell)storedKv));
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(rowKeyPtrToCells);
+ LocalTableState table = new LocalTableState(cachedLocalTable, pendingUpdate);
+
// do the lookup for the given column
ColumnReference col = new ColumnReference(fam, qual);
@@ -302,8 +236,6 @@ public class LocalTableStateTest {
p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
s = p.getFirst();
assertEquals("Lost already loaded update!", storedKv, s.next());
- Mockito.verify(env, Mockito.times(1)).getRegion();
- Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
}
// TODO add test here for making sure multiple column references with the same column family don't
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index f587e98..ddd7d64 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.data.CachedLocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -149,6 +149,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
+ Mockito.when(mockIndexMetaData.getReplayWrite()).thenReturn(null);
Mockito.when(mockIndexMetaData.getIndexMaintainers())
.thenReturn(Collections.singletonList(getTestIndexMaintainer()));
@@ -212,8 +213,13 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
MultiMutation mutation = new MultiMutation(new ImmutableBytesPtr(ROW));
mutation.addAll(put);
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+ Collections.singletonList(mutation),
+ this.mockIndexMetaData,
+ this.indexBuilder.getEnv().getRegion());
+
Collection<Pair<Mutation, byte[]>> indexUpdates =
- indexBuilder.getIndexUpdate(mutation, mockIndexMetaData);
+ indexBuilder.getIndexUpdate(mutation, mockIndexMetaData, cachedLocalTable);
assertEquals(2, indexUpdates.size());
assertContains(indexUpdates, 2, ROW, KeyValue.Type.DeleteFamily, FAM,
new byte[0] /* qual not needed */, 2);
@@ -254,8 +260,16 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
mutation.addAll(put);
Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
- for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
- indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+ Collection<? extends Mutation> mutations =
+ IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+ mutations,
+ this.mockIndexMetaData,
+ this.indexBuilder.getEnv().getRegion());
+
+ for (Mutation m : mutations) {
+ indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
}
// 3 puts and 3 deletes (one to hide existing index row for VALUE_1, and two to hide index
// rows for VALUE_2, VALUE_3)
@@ -287,9 +301,17 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
MultiMutation mutation = getMultipleVersionMutation(200);
currentRowCells = mutation.getFamilyCellMap().get(FAM);
+ Collection<? extends Mutation> mutations =
+ IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation));
+
+ CachedLocalTable cachedLocalTable = CachedLocalTable.build(
+ mutations,
+ this.mockIndexMetaData,
+ this.indexBuilder.getEnv().getRegion());
+
Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayList();
for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(Collections.singletonList(mutation))) {
- indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData));
+ indexUpdates.addAll(indexBuilder.getIndexUpdate(m, mockIndexMetaData, cachedLocalTable));
}
assertNotEquals(0, indexUpdates.size());
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index e0a8ebe..ae63f83 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -134,9 +134,9 @@ public class TestCoveredColumnIndexCodec {
}
@Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
+ public List<Cell> getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
throws IOException {
- return r;
+ return r.listCells();
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
deleted file mode 100644
index b11ac8d..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestLocalTable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered.data;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-public class TestLocalTable {
- private static final byte[] ROW = Bytes.toBytes("test_row");
-
- @Test
- public void testGetOldestTimestamp() {
- LocalTable localTable = new LocalTable(null);
-
- List<Cell> cellList1 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 4L));
- assertEquals(4L, localTable.getOldestTimestamp(Collections.singletonList(cellList1)));
-
- List<Cell> cellList2 = getCellList(new KeyValue(ROW, 5L), new KeyValue(ROW, 2L));
- List<List<Cell>> set1 = new ArrayList<>(Arrays.asList(cellList1, cellList2));
- assertEquals(2L, localTable.getOldestTimestamp(set1));
-
- List<Cell> cellList3 = getCellList(new KeyValue(ROW, 1L));
- set1.add(cellList3);
- assertEquals(1L, localTable.getOldestTimestamp(set1));
-
- List<Cell> cellList4 =
- getCellList(new KeyValue(ROW, 3L), new KeyValue(ROW, 1L), new KeyValue(ROW, 0L));
- set1.add(cellList4);
- assertEquals(0L, localTable.getOldestTimestamp(set1));
- }
-
- private List<Cell> getCellList(KeyValue... kvs) {
- List<Cell> cellList = new ArrayList<>();
- for (KeyValue kv : kvs) {
- cellList.add(kv);
- }
- return cellList;
- }
-}