You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/06/27 16:21:04 UTC

phoenix git commit: PHOENIX-3023 Slow performance when limit queries are executed in parallel by default

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 8cec07dca -> e0bf41f98


PHOENIX-3023 Slow performance when limit queries are executed in parallel by default


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e0bf41f9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e0bf41f9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e0bf41f9

Branch: refs/heads/4.x-HBase-0.98
Commit: e0bf41f98bfb72c094237fdc3465523e04c1c0ef
Parents: 8cec07d
Author: Samarth <sa...@salesforce.com>
Authored: Mon Jun 27 09:20:51 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Jun 27 09:20:51 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/AggregatePlan.java   |  2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    | 65 ++++++++++----------
 .../phoenix/iterate/BaseResultIterators.java    | 14 ++++-
 .../phoenix/iterate/ParallelIterators.java      | 32 ++++++----
 .../phoenix/iterate/TableResultIterator.java    |  4 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  2 +-
 .../compile/StatementHintsCompilationTest.java  |  8 +--
 .../query/ParallelIteratorsSplitTest.java       |  2 +-
 8 files changed, 75 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index afb187a..82d854b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -215,7 +215,7 @@ public class AggregatePlan extends BaseQueryPlan {
         }
         BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially
                 ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
-                : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan);
+                : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false);
 
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 0975b3f..31354be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -62,7 +62,6 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -101,25 +100,28 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     private static boolean isSerial(StatementContext context, FilterableStatement statement,
-            TableRef tableRef, OrderBy orderBy, Integer limit, Integer offset, boolean allowPageFilter) throws SQLException {
-        PTable table = tableRef.getTable();
-        boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
-        boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context); 
-        if (!canBeExecutedSerially) { 
-            if (hasSerialHint) {
-                logger.warn("This query cannot be executed serially. Ignoring the hint");
+            TableRef tableRef, OrderBy orderBy, boolean isDataWithinThreshold) throws SQLException {
+        if (isDataWithinThreshold) {
+            PTable table = tableRef.getTable();
+            boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL);
+            boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context); 
+            if (!canBeExecutedSerially) { 
+                if (hasSerialHint) {
+                    logger.warn("This query cannot be executed serially. Ignoring the hint");
+                }
+                return false;
             }
-            return false;
-        } else if (hasSerialHint) {
             return true;
         }
-        
+        return false;
+    }
+    
+    private static boolean isAmountOfDataToScanWithinThreshold(StatementContext context, PTable table, Integer perScanLimit) throws SQLException {
         Scan scan = context.getScan();
         /*
-         * If a limit is provided and we have no filter, run the scan serially when we estimate that
-         * the limit's worth of data is less than the threshold bytes provided in QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD
+         * If a limit is not provided or if we have a filter, then we are not able to decide whether
+         * the amount of data we need to scan is less than the threshold.
          */
-        Integer perScanLimit = !allowPageFilter ? null : limit;
         if (perScanLimit == null || scan.getFilter() != null) {
             return false;
         }
@@ -129,7 +131,7 @@ public class ScanPlan extends BaseQueryPlan {
         ConnectionQueryServices services = context.getConnection().getQueryServices();
         long estRowSize;
         long estimatedParallelThresholdBytes;
-        if (gpsInfo == null) {
+        if (gpsInfo == null || gpsInfo.getGuidePostsCount() == 0) {
             estRowSize = SchemaUtil.estimateRowSize(table);
             estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE,
                     HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -149,19 +151,13 @@ public class ScanPlan extends BaseQueryPlan {
         }
         long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD,
                 estimatedParallelThresholdBytes);
-        boolean isSerial = (perScanLimit * estRowSize < limitThreshold);
-
-        if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations(
-                "With LIMIT=" + perScanLimit + ", estimated row size=" + estRowSize + ", limitThreshold="
-                        + limitThreshold + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution",
-                context.getConnection()));
-        return isSerial;
+        return (perScanLimit * estRowSize < limitThreshold);
     }
     
     private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement,
-            TableRef table, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
+            TableRef tableRef, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
 
-        if ((isSerial(context, statement, table, orderBy, limit, offset, allowPageFilter)
+        if ((isSerial(context, statement, tableRef, orderBy, isAmountOfDataToScanWithinThreshold(context, tableRef.getTable(), QueryUtil.getOffsetLimit(limit, offset)))
                 || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))) {
             return ParallelIteratorFactory.NOOP_FACTORY;
         }
@@ -176,7 +172,7 @@ public class ScanPlan extends BaseQueryPlan {
             return spoolingResultIteratorFactory;
         } else {
             return new ChunkedResultIterator.ChunkedResultIteratorFactory(
-                    spoolingResultIteratorFactory, context.getConnection().getMutationState(), table);
+                    spoolingResultIteratorFactory, context.getConnection().getMutationState(), tableRef);
         }
     }
 
@@ -215,19 +211,26 @@ public class ScanPlan extends BaseQueryPlan {
          * limit is provided, run query serially.
          */
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
-        boolean isSerial = isSerial(context, statement, tableRef, orderBy, limit, offset, allowPageFilter);
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
-        if (perScanLimit != null) {
-            perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
-        }
-        BaseResultIterators iterators;
+        perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
+        boolean isDataWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table, perScanLimit);
+        boolean isSerial = isSerial(context, statement, tableRef, orderBy, isDataWithinThreshold);
         boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
+        /*
+         * For queries that do a row key order by and cannot possibly scan more than a
+         * threshold's worth of data, we only need to initialize scanners corresponding to the
+         * first (or last, if reverse) scan per region.
+         */
+        boolean initFirstScanOnly =
+                (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)
+                        && isDataWithinThreshold; 
+        BaseResultIterators iterators;
         if (isOffsetOnServer) {
             iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
         } else if (isSerial) {
             iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan);
+            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 42fe0d9..7796a17 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -492,7 +492,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private List<List<Scan>> getParallelScans() throws SQLException {
         // If the scan boundaries are not matching with scan in context that means we need to get
         // parallel scans for the chunk after split/merge.
-        if (!ScanUtil.isConextScan(scan, context)) {
+        if (!ScanUtil.isContextScan(scan, context)) {
             return getParallelScans(scan);
         }
         return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
@@ -919,11 +919,15 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     	private final int outerListIndex;
     	private final int innerListIndex;
     	private final Scan scan;
+    	private final boolean isFirstScan;
+    	private final boolean isLastScan;
     	
-    	public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) {
+    	public ScanLocator(Scan scan, int outerListIndex, int innerListIndex, boolean isFirstScan, boolean isLastScan) {
     		this.outerListIndex = outerListIndex;
     		this.innerListIndex = innerListIndex;
     		this.scan = scan;
+    		this.isFirstScan = isFirstScan;
+    		this.isLastScan = isLastScan;
     	}
     	public int getOuterListIndex() {
     		return outerListIndex;
@@ -934,6 +938,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     	public Scan getScan() {
     		return scan;
     	}
+    	public boolean isFirstScan()  {
+    	    return isFirstScan;
+    	}
+    	public boolean isLastScan() {
+    	    return isLastScan;
+    	}
     }
     
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index d038f77..8c9b689 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -53,21 +53,23 @@ public class ParallelIterators extends BaseResultIterators {
 	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
 	private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
+    private final boolean initFirstScanOnly;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly)
             throws SQLException {
         super(plan, perScanLimit, null, scanGrouper, scan);
         this.iteratorFactory = iteratorFactory;
+        this.initFirstScanOnly = initFirstScanOnly;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan);
+        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion);
     }  
 
     @Override
-    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
+    protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -77,11 +79,12 @@ public class ParallelIterators extends BaseResultIterators {
         List<ScanLocator> scanLocations = Lists.newArrayListWithExpectedSize(estFlattenedSize);
         for (int i = 0; i < nestedScans.size(); i++) {
             List<Scan> scans = nestedScans.get(i);
-            List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(scans.size());
+            int numScans = scans.size();
+            List<Pair<Scan,Future<PeekingResultIterator>>> futures = Lists.newArrayListWithExpectedSize(numScans);
             nestedFutures.add(futures);
-            for (int j = 0; j < scans.size(); j++) {
+            for (int j = 0; j < numScans; j++) {
             	Scan scan = nestedScans.get(i).get(j);
-                scanLocations.add(new ScanLocator(scan, i, j));
+                scanLocations.add(new ScanLocator(scan, i, j, j == 0, (j == numScans - 1)));
                 futures.add(null); // placeholder
             }
         }
@@ -94,7 +97,7 @@ public class ParallelIterators extends BaseResultIterators {
         context.getOverallQueryMetrics().updateNumParallelScans(numScans);
         GLOBAL_NUM_PARALLEL_SCANS.update(numScans);
         final long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
-        for (ScanLocator scanLocation : scanLocations) {
+        for (final ScanLocator scanLocation : scanLocations) {
             final Scan scan = scanLocation.getScan();
             final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
@@ -105,13 +108,18 @@ public class ParallelIterators extends BaseResultIterators {
                 @Override
                 public PeekingResultIterator call() throws Exception {
                     long startTime = System.currentTimeMillis();
-                    tableResultItr.initScanner();
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                     }
                     PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan);
-                    // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
-                    iterator.peek();
+                    if (initFirstScanOnly) {
+                        if ((!isReverse && scanLocation.isFirstScan()) || (isReverse && scanLocation.isLastScan())) {
+                            // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
+                            iterator.peek();
+                        }
+                    } else {
+                        iterator.peek();
+                    }
                     allIterators.add(iterator);
                     return iterator;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9256278..48b763b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,8 +17,8 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
@@ -40,7 +40,6 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -48,6 +47,7 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 711717a..d7f6f2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -679,7 +679,7 @@ public class ScanUtil {
          }
     }
 
-    public static boolean isConextScan(Scan scan, StatementContext context) {
+    public static boolean isContextScan(Scan scan, StatementContext context) {
         return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes
                 .compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
index 9adf414..394bf27 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/StatementHintsCompilationTest.java
@@ -106,12 +106,12 @@ public class StatementHintsCompilationTest extends BaseConnectionlessQueryTest {
 
     @Test
     public void testSerialHint() throws Exception {
-        // test ScanPlan
+        // test AggregatePlan
         String query = "SELECT /*+ SERIAL */ COUNT(*) FROM atable";
         assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
 
-        // test AggregatePlan
-        query = "SELECT /*+ SERIAL */ * FROM atable";
-        assertTrue("Expected a SERIAL query", compileStatement(query).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
+        // test ScanPlan
+        query = "SELECT /*+ SERIAL */ * FROM atable limit 10";
+        assertTrue("Expected a SERIAL query", compileStatement(query, Collections.emptyList(), 10).getExplainPlan().getPlanSteps().get(0).contains("SERIAL"));
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0bf41f9/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 1da68ba..05fbf81 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -472,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
                 return false;
             }
             
-        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan());
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }