Posted to commits@phoenix.apache.org by ap...@apache.org on 2015/02/22 05:47:47 UTC

[2/3] phoenix git commit: PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly

PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3d50147f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3d50147f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3d50147f

Branch: refs/heads/master
Commit: 3d50147f213dd3f830b039159fde68eae10ae233
Parents: c633151
Author: Andrew Purtell <ap...@apache.org>
Authored: Sat Feb 21 20:34:08 2015 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Feb 21 20:34:08 2015 -0800

----------------------------------------------------------------------
 .../GroupedAggregateRegionObserver.java         |  96 ++++----
 .../UngroupedAggregateRegionObserver.java       | 246 ++++++++++---------
 .../phoenix/index/PhoenixIndexBuilder.java      |  18 +-
 .../iterate/RegionScannerResultIterator.java    |  36 +--
 4 files changed, 216 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
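Background on the fix: the HBase RegionScanner.nextRaw() javadoc requires the caller to start and close a region operation on the enclosing region and to synchronize on the scanner instance around each call, which these call sites did not do consistently. The hunks that follow all apply the same shape of change. The snippet below is a minimal sketch of that pattern, distilled from the patch rather than taken from it; the class name and the empty processing hook are placeholders, not code from this commit.

    // Illustrative only -- not part of this commit. Sketch of the pattern each call
    // site below adopts: take a region operation and synchronize on the scanner
    // around nextRaw(), per the RegionScanner javadoc contract.
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    public class NextRawContractSketch {
        static void drainScanner(HRegion region, RegionScanner scanner) throws IOException {
            region.startRegionOperation();
            try {
                synchronized (scanner) {
                    boolean hasMore;
                    do {
                        List<Cell> results = new ArrayList<Cell>();
                        // nextRaw() may hand back cells even when it returns false; the
                        // boolean only says whether more rows remain after those returned.
                        hasMore = scanner.nextRaw(results);
                        if (!results.isEmpty()) {
                            // per-call-site work goes here (aggregation, mutation building, ...)
                        }
                    } while (hasMore);
                }
            } finally {
                region.closeRegionOperation();
            }
        }
    }
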


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d50147f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 8b59b85..0984b06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -375,7 +375,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      * @param limit TODO
      */
     private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
-            final RegionScanner s, final List<Expression> expressions,
+            final RegionScanner scanner, final List<Expression> expressions,
             final ServerAggregators aggregators, long limit) throws IOException {
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
@@ -410,28 +410,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             HRegion region = c.getEnvironment().getRegion();
             region.startRegionOperation();
             try {
-                do {
-                    List<Cell> results = new ArrayList<Cell>();
-                    // Results are potentially returned even when the return
-                    // value of s.next is false
-                    // since this is an indication of whether or not there are
-                    // more values after the
-                    // ones returned
-                    hasMore = s.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        result.setKeyValues(results);
-                        ImmutableBytesWritable key =
+                synchronized (scanner) {
+                    do {
+                        List<Cell> results = new ArrayList<Cell>();
+                        // Results are potentially returned even when the return
+                        // value of s.next is false
+                        // since this is an indication of whether or not there are
+                        // more values after the
+                        // ones returned
+                        hasMore = scanner.nextRaw(results);
+                        if (!results.isEmpty()) {
+                            result.setKeyValues(results);
+                            ImmutableBytesWritable key =
                                 TupleUtil.getConcatenatedValue(result, expressions);
-                        Aggregator[] rowAggregators = groupByCache.cache(key);
-                        // Aggregate values here
-                        aggregators.aggregate(rowAggregators, result);
-                    }
-                } while (hasMore && groupByCache.size() < limit);
+                            Aggregator[] rowAggregators = groupByCache.cache(key);
+                            // Aggregate values here
+                            aggregators.aggregate(rowAggregators, result);
+                        }
+                    } while (hasMore && groupByCache.size() < limit);
+                }
             } finally {
                 region.closeRegionOperation();
             }
 
-            RegionScanner regionScanner = groupByCache.getScanner(s);
+            RegionScanner regionScanner = groupByCache.getScanner(scanner);
 
             // Do not sort here, but sort back on the client instead
             // The reason is that if the scan ever extends beyond a region
@@ -453,7 +455,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      * @throws IOException 
      */
     private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
-            final Scan scan, final RegionScanner s, final List<Expression> expressions,
+            final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
             final ServerAggregators aggregators, final long limit) throws IOException {
 
         if (logger.isDebugEnabled()) {
@@ -466,12 +468,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
 
             @Override
             public HRegionInfo getRegionInfo() {
-                return s.getRegionInfo();
+                return scanner.getRegionInfo();
             }
 
             @Override
             public void close() throws IOException {
-                s.close();
+                scanner.close();
             }
 
             @Override
@@ -488,32 +490,36 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 HRegion region = c.getEnvironment().getRegion();
                 region.startRegionOperation();
                 try {
-                    do {
-                        List<Cell> kvs = new ArrayList<Cell>();
-                        // Results are potentially returned even when the return
-                        // value of s.next is false
-                        // since this is an indication of whether or not there
-                        // are more values after the
-                        // ones returned
-                        hasMore = s.nextRaw(kvs);
-                        if (!kvs.isEmpty()) {
-                            result.setKeyValues(kvs);
-                            key = TupleUtil.getConcatenatedValue(result, expressions);
-                            aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
-                            if (!aggBoundary) {
-                                aggregators.aggregate(rowAggregators, result);
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs
+                    synchronized (scanner) {
+                        do {
+                            List<Cell> kvs = new ArrayList<Cell>();
+                            // Results are potentially returned even when the return
+                            // value of s.next is false
+                            // since this is an indication of whether or not there
+                            // are more values after the
+                            // ones returned
+                            hasMore = scanner.nextRaw(kvs);
+                            if (!kvs.isEmpty()) {
+                                result.setKeyValues(kvs);
+                                key = TupleUtil.getConcatenatedValue(result, expressions);
+                                aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
+                                if (!aggBoundary) {
+                                    aggregators.aggregate(rowAggregators, result);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug(LogUtil.addCustomAnnotations(
+                                            "Row passed filters: " + kvs
                                             + ", aggregated values: "
-                                            + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan)));
+                                            + Arrays.asList(rowAggregators),
+                                            ScanUtil.getCustomAnnotations(scan)));
+                                    }
+                                    currentKey = key;
                                 }
-                                currentKey = key;
                             }
-                        }
-                        atLimit = rowCount + countOffset >= limit;
-                        // Do rowCount + 1 b/c we don't have to wait for a complete
-                        // row in the case of a DISTINCT with a LIMIT
-                    } while (hasMore && !aggBoundary && !atLimit);
+                            atLimit = rowCount + countOffset >= limit;
+                            // Do rowCount + 1 b/c we don't have to wait for a complete
+                            // row in the case of a DISTINCT with a LIMIT
+                        } while (hasMore && !aggBoundary && !atLimit);
+                    }
                 } finally {
                     region.closeRegionOperation();
                 }
@@ -555,7 +561,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             
             @Override
             public long getMaxResultSize() {
-                return s.getMaxResultSize();
+                return scanner.getMaxResultSize();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d50147f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a3b2faa..71c4dc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -253,128 +253,152 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         final RegionScanner innerScanner = theScanner;
         region.startRegionOperation();
         try {
-            do {
-                List<Cell> results = new ArrayList<Cell>();
-                // Results are potentially returned even when the return value of s.next is false
-                // since this is an indication of whether or not there are more values after the
-                // ones returned
-                hasMore = innerScanner.nextRaw(results);
-                if(stats != null) {
-                    stats.collectStatistics(results);
-                }
-                
-                if (!results.isEmpty()) {
-                    rowCount++;
-                    result.setKeyValues(results);
-                    try {
-                        if (buildLocalIndex) {
-                            for (IndexMaintainer maintainer : indexMaintainers) {
-                                if (!results.isEmpty()) {
-                                    result.getKey(ptr);
-                                    ValueGetter valueGetter = maintainer.createGetterFromKeyValues(ImmutableBytesPtr.copyBytesIfNecessary(ptr),results);
-                                    Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey(), c.getEnvironment().getRegion().getEndKey());
-                                    indexMutations.add(put);
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> results = new ArrayList<Cell>();
+                    // Results are potentially returned even when the return value of s.next is false
+                    // since this is an indication of whether or not there are more values after the
+                    // ones returned
+                    hasMore = innerScanner.nextRaw(results);
+                    if (stats != null) {
+                        stats.collectStatistics(results);
+                    }
+                    if (!results.isEmpty()) {
+                        rowCount++;
+                        result.setKeyValues(results);
+                        try {
+                            if (buildLocalIndex) {
+                                for (IndexMaintainer maintainer : indexMaintainers) {
+                                    if (!results.isEmpty()) {
+                                        result.getKey(ptr);
+                                        ValueGetter valueGetter =
+                                            maintainer.createGetterFromKeyValues(
+                                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                                results);
+                                        Put put = maintainer.buildUpdateMutation(kvBuilder,
+                                            valueGetter, ptr, ts,
+                                            c.getEnvironment().getRegion().getStartKey(),
+                                            c.getEnvironment().getRegion().getEndKey());
+                                        indexMutations.add(put);
+                                    }
                                 }
-                            }
-                            result.setKeyValues(results);
-                        } else if (isDelete) {
-                            // FIXME: the version of the Delete constructor without the lock args was introduced
-                            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
-                            // of the client.
-                            Cell firstKV = results.get(0);
-                            Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(), 
-                                firstKV.getRowLength(),ts);
-                            mutations.add(delete);
-                        } else if (isUpsert) {
-                            Arrays.fill(values, null);
-                            int i = 0;
-                            List<PColumn> projectedColumns = projectedTable.getColumns();
-                            for (; i < projectedTable.getPKColumns().size(); i++) {
-                                Expression expression = selectExpressions.get(i);
-                                if (expression.evaluate(result, ptr)) {
-                                    values[i] = ptr.copyBytes();
-                                    // If SortOrder from expression in SELECT doesn't match the
-                                    // column being projected into then invert the bits.
-                                    if (expression.getSortOrder() != projectedColumns.get(i).getSortOrder()) {
-                                        SortOrder.invert(values[i], 0, values[i], 0, values[i].length);
+                                result.setKeyValues(results);
+                            } else if (isDelete) {
+                                // FIXME: the version of the Delete constructor without the lock
+                                // args was introduced in 0.94.4, thus if we try to use it here
+                                // we can no longer use the 0.94.2 version of the client.
+                              Cell firstKV = results.get(0);
+                              Delete delete = new Delete(firstKV.getRowArray(),
+                                  firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+                              mutations.add(delete);
+                            } else if (isUpsert) {
+                                Arrays.fill(values, null);
+                                int i = 0;
+                                List<PColumn> projectedColumns = projectedTable.getColumns();
+                                for (; i < projectedTable.getPKColumns().size(); i++) {
+                                    Expression expression = selectExpressions.get(i);
+                                    if (expression.evaluate(result, ptr)) {
+                                        values[i] = ptr.copyBytes();
+                                        // If SortOrder from expression in SELECT doesn't match the
+                                        // column being projected into then invert the bits.
+                                        if (expression.getSortOrder() !=
+                                            projectedColumns.get(i).getSortOrder()) {
+                                            SortOrder.invert(values[i], 0, values[i], 0,
+                                                values[i].length);
+                                        }
                                     }
                                 }
-                            }
-                            projectedTable.newKey(ptr, values);
-                            PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
-                            for (; i < projectedColumns.size(); i++) {
-                                Expression expression = selectExpressions.get(i);
-                                if (expression.evaluate(result, ptr)) {
-                                    PColumn column = projectedColumns.get(i);
-                                    Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
-                                    // We are guaranteed that the two column will have the same type.
-                                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
-                                            expression.getMaxLength(),  expression.getScale(), 
-                                            column.getMaxLength(), column.getScale())) {
-                                        throw new ValueTypeIncompatibleException(column.getDataType(),
-                                                column.getMaxLength(), column.getScale());
+                                projectedTable.newKey(ptr, values);
+                                PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+                                for (; i < projectedColumns.size(); i++) {
+                                    Expression expression = selectExpressions.get(i);
+                                    if (expression.evaluate(result, ptr)) {
+                                        PColumn column = projectedColumns.get(i);
+                                        Object value = expression.getDataType()
+                                            .toObject(ptr, column.getSortOrder());
+                                        // We are guaranteed that the two column will have the
+                                        // same type.
+                                        if (!column.getDataType().isSizeCompatible(ptr, value,
+                                                column.getDataType(), expression.getMaxLength(),
+                                                expression.getScale(), column.getMaxLength(),
+                                                column.getScale())) {
+                                            throw new ValueTypeIncompatibleException(
+                                                column.getDataType(), column.getMaxLength(),
+                                                column.getScale());
+                                        }
+                                        column.getDataType().coerceBytes(ptr, value,
+                                            expression.getDataType(), expression.getMaxLength(),
+                                            expression.getScale(), expression.getSortOrder(), 
+                                            column.getMaxLength(), column.getScale(),
+                                            column.getSortOrder());
+                                        byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                                        row.setValue(column, bytes);
                                     }
-                                    column.getDataType().coerceBytes(ptr, value, expression.getDataType(),
-                                            expression.getMaxLength(), expression.getScale(), expression.getSortOrder(), 
-                                            column.getMaxLength(), column.getScale(), column.getSortOrder());
-                                    byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                                    row.setValue(column, bytes);
+                                }
+                                for (Mutation mutation : row.toRowMutations()) {
+                                    mutations.add(mutation);
+                                }
+                                for (i = 0; i < selectExpressions.size(); i++) {
+                                    selectExpressions.get(i).reset();
+                                }
+                            } else if (deleteCF != null && deleteCQ != null) {
+                                // No need to search for delete column, since we project only it
+                                // if no empty key value is being set
+                                if (emptyCF == null ||
+                                        result.getValue(deleteCF, deleteCQ) != null) {
+                                    Delete delete = new Delete(results.get(0).getRowArray(),
+                                        results.get(0).getRowOffset(),
+                                        results.get(0).getRowLength());
+                                    delete.deleteColumns(deleteCF,  deleteCQ, ts);
+                                    mutations.add(delete);
                                 }
                             }
-                            for (Mutation mutation : row.toRowMutations()) {
-                                mutations.add(mutation);
-                            }
-                            for (i = 0; i < selectExpressions.size(); i++) {
-                                selectExpressions.get(i).reset();
+                            if (emptyCF != null) {
+                                /*
+                                 * If we've specified an emptyCF, then we need to insert an empty
+                                 * key value "retroactively" for any key value that is visible at
+                                 * the timestamp that the DDL was issued. Key values that are not
+                                 * visible at this timestamp will not ever be projected up to
+                                 * scans past this timestamp, so don't need to be considered.
+                                 * We insert one empty key value per row per timestamp.
+                                 */
+                                Set<Long> timeStamps =
+                                    Sets.newHashSetWithExpectedSize(results.size());
+                                for (Cell kv : results) {
+                                    long kvts = kv.getTimestamp();
+                                    if (!timeStamps.contains(kvts)) {
+                                        Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+                                            kv.getRowLength());
+                                        put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                        mutations.add(put);
+                                    }
+                                }
                             }
-                        } else if (deleteCF != null && deleteCQ != null) {
-                            // No need to search for delete column, since we project only it
-                            // if no empty key value is being set
-                            if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
-                                Delete delete = new Delete(results.get(0).getRowArray(), results.get(0).getRowOffset(), 
-                                  results.get(0).getRowLength());
-                                delete.deleteColumns(deleteCF,  deleteCQ, ts);
-                                mutations.add(delete);
+                            // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                            if (!mutations.isEmpty() && batchSize > 0 &&
+                                    mutations.size() % batchSize == 0) {
+                                commitBatch(region, mutations, indexUUID);
+                                mutations.clear();
                             }
-                        }
-                        if (emptyCF != null) {
-                            /*
-                             * If we've specified an emptyCF, then we need to insert an empty
-                             * key value "retroactively" for any key value that is visible at
-                             * the timestamp that the DDL was issued. Key values that are not
-                             * visible at this timestamp will not ever be projected up to
-                             * scans past this timestamp, so don't need to be considered.
-                             * We insert one empty key value per row per timestamp.
-                             */
-                            Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
-                            for (Cell kv : results) {
-                                long kvts = kv.getTimestamp();
-                                if (!timeStamps.contains(kvts)) {
-                                    Put put = new Put(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-                                    put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
-                                    mutations.add(put);
-                                }
+                            // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                            if (!indexMutations.isEmpty() && batchSize > 0 &&
+                                    indexMutations.size() % batchSize == 0) {
+                                commitIndexMutations(c, region, indexMutations);
                             }
+                        } catch (ConstraintViolationException e) {
+                            // Log and ignore in count
+                            logger.error(LogUtil.addCustomAnnotations("Failed to create row in " +
+                                region.getRegionNameAsString() + " with values " +
+                                SchemaUtil.toString(values),
+                                ScanUtil.getCustomAnnotations(scan)), e);
+                            continue;
                         }
-                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
-                        if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
-                            commitBatch(region, mutations, indexUUID);
-                            mutations.clear();
-                        }
-                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
-                        if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) {
-                            commitIndexMutations(c, region, indexMutations);
-                        }
-
-                    } catch (ConstraintViolationException e) {
-                        // Log and ignore in count
-                        logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), ScanUtil.getCustomAnnotations(scan)), e);
-                        continue;
+                        aggregators.aggregate(rowAggregators, result);
+                        hasAny = true;
                     }
-                    aggregators.aggregate(rowAggregators, result);
-                    hasAny = true;
-                }
-            } while (hasMore);
+                } while (hasMore);
+            }
         } finally {
             try {
                 if (stats != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d50147f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index a0bd7c5..b89c807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -78,14 +78,16 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         // Run through the scanner using internal nextRaw method
         region.startRegionOperation();
         try {
-            boolean hasMore;
-            do {
-                List<Cell> results = Lists.newArrayList();
-                // Results are potentially returned even when the return value of s.next is false
-                // since this is an indication of whether or not there are more values after the
-                // ones returned
-                hasMore = scanner.nextRaw(results);
-            } while (hasMore);
+            synchronized (scanner) {
+                boolean hasMore;
+                do {
+                    List<Cell> results = Lists.newArrayList();
+                    // Results are potentially returned even when the return value of s.next is
+                    // false since this is an indication of whether or not there are more values
+                    // after the ones returned
+                    hasMore = scanner.nextRaw(results);
+                } while (hasMore);
+            }
         } finally {
             try {
                 scanner.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3d50147f/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index bff0936..88e141a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -38,23 +38,27 @@ public class RegionScannerResultIterator extends BaseResultIterator {
     
     @Override
     public Tuple next() throws SQLException {
-        try {
-            // TODO: size
-            List<Cell> results = new ArrayList<Cell>();
-            // Results are potentially returned even when the return value of s.next is false
-            // since this is an indication of whether or not there are more values after the
-            // ones returned
-            boolean hasMore = scanner.nextRaw(results);
-            if (!hasMore && results.isEmpty()) {
-                return null;
+        // XXX: No access here to the region instance to enclose this with startRegionOperation / 
+        // stopRegionOperation 
+        synchronized (scanner) {
+            try {
+                // TODO: size
+                List<Cell> results = new ArrayList<Cell>();
+                // Results are potentially returned even when the return value of s.next is false
+                // since this is an indication of whether or not there are more values after the
+                // ones returned
+                boolean hasMore = scanner.nextRaw(results);
+                if (!hasMore && results.isEmpty()) {
+                    return null;
+                }
+                // We instantiate a new tuple because in all cases currently we hang on to it
+                // (i.e. to compute and hold onto the TopN).
+                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                tuple.setKeyValues(results);
+                return tuple;
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
             }
-            // We instantiate a new tuple because in all cases currently we hang on to it (i.e.
-            // to compute and hold onto the TopN).
-            MultiKeyValueTuple tuple = new MultiKeyValueTuple();
-            tuple.setKeyValues(results);
-            return tuple;
-        } catch (IOException e) {
-            throw ServerUtil.parseServerException(e);
         }
     }
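
Note on the last hunk: unlike the coprocessor call sites above, RegionScannerResultIterator has no handle on the enclosing region (hence the XXX comment), so the fix there is synchronization only. A hypothetical usage sketch follows, assuming the single-argument constructor of that era; the helper class, method name, and the caller-supplied regionScanner are placeholders, not part of the patch.

    // Hypothetical usage sketch, not part of this commit.
    import java.sql.SQLException;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.phoenix.iterate.RegionScannerResultIterator;
    import org.apache.phoenix.schema.tuple.Tuple;

    public class IteratorUsageSketch {
        static long countTuples(RegionScanner regionScanner) throws SQLException {
            RegionScannerResultIterator iterator =
                new RegionScannerResultIterator(regionScanner);
            long count = 0;
            // next() returns null once the wrapped scanner is exhausted; each call
            // now holds the scanner lock while it invokes nextRaw().
            for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) {
                count++; // real callers would aggregate or serialize the tuple here
            }
            return count;
        }
    }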