You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2022/03/21 06:42:10 UTC
[phoenix] branch 5.1 updated: PHOENIX-6658 Replace HRegion.get() calls
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new b65fc1b PHOENIX-6658 Replace HRegion.get() calls
b65fc1b is described below
commit b65fc1b0d959200d00651f99108c97d9d77a4898
Author: Istvan Toth <st...@apache.org>
AuthorDate: Mon Feb 28 18:53:19 2022 +0100
PHOENIX-6658 Replace HRegion.get() calls
---
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 33 ++++++----
.../coprocessor/SequenceRegionObserver.java | 71 ++++++++++++----------
.../apache/phoenix/index/PhoenixIndexBuilder.java | 13 +++-
.../apache/phoenix/util/PhoenixKeyValueUtil.java | 27 ++++++--
4 files changed, 92 insertions(+), 52 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 238a90f..77da732 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3514,19 +3514,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (rowLock == null) {
throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
}
- try {
- Get get = new Get(key);
- get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
- get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
- get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
- get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
- Result currentResult = region.get(get);
- if (currentResult.rawCells().length == 0) {
+
+ Get get = new Get(key);
+ get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
+ get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
+ get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+ get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+ try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+ List<Cell> cells = new ArrayList<>();
+ scanner.next(cells);
+ if (cells.isEmpty()) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
+ Result currentResult = Result.create(cells);
Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
@@ -3628,10 +3631,14 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (currentState == PIndexState.PENDING_DISABLE) {
if (newState == PIndexState.ACTIVE) {
//before making index ACTIVE check if all clients succeed otherwise keep it PENDING_DISABLE
- byte[] count = region
- .get(new Get(key).addColumn(TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES))
- .getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+ byte[] count;
+ try (RegionScanner countScanner = region.getScanner(new Scan(get))) {
+ List<Cell> countCells = new ArrayList<>();
+ scanner.next(countCells);
+ count = Result.create(countCells)
+ .getValue(TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+ }
if (count != null && Bytes.toLong(count) != 0) {
newState = PIndexState.PENDING_DISABLE;
newKVs.remove(disableTimeStampKVIndex);
@@ -4055,7 +4062,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
Region region = env.getRegion();
Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
List<Cell> results = Lists.newArrayList();
- try (RegionScanner scanner = region.getScanner(scan);) {
+ try (RegionScanner scanner = region.getScanner(scan)) {
scanner.next(results);
if (results.isEmpty()) { // Should not be possible
return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 8e48899..33e7bf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -19,6 +19,7 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -36,13 +37,14 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -121,32 +123,35 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
region.startRegionOperation();
try {
ServerUtil.acquireLock(region, row, locks);
- try {
- long maxTimestamp = tr.getMax();
- boolean validateOnly = true;
- Get get = new Get(row);
- get.setTimeRange(tr.getMin(), tr.getMax());
- for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
- byte[] cf = entry.getKey();
- for (Cell cq : entry.getValue()) {
- long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset());
- get.addColumn(cf, CellUtil.cloneQualifier(cq));
- long cellTimestamp = cq.getTimestamp();
- // Workaround HBASE-15698 by using the lowest of the timestamps found
- // on the Increment or any of its Cells.
- if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) {
- maxTimestamp = cellTimestamp;
- get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp);
- }
- validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
+ long maxTimestamp = tr.getMax();
+ boolean validateOnly = true;
+ Get get = new Get(row);
+ get.setTimeRange(tr.getMin(), tr.getMax());
+ for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
+ byte[] cf = entry.getKey();
+ for (Cell cq : entry.getValue()) {
+ long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset());
+ get.addColumn(cf, CellUtil.cloneQualifier(cq));
+ long cellTimestamp = cq.getTimestamp();
+ // Workaround HBASE-15698 by using the lowest of the timestamps found
+ // on the Increment or any of its Cells.
+ if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) {
+ maxTimestamp = cellTimestamp;
+ get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp);
}
+ validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
}
- Result result = region.get(get);
- if (result.isEmpty()) {
+ }
+ try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+ List<Cell> currentCells = new ArrayList<>();
+ scanner.next(currentCells);
+ // These cells are returned by this method, and may be backed by ByteBuffers
+ // that we free when the RegionScanner is closed on return
+ PhoenixKeyValueUtil.maybeCopyCellList(currentCells);
+ if (currentCells.isEmpty()) {
return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
-
-
+ Result result = Result.create(currentCells);
Cell currentValueKV = Sequence.getCurrentValueKV(result);
Cell incrementByKV = Sequence.getIncrementByKV(result);
Cell cacheSizeKV = Sequence.getCacheSizeKV(result);
@@ -379,15 +384,17 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
region.startRegionOperation();
try {
ServerUtil.acquireLock(region, row, locks);
- try {
- byte[] family = CellUtil.cloneFamily(keyValue);
- byte[] qualifier = CellUtil.cloneQualifier(keyValue);
- Get get = new Get(row);
- get.setTimeRange(minGetTimestamp, maxGetTimestamp);
- get.addColumn(family, qualifier);
- Result result = region.get(get);
- if (result.isEmpty()) {
+ byte[] family = CellUtil.cloneFamily(keyValue);
+ byte[] qualifier = CellUtil.cloneQualifier(keyValue);
+
+ Get get = new Get(row);
+ get.setTimeRange(minGetTimestamp, maxGetTimestamp);
+ get.addColumn(family, qualifier);
+ try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+ List<Cell> cells = new ArrayList<>();
+ scanner.next(cells);
+ if (cells.isEmpty()) {
if (op == Sequence.MetaOp.DROP_SEQUENCE || op == Sequence.MetaOp.RETURN_SEQUENCE) {
return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
}
@@ -399,7 +406,7 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
Mutation m = null;
switch (op) {
case RETURN_SEQUENCE:
- KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(result.rawCells()[0]);
+ KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(cells.get(0));
long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault());
long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(),
currentValueKV.getValueOffset(), SortOrder.getDefault());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 7cef8dc..1ab132d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -37,12 +38,13 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -150,8 +152,13 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
final Get get = new Get(rowKey);
if (isDupKeyIgnore(opBytes)) {
get.setFilter(new FirstKeyOnlyFilter());
- Result result = this.env.getRegion().get(get);
- return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList();
+ try (RegionScanner scanner = this.env.getRegion().getScanner(new Scan(get))) {
+ List<Cell> cells = new ArrayList<>();
+ scanner.next(cells);
+ return cells.isEmpty()
+ ? convertIncrementToPutInSingletonList(inc)
+ : Collections.<Mutation>emptyList();
+ }
}
ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
DataInputStream input = new DataInputStream(stream);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 0af2148..f553c85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -21,9 +21,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -34,16 +36,12 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
/**
*
@@ -216,12 +214,33 @@ public class PhoenixKeyValueUtil {
// Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
// that will likely be removed at some point in time.
if (c == null) return null;
+ // TODO Do we really want to return only KeyValues, or would it be enough to
+ // copy ByteBufferExtendedCells to heap ?
+ // i.e can we avoid copying on-heap cells like BufferedDataBlockEncoder.OnheapDecodedCell ?
if (c instanceof KeyValue) {
return (KeyValue) c;
}
return KeyValueUtil.copyToNewKeyValue(c);
}
+ /**
+ * Copy all Off-Heap cells to KeyValues
+ * The input list is modified.
+ *
+ * @param cells is modified in place
+ * @return the modified list (optional, input list is modified in place)
+ */
+ public static List<Cell> maybeCopyCellList(List<Cell> cells) {
+ ListIterator<Cell> cellsIt = cells.listIterator();
+ while (cellsIt.hasNext()) {
+ Cell c = cellsIt.next();
+ if (c instanceof ByteBufferExtendedCell) {
+ cellsIt.set(KeyValueUtil.copyToNewKeyValue(c));
+ }
+ }
+ return cells;
+ }
+
private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) {
long size = 0;
// iterate over rows