You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2022/02/25 22:15:32 UTC
[phoenix] branch 5.1 updated: PHOENIX-6656 Reindent NonAggregateRegionScannerFactory
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 04ff09e PHOENIX-6656 Reindent NonAggregateRegionScannerFactory
04ff09e is described below
commit 04ff09e4cb3e02ac9b6c002cf78a9100a8c4012c
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Fri Feb 25 14:13:53 2022 -0800
PHOENIX-6656 Reindent NonAggregateRegionScannerFactory
---
.../iterate/NonAggregateRegionScannerFactory.java | 558 ++++++++++-----------
1 file changed, 279 insertions(+), 279 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index b1d8136..0cb5102 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -81,111 +81,111 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
- public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
- this.env = env;
- }
+ public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
+ this.env = env;
+ }
- @Override
- public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- int offset = 0;
- if (ScanUtil.isLocalIndex(scan)) {
+ @Override
+ public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ int offset = 0;
+ if (ScanUtil.isLocalIndex(scan)) {
/*
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key.
*/
- Region region = getRegion();
- offset = region.getRegionInfo().getStartKey().length != 0 ?
- region.getRegionInfo().getStartKey().length :
- region.getRegionInfo().getEndKey().length;
- ScanUtil.setRowKeyOffset(scan, offset);
- }
- byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
- Integer scanOffset = null;
- if (scanOffsetBytes != null) {
- scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
- }
- RegionScanner innerScanner = s;
- PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
+ Region region = getRegion();
+ offset = region.getRegionInfo().getStartKey().length != 0 ?
+ region.getRegionInfo().getStartKey().length :
+ region.getRegionInfo().getEndKey().length;
+ ScanUtil.setRowKeyOffset(scan, offset);
+ }
+ byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
+ Integer scanOffset = null;
+ if (scanOffsetBytes != null) {
+ scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
+ }
+ RegionScanner innerScanner = s;
+ PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
- Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
- KeyValueSchema kvSchema = null;
- ValueBitSet kvSchemaBitSet = null;
- Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
- if (arrayFuncRefs != null) {
- KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
- for (Expression expression : arrayFuncRefs) {
- builder.addField(expression);
+ Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
+ KeyValueSchema kvSchema = null;
+ ValueBitSet kvSchemaBitSet = null;
+ Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
+ if (arrayFuncRefs != null) {
+ KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
+ for (Expression expression : arrayFuncRefs) {
+ builder.addField(expression);
+ }
+ kvSchema = builder.build();
+ kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
+ }
+ TupleProjector tupleProjector = null;
+ Region dataRegion = null;
+ IndexMaintainer indexMaintainer = null;
+ byte[][] viewConstants = null;
+ PhoenixTransactionContext tx = null;
+ ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+ if (dataColumns != null) {
+ tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
+ dataRegion = env.getRegion();
+ int clientVersion = ScanUtil.getClientVersion(scan);
+ List<IndexMaintainer> indexMaintainers =
+ IndexUtil.deSerializeIndexMaintainersFromScan(scan);
+ indexMaintainer = indexMaintainers.get(0);
+ viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+ byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+ tx = TransactionFactory.getTransactionContext(txState, clientVersion);
}
- kvSchema = builder.build();
- kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
- }
- TupleProjector tupleProjector = null;
- Region dataRegion = null;
- IndexMaintainer indexMaintainer = null;
- byte[][] viewConstants = null;
- PhoenixTransactionContext tx = null;
- ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
- if (dataColumns != null) {
- tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
- dataRegion = env.getRegion();
- int clientVersion = ScanUtil.getClientVersion(scan);
- List<IndexMaintainer> indexMaintainers =
- IndexUtil.deSerializeIndexMaintainersFromScan(scan);
- indexMaintainer = indexMaintainers.get(0);
- viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
- byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
- tx = TransactionFactory.getTransactionContext(txState, clientVersion);
- }
- final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
- final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
- boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan))
- && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
- // setting dataRegion in case of a non-coprocessor environment
- if (dataRegion == null &&
- env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
- dataRegion = env.getRegion();
- }
- innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
- tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
- ptr, useQualifierAsIndex);
+ final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
+ final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan))
+ && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
+ // setting dataRegion in case of a non-coprocessor environment
+ if (dataRegion == null &&
+ env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
+ dataRegion = env.getRegion();
+ }
+ innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
+ tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
+ ptr, useQualifierAsIndex);
- final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
- if (j != null) {
- innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
- p, j, tenantId, useQualifierAsIndex,
- useNewValueColumnQualifier);
- }
- if (scanOffset != null) {
- innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
- new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
- scanOffset, getPageSizeMsForRegionScanner(scan)),
- scan.getAttribute(QueryConstants.LAST_SCAN) != null);
- }
- boolean spoolingEnabled =
- env.getConfiguration().getBoolean(
- QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
- QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
- long thresholdBytes =
- env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
- final OrderedResultIterator iterator =
- deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
- if (iterator == null) {
- return innerScanner;
+ final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
+ if (j != null) {
+ innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
+ p, j, tenantId, useQualifierAsIndex,
+ useNewValueColumnQualifier);
+ }
+ if (scanOffset != null) {
+ innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
+ new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
+ scanOffset, getPageSizeMsForRegionScanner(scan)),
+ scan.getAttribute(QueryConstants.LAST_SCAN) != null);
+ }
+ boolean spoolingEnabled =
+ env.getConfiguration().getBoolean(
+ QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
+ long thresholdBytes =
+ env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
+ final OrderedResultIterator iterator =
+ deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
+ if (iterator == null) {
+ return innerScanner;
+ }
+ // TODO:the above wrapped scanner should be used here also
+ return getTopNScanner(env, innerScanner, iterator, tenantId);
}
- // TODO:the above wrapped scanner should be used here also
- return getTopNScanner(env, innerScanner, iterator, tenantId);
- }
@VisibleForTesting
static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
- boolean spoolingEnabled, long thresholdBytes) {
- byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
- if (topN == null) {
- return null;
+ boolean spoolingEnabled, long thresholdBytes) {
+ byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
+ if (topN == null) {
+ return null;
}
int clientVersion = ScanUtil.getClientVersion(scan);
// Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes
@@ -194,214 +194,214 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
(scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION) == null)
|| (VersionUtil.decodeMajorVersion(clientVersion) > 5)
|| (VersionUtil.decodeMajorVersion(clientVersion) == 5
- && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
+ && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
|| (VersionUtil.decodeMajorVersion(clientVersion) == 4
- && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
+ && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
try {
- DataInputStream input = new DataInputStream(stream);
- if (shouldDecodeSpoolThreshold) {
- // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the
- // one set on server
- WritableUtils.readVInt(input);
- }
- int limit = WritableUtils.readVInt(input);
- int estimatedRowSize = WritableUtils.readVInt(input);
- int size = WritableUtils.readVInt(input);
- List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
- for (int i = 0; i < size; i++) {
- OrderByExpression orderByExpression = new OrderByExpression();
- orderByExpression.readFields(input);
- orderByExpressions.add(orderByExpression);
- }
- PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
- ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
- return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
- thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
- Set<KeyValueColumnExpression> arrayKVRefs) {
- byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
- if (specificArrayIdx == null) {
- return null;
- }
- ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
- try {
- DataInputStream input = new DataInputStream(stream);
- int arrayKVRefSize = WritableUtils.readVInt(input);
- for (int i = 0; i < arrayKVRefSize; i++) {
- PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
- KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
- : new KeyValueColumnExpression();
- kvExp.readFields(input);
- arrayKVRefs.add(kvExp);
- }
- int arrayKVFuncSize = WritableUtils.readVInt(input);
- Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
- for (int i = 0; i < arrayKVFuncSize; i++) {
- ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
- arrayIdxFunc.readFields(input);
- arrayFuncRefs[i] = arrayIdxFunc;
- }
- return arrayFuncRefs;
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ DataInputStream input = new DataInputStream(stream);
+ if (shouldDecodeSpoolThreshold) {
+ // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the
+ // one set on server
+ WritableUtils.readVInt(input);
+ }
+ int limit = WritableUtils.readVInt(input);
+ int estimatedRowSize = WritableUtils.readVInt(input);
+ int size = WritableUtils.readVInt(input);
+ List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
+ for (int i = 0; i < size; i++) {
+ OrderByExpression orderByExpression = new OrderByExpression();
+ orderByExpression.readFields(input);
+ orderByExpressions.add(orderByExpression);
+ }
+ PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+ ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+ return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
+ thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
- }
-
- private RegionScanner getOffsetScanner(final RegionScanner s,
- final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
- final Tuple firstTuple;
- final Region region = getRegion();
- region.startRegionOperation();
- try {
- Tuple tuple = iterator.next();
- if (tuple == null && !isLastScan) {
- List<Cell> kvList = new ArrayList<Cell>(1);
- KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
- QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
- kvList.add(kv);
- Result r = Result.create(kvList);
- firstTuple = new ResultTuple(r);
- } else {
- firstTuple = tuple;
- }
- } catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return null;
- } finally {
- region.closeRegionOperation();
+ private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
+ Set<KeyValueColumnExpression> arrayKVRefs) {
+ byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
+ if (specificArrayIdx == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ int arrayKVRefSize = WritableUtils.readVInt(input);
+ for (int i = 0; i < arrayKVRefSize; i++) {
+ PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
+ KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
+ : new KeyValueColumnExpression();
+ kvExp.readFields(input);
+ arrayKVRefs.add(kvExp);
+ }
+ int arrayKVFuncSize = WritableUtils.readVInt(input);
+ Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
+ for (int i = 0; i < arrayKVFuncSize; i++) {
+ ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
+ arrayIdxFunc.readFields(input);
+ arrayFuncRefs[i] = arrayIdxFunc;
+ }
+ return arrayFuncRefs;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
- return new BaseRegionScanner(s) {
- private Tuple tuple = firstTuple;
- @Override
- public boolean isFilterDone() {
- return tuple == null;
- }
- @Override
- public boolean next(List<Cell> results) throws IOException {
+ private RegionScanner getOffsetScanner(final RegionScanner s,
+ final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
+ final Tuple firstTuple;
+ final Region region = getRegion();
+ region.startRegionOperation();
try {
- if (isFilterDone()) { return false; }
- for (int i = 0; i < tuple.size(); i++) {
- results.add(tuple.getValue(i));
- }
- tuple = iterator.next();
- return !isFilterDone();
+ Tuple tuple = iterator.next();
+ if (tuple == null && !isLastScan) {
+ List<Cell> kvList = new ArrayList<Cell>(1);
+ KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
+ QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
+ kvList.add(kv);
+ Result r = Result.create(kvList);
+ firstTuple = new ResultTuple(r);
+ } else {
+ firstTuple = tuple;
+ }
} catch (Throwable t) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
- return false;
+ ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
+ return null;
+ } finally {
+ region.closeRegionOperation();
}
- }
+ return new BaseRegionScanner(s) {
+ private Tuple tuple = firstTuple;
- @Override
- public void close() throws IOException {
- try {
- s.close();
- } finally {
- try {
- if (iterator != null) {
- iterator.close();
+ @Override
+ public boolean isFilterDone() {
+ return tuple == null;
}
- } catch (SQLException e) {
- ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), e);
- }
- }
- }
- };
- }
- /**
- * Return region scanner that does TopN.
- * We only need to call startRegionOperation and closeRegionOperation when
- * getting the first Tuple (which forces running through the entire region)
- * since after this everything is held in memory
- */
- private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
- final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ try {
+ if (isFilterDone()) { return false; }
+ for (int i = 0; i < tuple.size(); i++) {
+ results.add(tuple.getValue(i));
+ }
+ tuple = iterator.next();
+ return !isFilterDone();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
+ return false;
+ }
+ }
- final Tuple firstTuple;
- TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
- long estSize = iterator.getEstimatedByteSize();
- final MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
- final Region region = getRegion();
- region.startRegionOperation();
- try {
- // Once we return from the first call to next, we've run through and cached
- // the topN rows, so we no longer need to start/stop a region operation.
- firstTuple = iterator.next();
- // Now that the topN are cached, we can resize based on the real size
- long actualSize = iterator.getByteSize();
- chunk.resize(actualSize);
- } catch (Throwable t) {
- ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
- return null;
- } finally {
- region.closeRegionOperation();
+ @Override
+ public void close() throws IOException {
+ try {
+ s.close();
+ } finally {
+ try {
+ if (iterator != null) {
+ iterator.close();
+ }
+ } catch (SQLException e) {
+ ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), e);
+ }
+ }
+ }
+ };
}
- return new BaseRegionScanner(s) {
- private Tuple tuple = firstTuple;
- @Override
- public boolean isFilterDone() {
- return tuple == null;
- }
+ /**
+ * Return region scanner that does TopN.
+ * We only need to call startRegionOperation and closeRegionOperation when
+ * getting the first Tuple (which forces running through the entire region)
+ * since after this everything is held in memory
+ */
+ private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
+ final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
- @Override
- public boolean next(List<Cell> results) throws IOException {
+ final Tuple firstTuple;
+ TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
+ long estSize = iterator.getEstimatedByteSize();
+ final MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+ final Region region = getRegion();
+ region.startRegionOperation();
try {
- if (isFilterDone()) {
- return false;
- }
- if (isDummy(tuple)) {
- getDummyResult(results);
- } else {
- for (int i = 0; i < tuple.size(); i++) {
- results.add(tuple.getValue(i));
- }
- }
- tuple = iterator.next();
- return !isFilterDone();
+ // Once we return from the first call to next, we've run through and cached
+ // the topN rows, so we no longer need to start/stop a region operation.
+ firstTuple = iterator.next();
+ // Now that the topN are cached, we can resize based on the real size
+ long actualSize = iterator.getByteSize();
+ chunk.resize(actualSize);
} catch (Throwable t) {
- ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
- return false;
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return null;
+ } finally {
+ region.closeRegionOperation();
}
- }
+ return new BaseRegionScanner(s) {
+ private Tuple tuple = firstTuple;
- @Override
- public void close() throws IOException {
- try {
- s.close();
- } finally {
- try {
- if(iterator != null) {
- iterator.close();
+ @Override
+ public boolean isFilterDone() {
+ return tuple == null;
}
- } catch (SQLException e) {
- ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
- } finally {
- chunk.close();
- }
- }
- }
- };
- }
+
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ try {
+ if (isFilterDone()) {
+ return false;
+ }
+ if (isDummy(tuple)) {
+ getDummyResult(results);
+ } else {
+ for (int i = 0; i < tuple.size(); i++) {
+ results.add(tuple.getValue(i));
+ }
+ }
+ tuple = iterator.next();
+ return !isFilterDone();
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ s.close();
+ } finally {
+ try {
+ if(iterator != null) {
+ iterator.close();
+ }
+ } catch (SQLException e) {
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
+ } finally {
+ chunk.close();
+ }
+ }
+ }
+ };
+ }
}