You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2022/02/25 22:15:32 UTC

[phoenix] branch 5.1 updated: PHOENIX-6656 Reindent NonAggregateRegionScannerFactory

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 04ff09e  PHOENIX-6656 Reindent NonAggregateRegionScannerFactory
04ff09e is described below

commit 04ff09e4cb3e02ac9b6c002cf78a9100a8c4012c
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Fri Feb 25 14:13:53 2022 -0800

    PHOENIX-6656 Reindent NonAggregateRegionScannerFactory
---
 .../iterate/NonAggregateRegionScannerFactory.java  | 558 ++++++++++-----------
 1 file changed, 279 insertions(+), 279 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index b1d8136..0cb5102 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -81,111 +81,111 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 
 public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
 
-  public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
-    this.env = env;
-  }
+    public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) {
+        this.env = env;
+    }
 
-  @Override
-  public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
-      ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-    int offset = 0;
-    if (ScanUtil.isLocalIndex(scan)) {
+    @Override
+    public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        int offset = 0;
+        if (ScanUtil.isLocalIndex(scan)) {
             /*
              * For local indexes, we need to set an offset on row key expressions to skip
              * the region start key.
              */
-      Region region = getRegion();
-      offset = region.getRegionInfo().getStartKey().length != 0 ?
-          region.getRegionInfo().getStartKey().length :
-          region.getRegionInfo().getEndKey().length;
-      ScanUtil.setRowKeyOffset(scan, offset);
-    }
-    byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
-    Integer scanOffset = null;
-    if (scanOffsetBytes != null) {
-      scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
-    }
-    RegionScanner innerScanner = s;
-    PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-    boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
+            Region region = getRegion();
+            offset = region.getRegionInfo().getStartKey().length != 0 ?
+                    region.getRegionInfo().getStartKey().length :
+                    region.getRegionInfo().getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offset);
+        }
+        byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
+        Integer scanOffset = null;
+        if (scanOffsetBytes != null) {
+            scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes);
+        }
+        RegionScanner innerScanner = s;
+        PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
 
-    Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
-    KeyValueSchema kvSchema = null;
-    ValueBitSet kvSchemaBitSet = null;
-    Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
-    if (arrayFuncRefs != null) {
-        KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
-        for (Expression expression : arrayFuncRefs) {
-            builder.addField(expression);
+        Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
+        KeyValueSchema kvSchema = null;
+        ValueBitSet kvSchemaBitSet = null;
+        Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs);
+        if (arrayFuncRefs != null) {
+            KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
+            for (Expression expression : arrayFuncRefs) {
+                builder.addField(expression);
+            }
+            kvSchema = builder.build();
+            kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
+        }
+        TupleProjector tupleProjector = null;
+        Region dataRegion = null;
+        IndexMaintainer indexMaintainer = null;
+        byte[][] viewConstants = null;
+        PhoenixTransactionContext tx = null;
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        if (dataColumns != null) {
+            tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
+            dataRegion = env.getRegion();
+            int clientVersion = ScanUtil.getClientVersion(scan);
+            List<IndexMaintainer> indexMaintainers =
+                    IndexUtil.deSerializeIndexMaintainersFromScan(scan);
+            indexMaintainer = indexMaintainers.get(0);
+            viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+            byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+            tx = TransactionFactory.getTransactionContext(txState, clientVersion);
         }
-        kvSchema = builder.build();
-        kvSchemaBitSet = ValueBitSet.newInstance(kvSchema);
-    }
-    TupleProjector tupleProjector = null;
-    Region dataRegion = null;
-    IndexMaintainer indexMaintainer = null;
-    byte[][] viewConstants = null;
-    PhoenixTransactionContext tx = null;
-    ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
-    if (dataColumns != null) {
-        tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-        dataRegion = env.getRegion();
-        int clientVersion = ScanUtil.getClientVersion(scan);
-        List<IndexMaintainer> indexMaintainers =
-                IndexUtil.deSerializeIndexMaintainersFromScan(scan);
-        indexMaintainer = indexMaintainers.get(0);
-        viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
-        byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
-        tx = TransactionFactory.getTransactionContext(txState, clientVersion);
-    }
 
-    final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
-    final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
-    boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan))
-        && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
-    // setting dataRegion in case of a non-coprocessor environment
-    if (dataRegion == null &&
-        env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
-      dataRegion = env.getRegion();
-    }
-    innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
-        tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
-        ptr, useQualifierAsIndex);
+        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
+        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan))
+                && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
+        // setting dataRegion in case of a non-coprocessor environment
+        if (dataRegion == null &&
+                env.getConfiguration().get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY) != null) {
+            dataRegion = env.getRegion();
+        }
+        innerScanner = getWrappedScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns,
+                tupleProjector, dataRegion, indexMaintainer, tx, viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null,
+                ptr, useQualifierAsIndex);
 
-    final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
-    if (j != null) {
-        innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
-                                                 p, j, tenantId, useQualifierAsIndex,
-                                                 useNewValueColumnQualifier);
-    }
-    if (scanOffset != null) {
-      innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
-              new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
-                      scanOffset, getPageSizeMsForRegionScanner(scan)),
-          scan.getAttribute(QueryConstants.LAST_SCAN) != null);
-    }
-    boolean spoolingEnabled =
-            env.getConfiguration().getBoolean(
-                QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
-                QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
-    long thresholdBytes =
-            env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
-    final OrderedResultIterator iterator =
-            deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
-    if (iterator == null) {
-      return innerScanner;
+        final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
+        if (j != null) {
+            innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
+                    p, j, tenantId, useQualifierAsIndex,
+                    useNewValueColumnQualifier);
+        }
+        if (scanOffset != null) {
+            innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
+                            new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
+                            scanOffset, getPageSizeMsForRegionScanner(scan)),
+                    scan.getAttribute(QueryConstants.LAST_SCAN) != null);
+        }
+        boolean spoolingEnabled =
+                env.getConfiguration().getBoolean(
+                        QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                        QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
+        long thresholdBytes =
+                env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
+        final OrderedResultIterator iterator =
+                deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
+        if (iterator == null) {
+            return innerScanner;
+        }
+        // TODO:the above wrapped scanner should be used here also
+        return getTopNScanner(env, innerScanner, iterator, tenantId);
     }
-    // TODO:the above wrapped scanner should be used here also
-    return getTopNScanner(env, innerScanner, iterator, tenantId);
-  }
 
     @VisibleForTesting
     static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
-            boolean spoolingEnabled, long thresholdBytes) {
-    byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
-    if (topN == null) {
-      return null;
+                                                     boolean spoolingEnabled, long thresholdBytes) {
+        byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
+        if (topN == null) {
+            return null;
         }
         int clientVersion = ScanUtil.getClientVersion(scan);
         // Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes
@@ -194,214 +194,214 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
                 (scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION) == null)
                         || (VersionUtil.decodeMajorVersion(clientVersion) > 5)
                         || (VersionUtil.decodeMajorVersion(clientVersion) == 5
-                                && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
+                        && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
                         || (VersionUtil.decodeMajorVersion(clientVersion) == 4
-                                && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
+                        && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
         ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
         try {
-      DataInputStream input = new DataInputStream(stream);
-      if (shouldDecodeSpoolThreshold) {
-        // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the
-        // one set on server 
-        WritableUtils.readVInt(input);
-      }
-      int limit = WritableUtils.readVInt(input);
-      int estimatedRowSize = WritableUtils.readVInt(input);
-      int size = WritableUtils.readVInt(input);
-      List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
-      for (int i = 0; i < size; i++) {
-        OrderByExpression orderByExpression = new OrderByExpression();
-        orderByExpression.readFields(input);
-        orderByExpressions.add(orderByExpression);
-      }
-      PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-      ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
-      return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
-              thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        stream.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
-                                                                        Set<KeyValueColumnExpression> arrayKVRefs) {
-    byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
-    if (specificArrayIdx == null) {
-      return null;
-    }
-    ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
-    try {
-      DataInputStream input = new DataInputStream(stream);
-      int arrayKVRefSize = WritableUtils.readVInt(input);
-      for (int i = 0; i < arrayKVRefSize; i++) {
-        PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
-        KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
-            : new KeyValueColumnExpression();
-        kvExp.readFields(input);
-        arrayKVRefs.add(kvExp);
-      }
-      int arrayKVFuncSize = WritableUtils.readVInt(input);
-      Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
-      for (int i = 0; i < arrayKVFuncSize; i++) {
-        ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
-        arrayIdxFunc.readFields(input);
-        arrayFuncRefs[i] = arrayIdxFunc;
-      }
-      return arrayFuncRefs;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        stream.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+            DataInputStream input = new DataInputStream(stream);
+            if (shouldDecodeSpoolThreshold) {
+                // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the
+                // one set on server
+                WritableUtils.readVInt(input);
+            }
+            int limit = WritableUtils.readVInt(input);
+            int estimatedRowSize = WritableUtils.readVInt(input);
+            int size = WritableUtils.readVInt(input);
+            List<OrderByExpression> orderByExpressions = Lists.newArrayListWithExpectedSize(size);
+            for (int i = 0; i < size; i++) {
+                OrderByExpression orderByExpression = new OrderByExpression();
+                orderByExpression.readFields(input);
+                orderByExpressions.add(orderByExpression);
+            }
+            PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+            ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+            return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
+                    thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
-  }
 
-
-  private RegionScanner getOffsetScanner(final RegionScanner s,
-      final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
-    final Tuple firstTuple;
-    final Region region = getRegion();
-    region.startRegionOperation();
-    try {
-      Tuple tuple = iterator.next();
-      if (tuple == null && !isLastScan) {
-        List<Cell> kvList = new ArrayList<Cell>(1);
-        KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
-            QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
-        kvList.add(kv);
-        Result r = Result.create(kvList);
-        firstTuple = new ResultTuple(r);
-      } else {
-        firstTuple = tuple;
-      }
-    } catch (Throwable t) {
-      ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
-      return null;
-    } finally {
-      region.closeRegionOperation();
+    private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
+                                                                          Set<KeyValueColumnExpression> arrayKVRefs) {
+        byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX);
+        if (specificArrayIdx == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            int arrayKVRefSize = WritableUtils.readVInt(input);
+            for (int i = 0; i < arrayKVRefSize; i++) {
+                PTable.ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
+                KeyValueColumnExpression kvExp = scheme != PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression(scheme)
+                        : new KeyValueColumnExpression();
+                kvExp.readFields(input);
+                arrayKVRefs.add(kvExp);
+            }
+            int arrayKVFuncSize = WritableUtils.readVInt(input);
+            Expression[] arrayFuncRefs = new Expression[arrayKVFuncSize];
+            for (int i = 0; i < arrayKVFuncSize; i++) {
+                ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction();
+                arrayIdxFunc.readFields(input);
+                arrayFuncRefs[i] = arrayIdxFunc;
+            }
+            return arrayFuncRefs;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
-    return new BaseRegionScanner(s) {
-      private Tuple tuple = firstTuple;
 
-      @Override
-      public boolean isFilterDone() {
-        return tuple == null;
-      }
 
-      @Override
-      public boolean next(List<Cell> results) throws IOException {
+    private RegionScanner getOffsetScanner(final RegionScanner s,
+                                           final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
+        final Tuple firstTuple;
+        final Region region = getRegion();
+        region.startRegionOperation();
         try {
-          if (isFilterDone()) { return false; }
-          for (int i = 0; i < tuple.size(); i++) {
-            results.add(tuple.getValue(i));
-          }
-          tuple = iterator.next();
-          return !isFilterDone();
+            Tuple tuple = iterator.next();
+            if (tuple == null && !isLastScan) {
+                List<Cell> kvList = new ArrayList<Cell>(1);
+                KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
+                        QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
+                kvList.add(kv);
+                Result r = Result.create(kvList);
+                firstTuple = new ResultTuple(r);
+            } else {
+                firstTuple = tuple;
+            }
         } catch (Throwable t) {
-          ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
-          return false;
+            ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
         }
-      }
+        return new BaseRegionScanner(s) {
+            private Tuple tuple = firstTuple;
 
-      @Override
-      public void close() throws IOException {
-        try {
-          s.close();
-        } finally {
-          try {
-            if (iterator != null) {
-              iterator.close();
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null;
             }
-          } catch (SQLException e) {
-            ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), e);
-          }
-        }
-      }
-    };
-  }
 
-  /**
-   *  Return region scanner that does TopN.
-   *  We only need to call startRegionOperation and closeRegionOperation when
-   *  getting the first Tuple (which forces running through the entire region)
-   *  since after this everything is held in memory
-   */
-  private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
-      final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                try {
+                    if (isFilterDone()) { return false; }
+                    for (int i = 0; i < tuple.size(); i++) {
+                        results.add(tuple.getValue(i));
+                    }
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
+                    return false;
+                }
+            }
 
-    final Tuple firstTuple;
-    TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
-    long estSize = iterator.getEstimatedByteSize();
-    final MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
-    final Region region = getRegion();
-    region.startRegionOperation();
-    try {
-      // Once we return from the first call to next, we've run through and cached
-      // the topN rows, so we no longer need to start/stop a region operation.
-      firstTuple = iterator.next();
-      // Now that the topN are cached, we can resize based on the real size
-      long actualSize = iterator.getByteSize();
-      chunk.resize(actualSize);
-    } catch (Throwable t) {
-      ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
-      return null;
-    } finally {
-      region.closeRegionOperation();
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    try {
+                        if (iterator != null) {
+                            iterator.close();
+                        }
+                    } catch (SQLException e) {
+                        ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), e);
+                    }
+                }
+            }
+        };
     }
-    return new BaseRegionScanner(s) {
-      private Tuple tuple = firstTuple;
 
-      @Override
-      public boolean isFilterDone() {
-        return tuple == null;
-      }
+    /**
+     *  Return region scanner that does TopN.
+     *  We only need to call startRegionOperation and closeRegionOperation when
+     *  getting the first Tuple (which forces running through the entire region)
+     *  since after this everything is held in memory
+     */
+    private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
+                                         final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
 
-      @Override
-      public boolean next(List<Cell> results) throws IOException {
+        final Tuple firstTuple;
+        TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
+        long estSize = iterator.getEstimatedByteSize();
+        final MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(estSize);
+        final Region region = getRegion();
+        region.startRegionOperation();
         try {
-          if (isFilterDone()) {
-            return false;
-          }
-          if (isDummy(tuple)) {
-            getDummyResult(results);
-          } else {
-            for (int i = 0; i < tuple.size(); i++) {
-              results.add(tuple.getValue(i));
-            }
-          }
-          tuple = iterator.next();
-          return !isFilterDone();
+            // Once we return from the first call to next, we've run through and cached
+            // the topN rows, so we no longer need to start/stop a region operation.
+            firstTuple = iterator.next();
+            // Now that the topN are cached, we can resize based on the real size
+            long actualSize = iterator.getByteSize();
+            chunk.resize(actualSize);
         } catch (Throwable t) {
-          ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
-          return false;
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
         }
-      }
+        return new BaseRegionScanner(s) {
+            private Tuple tuple = firstTuple;
 
-      @Override
-      public void close() throws IOException {
-        try {
-          s.close();
-        } finally {
-          try {
-            if(iterator != null) {
-              iterator.close();
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null;
             }
-          } catch (SQLException e) {
-            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
-          } finally {
-            chunk.close();
-          }
-        }
-      }
-    };
-  }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                try {
+                    if (isFilterDone()) {
+                        return false;
+                    }
+                    if (isDummy(tuple)) {
+                        getDummyResult(results);
+                    } else {
+                        for (int i = 0; i < tuple.size(); i++) {
+                            results.add(tuple.getValue(i));
+                        }
+                    }
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                    return false;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    try {
+                        if(iterator != null) {
+                            iterator.close();
+                        }
+                    } catch (SQLException e) {
+                        ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
+                    } finally {
+                        chunk.close();
+                    }
+                }
+            }
+        };
+    }
 }