Posted to commits@phoenix.apache.org by st...@apache.org on 2022/03/21 06:42:10 UTC

[phoenix] branch 5.1 updated: PHOENIX-6658 Replace HRegion.get() calls

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new b65fc1b  PHOENIX-6658 Replace HRegion.get() calls
b65fc1b is described below

commit b65fc1b0d959200d00651f99108c97d9d77a4898
Author: Istvan Toth <st...@apache.org>
AuthorDate: Mon Feb 28 18:53:19 2022 +0100

    PHOENIX-6658 Replace HRegion.get() calls
---
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 33 ++++++----
 .../coprocessor/SequenceRegionObserver.java        | 71 ++++++++++++----------
 .../apache/phoenix/index/PhoenixIndexBuilder.java  | 13 +++-
 .../apache/phoenix/util/PhoenixKeyValueUtil.java   | 27 ++++++--
 4 files changed, 92 insertions(+), 52 deletions(-)
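
For readers skimming the hunks below: every change applies the same transformation,
trading a direct Region.get() for a short-lived RegionScanner over a single-row Scan
built from the existing Get. A minimal sketch of the pattern (a hypothetical helper,
not code from this commit; region, key, family and qualifier stand in for the
surrounding coprocessor state):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    static Result getViaScanner(Region region, byte[] key, byte[] family,
            byte[] qualifier) throws IOException {
        Get get = new Get(key);                  // build the Get exactly as before
        get.addColumn(family, qualifier);
        try (RegionScanner scanner = region.getScanner(new Scan(get))) {
            List<Cell> cells = new ArrayList<>();
            scanner.next(cells);                 // single-row scan: one next() drains the row
            return Result.create(cells);         // isEmpty() then means "row not found"
        }
    }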

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 238a90f..77da732 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3514,19 +3514,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             if (rowLock == null) {
                 throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
             }
-            try {
-                Get get = new Get(key);
-                get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
-                get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
-                get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
-                get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
-                Result currentResult = region.get(get);
-                if (currentResult.rawCells().length == 0) {
+
+            Get get = new Get(key);
+            get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
+            get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
+            get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+            get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+            try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+                List<Cell> cells = new ArrayList<>();
+                scanner.next(cells);
+                if (cells.isEmpty()) {
                     builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                     builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                     done.run(builder.build());
                     return;
                 }
+                Result currentResult = Result.create(cells);
                 Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                 Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                 Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
@@ -3628,10 +3631,14 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 if (currentState == PIndexState.PENDING_DISABLE) {
                     if (newState == PIndexState.ACTIVE) {
                         //before making index ACTIVE check if all clients succeed otherwise keep it PENDING_DISABLE
-                        byte[] count = region
-                                .get(new Get(key).addColumn(TABLE_FAMILY_BYTES,
-                                        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES))
-                                .getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+                        byte[] count;
+                        Get countGet = new Get(key).addColumn(TABLE_FAMILY_BYTES,
+                                PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+                        try (RegionScanner countScanner = region.getScanner(new Scan(countGet))) {
+                            List<Cell> countCells = new ArrayList<>();
+                            countScanner.next(countCells);
+                            count = Result.create(countCells).getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+                        }
                         if (count != null && Bytes.toLong(count) != 0) {
                             newState = PIndexState.PENDING_DISABLE;
                             newKVs.remove(disableTimeStampKVIndex);
@@ -4055,7 +4062,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 Region region = env.getRegion();
                 Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp);
                 List<Cell> results = Lists.newArrayList();
-                try (RegionScanner scanner = region.getScanner(scan);) {
+                try (RegionScanner scanner = region.getScanner(scan)) {
                     scanner.next(results);
                     if (results.isEmpty()) { // Should not be possible
                         return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 8e48899..33e7bf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -36,13 +37,14 @@ import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -121,32 +123,35 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
         region.startRegionOperation();
         try {
             ServerUtil.acquireLock(region, row, locks);
-            try {
-                long maxTimestamp = tr.getMax();
-                boolean validateOnly = true;
-                Get get = new Get(row);
-                get.setTimeRange(tr.getMin(), tr.getMax());
-                for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
-                    byte[] cf = entry.getKey();
-                    for (Cell cq : entry.getValue()) {
-                    	long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset());
-                        get.addColumn(cf, CellUtil.cloneQualifier(cq));
-                        long cellTimestamp = cq.getTimestamp();
-                        // Workaround HBASE-15698 by using the lowest of the timestamps found
-                        // on the Increment or any of its Cells.
-                        if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) {
-                            maxTimestamp = cellTimestamp;
-                            get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp);
-                        }
-                        validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
+            long maxTimestamp = tr.getMax();
+            boolean validateOnly = true;
+            Get get = new Get(row);
+            get.setTimeRange(tr.getMin(), tr.getMax());
+            for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
+                byte[] cf = entry.getKey();
+                for (Cell cq : entry.getValue()) {
+                    long value = Bytes.toLong(cq.getValueArray(), cq.getValueOffset());
+                    get.addColumn(cf, CellUtil.cloneQualifier(cq));
+                    long cellTimestamp = cq.getTimestamp();
+                    // Workaround HBASE-15698 by using the lowest of the timestamps found
+                    // on the Increment or any of its Cells.
+                    if (cellTimestamp > 0 && cellTimestamp < maxTimestamp) {
+                        maxTimestamp = cellTimestamp;
+                        get.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, maxTimestamp);
                     }
+                    validateOnly &= (Sequence.ValueOp.VALIDATE_SEQUENCE.ordinal() == value);
                 }
-                Result result = region.get(get);
-                if (result.isEmpty()) {
+            }
+            try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+                List<Cell> currentCells = new ArrayList<>();
+                scanner.next(currentCells);
+                // These cells are returned to the caller, and may be backed by ByteBuffers
+                // that are freed when the RegionScanner is closed on return
+                PhoenixKeyValueUtil.maybeCopyCellList(currentCells);
+                if (currentCells.isEmpty()) {
                     return getErrorResult(row, maxTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
                 }
-                
-                 
+                Result result = Result.create(currentCells);
                 Cell currentValueKV = Sequence.getCurrentValueKV(result);
                 Cell incrementByKV = Sequence.getIncrementByKV(result);
                 Cell cacheSizeKV = Sequence.getCacheSizeKV(result);
@@ -379,15 +384,17 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
         region.startRegionOperation();
         try {
             ServerUtil.acquireLock(region, row, locks);
-            try {
-                byte[] family = CellUtil.cloneFamily(keyValue);
-                byte[] qualifier = CellUtil.cloneQualifier(keyValue);
 
-                Get get = new Get(row);
-                get.setTimeRange(minGetTimestamp, maxGetTimestamp);
-                get.addColumn(family, qualifier);
-                Result result = region.get(get);
-                if (result.isEmpty()) {
+            byte[] family = CellUtil.cloneFamily(keyValue);
+            byte[] qualifier = CellUtil.cloneQualifier(keyValue);
+
+            Get get = new Get(row);
+            get.setTimeRange(minGetTimestamp, maxGetTimestamp);
+            get.addColumn(family, qualifier);
+            try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+                List<Cell> cells = new ArrayList<>();
+                scanner.next(cells);
+                if (cells.isEmpty()) {
                     if (op == Sequence.MetaOp.DROP_SEQUENCE || op == Sequence.MetaOp.RETURN_SEQUENCE) {
                         return getErrorResult(row, clientTimestamp, SQLExceptionCode.SEQUENCE_UNDEFINED.getErrorCode());
                     }
@@ -399,7 +406,7 @@ public class SequenceRegionObserver implements RegionObserver, RegionCoprocessor
                 Mutation m = null;
                 switch (op) {
                 case RETURN_SEQUENCE:
-                    KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(result.rawCells()[0]);
+                    KeyValue currentValueKV = PhoenixKeyValueUtil.maybeCopyCell(cells.get(0));
                     long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault());
                     long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(),
                       currentValueKV.getValueOffset(), SortOrder.getDefault());
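
A note on the maybeCopyCell()/maybeCopyCellList() calls above: cells handed out by a
RegionScanner may be ByteBufferExtendedCells whose backing buffers can be reclaimed
once the scanner is closed, so anything that must outlive the try-with-resources
block has to be copied on-heap first. A minimal sketch, assuming the cells are
returned to a caller (hypothetical helper name; imports as in the sketch above, plus
org.apache.phoenix.util.PhoenixKeyValueUtil):

    static List<Cell> readRowSafely(Region region, Get get) throws IOException {
        List<Cell> cells = new ArrayList<>();
        try (RegionScanner scanner = region.getScanner(new Scan(get))) {
            scanner.next(cells);
            // Copy ByteBuffer-backed cells to on-heap KeyValues while the
            // scanner (and the buffers it pins) is still open.
            PhoenixKeyValueUtil.maybeCopyCellList(cells);
        }
        return cells; // safe to use after the scanner is closed
    }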
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 7cef8dc..1ab132d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -37,12 +38,13 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -150,8 +152,13 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
         final Get get = new Get(rowKey);
         if (isDupKeyIgnore(opBytes)) {
             get.setFilter(new FirstKeyOnlyFilter());
-            Result result = this.env.getRegion().get(get);
-            return result.isEmpty() ? convertIncrementToPutInSingletonList(inc) : Collections.<Mutation>emptyList();
+            try (RegionScanner scanner = this.env.getRegion().getScanner(new Scan(get))) {
+                List<Cell> cells = new ArrayList<>();
+                scanner.next(cells);
+                return cells.isEmpty()
+                        ? convertIncrementToPutInSingletonList(inc)
+                        : Collections.<Mutation>emptyList();
+            }
         }
         ByteArrayInputStream stream = new ByteArrayInputStream(opBytes);
         DataInputStream input = new DataInputStream(stream);
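
The ON DUPLICATE KEY IGNORE branch above only needs to know whether the row exists:
the FirstKeyOnlyFilter limits the scan to at most the first cell of the row, and only
cells.isEmpty() is consulted. Roughly (hypothetical helper name; imports as above,
plus org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter):

    static boolean rowExists(Region region, byte[] rowKey) throws IOException {
        Get get = new Get(rowKey);
        get.setFilter(new FirstKeyOnlyFilter()); // at most one cell: cheapest probe
        try (RegionScanner scanner = region.getScanner(new Scan(get))) {
            List<Cell> cells = new ArrayList<>();
            scanner.next(cells);
            return !cells.isEmpty();
        }
    }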
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 0af2148..f553c85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -21,9 +21,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -34,16 +36,12 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compat.hbase.CompatUtil;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
 
 /**
  * 
@@ -216,12 +214,33 @@ public class PhoenixKeyValueUtil {
         // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
         // that will likely be removed at some point in time.
         if (c == null) return null;
+        // TODO Do we really want to return only KeyValues, or would it be enough to
+        // copy ByteBufferExtendedCells to the heap?
+        // i.e. can we avoid copying on-heap cells like BufferedDataBlockEncoder.OnheapDecodedCell?
         if (c instanceof KeyValue) {
             return (KeyValue) c;
         }
         return KeyValueUtil.copyToNewKeyValue(c);
     }
 
+    /**
+     * Copy every off-heap (ByteBuffer-backed) cell in the list to an on-heap
+     * KeyValue. The input list is modified in place.
+     *
+     * @param cells the cell list, modified in place
+     * @return the same list, after modification, for convenience
+     */
+    public static List<Cell> maybeCopyCellList(List<Cell> cells) {
+        ListIterator<Cell> cellsIt = cells.listIterator();
+        while (cellsIt.hasNext()) {
+            Cell c = cellsIt.next();
+            if (c instanceof ByteBufferExtendedCell) {
+                cellsIt.set(KeyValueUtil.copyToNewKeyValue(c));
+            }
+        }
+        return cells;
+    }
+
     private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) {
         long size = 0;
         // iterate over rows