Posted to commits@phoenix.apache.org by ka...@apache.org on 2021/01/10 03:46:20 UTC

[phoenix] branch master updated: PHOENIX-6211 Paged scan filters

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new fe0b3ca  PHOENIX-6211 Paged scan filters
fe0b3ca is described below

commit fe0b3ca404284a860ab684f45ce1cd6d66a9792f
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Tue Nov 17 18:03:34 2020 -0800

    PHOENIX-6211 Paged scan filters
---
 .../apache/phoenix/end2end/SpillableGroupByIT.java |   2 +-
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 285 +++++++++++---------
 .../phoenix/coprocessor/BaseRegionScanner.java     |   6 +
 .../coprocessor/BaseScannerRegionObserver.java     |  27 +-
 .../coprocessor/GlobalIndexRegionScanner.java      |  56 +++-
 .../GroupedAggregateRegionObserver.java            |  50 ++--
 .../phoenix/coprocessor/HashJoinRegionScanner.java |  29 ++-
 .../coprocessor/IndexRebuildRegionScanner.java     |  25 +-
 .../coprocessor/IndexRepairRegionScanner.java      |   4 +
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   6 +
 .../phoenix/coprocessor/PagedRegionScanner.java    | 103 ++++++++
 .../coprocessor/PhoenixTTLRegionObserver.java      |  70 +++--
 .../UngroupedAggregateRegionObserver.java          |  30 +--
 .../UngroupedAggregateRegionScanner.java           |  41 ++-
 .../org/apache/phoenix/filter/DelegateFilter.java  |  10 +
 .../org/apache/phoenix/filter/PagedFilter.java     | 289 +++++++++++++++++++++
 .../apache/phoenix/index/GlobalIndexChecker.java   |  71 +++--
 .../iterate/NonAggregateRegionScannerFactory.java  |  20 +-
 .../phoenix/iterate/OffsetResultIterator.java      |  16 +-
 .../phoenix/iterate/OrderedResultIterator.java     |  52 +++-
 .../phoenix/iterate/RegionScannerFactory.java      |  27 +-
 .../iterate/RegionScannerResultIterator.java       |   7 +
 .../phoenix/iterate/ScanningResultIterator.java    |   2 +-
 .../phoenix/iterate/TableResultIterator.java       |  14 +-
 .../org/apache/phoenix/query/QueryServices.java    |   5 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   2 -
 .../tuple/EncodedColumnQualiferCellsList.java      |   7 +
 .../apache/phoenix/schema/tuple/ResultTuple.java   |   1 -
 .../java/org/apache/phoenix/util/ScanUtil.java     |  91 ++++++-
 .../java/org/apache/phoenix/query/BaseTest.java    |   7 +-
 30 files changed, 1026 insertions(+), 329 deletions(-)
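
The pattern this commit applies across the region scanners below: each nextRaw() call is bounded by a time budget (the new _ServerPageSizeMs scan attribute, replacing the _ServerPaging/_AggregatePageSizeInMs pair), and when the budget runs out before a real row is ready the scanner returns a special "dummy" row so the RPC completes without losing the scan position. Callers recognize the dummy via ScanUtil.isDummy and quietly resume. A minimal sketch of that shape, mirroring the HashJoinRegionScanner change further down; the wrapper class and the rowMatches predicate are hypothetical:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    import static org.apache.phoenix.util.ScanUtil.getDummyResult;
    import static org.apache.phoenix.util.ScanUtil.isDummy;

    // Hypothetical wrapper, for illustration only; not part of the commit.
    abstract class PagedScanSketch {
        protected RegionScanner delegate; // the wrapped inner scanner
        protected long pageSizeMs;        // time budget from the scan attribute

        // Stand-in for the real per-row work (join probe, aggregation, TTL check).
        protected abstract boolean rowMatches(List<Cell> row);

        public boolean nextRaw(List<Cell> result) throws IOException {
            long startTime = EnvironmentEdgeManager.currentTimeMillis();
            boolean hasMore;
            do {
                hasMore = delegate.nextRaw(result);
                if (isDummy(result)) {
                    return true; // pass an inner page boundary through unchanged
                }
                if (!result.isEmpty()) {
                    if (rowMatches(result)) {
                        return hasMore; // a real row for the caller
                    }
                    if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
                        // Budget spent while skipping rows: return a dummy row keyed
                        // at the current position so the RPC returns in time and the
                        // client transparently resumes the scan from here.
                        byte[] rowKey = CellUtil.cloneRow(result.get(0));
                        result.clear();
                        getDummyResult(rowKey, result);
                        return true;
                    }
                    result.clear();
                }
            } while (hasMore);
            return false;
        }
    }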

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index ec4526c..f415d61 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -82,7 +82,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
         props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false));
         props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
         props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
-        props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000));
+        props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(60000));
         // Must update config before starting server
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index 37b9116..dff52e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -50,6 +50,7 @@ import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.types.PDataType;
@@ -58,6 +59,7 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -759,7 +761,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         GlobalViewIndexOptions
                 globalViewIndexOptions =
                 SchemaBuilder.GlobalViewIndexOptions.withDefaults();
-        globalViewIndexOptions.setLocal(false);
 
         TenantViewOptions tenantViewOptions = new TenantViewOptions();
         tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
@@ -775,65 +776,69 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         testCaseWhenAllCFMatchAndAllDefault
                 .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
 
-        // Define the test schema.
-        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
-        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
-                .withGlobalViewIndexOptions(globalViewIndexOptions)
-                .withTenantViewOptions(tenantViewOptions)
-                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+        for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) {
+            globalViewIndexOptions.setLocal(isGlobalIndexLocal);
 
-        // Define the test data.
-        final List<String> outerCol4s = Lists.newArrayList();
-        DataSupplier dataSupplier = new DataSupplier() {
-            String col4ForWhereClause;
+            // Define the test schema.
+            final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+            schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
 
-            @Override public List<Object> getValues(int rowIndex) {
-                Random rnd = new Random();
-                String id = String.format(ID_FMT, rowIndex);
-                String zid = String.format(ZID_FMT, rowIndex);
-                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+            // Define the test data.
+            final List<String> outerCol4s = Lists.newArrayList();
+            DataSupplier dataSupplier = new DataSupplier() {
+                String col4ForWhereClause;
 
-                // Store the col4 data to be used later in a where clause
-                outerCol4s.add(col4);
-                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String id = String.format(ID_FMT, rowIndex);
+                    String zid = String.format(ZID_FMT, rowIndex);
+                    String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
 
-                return Lists
-                        .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
-            }
-        };
+                    // Store the col4 data to be used later in a where clause
+                    outerCol4s.add(col4);
+                    String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
 
-        // Create a test data reader/writer for the above schema.
-        DataWriter dataWriter = new BasicDataWriter();
-        DataReader dataReader = new BasicDataReader();
+                    return Lists
+                            .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+                }
+            };
 
-        List<String> columns =
-                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
-        List<String> rowKeyColumns = Lists.newArrayList("COL6");
-        String tenantConnectUrl =
-                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
-        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
-            writeConnection.setAutoCommit(true);
-            dataWriter.setConnection(writeConnection);
-            dataWriter.setDataSupplier(dataSupplier);
-            dataWriter.setUpsertColumns(columns);
-            dataWriter.setRowKeyColumns(rowKeyColumns);
-            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            DataReader dataReader = new BasicDataReader();
 
-            // Upsert data for validation
-            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+            List<String> columns =
+                    Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("COL6");
+            String tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+            try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
 
-            dataReader.setValidationColumns(rowKeyColumns);
-            dataReader.setRowKeyColumns(rowKeyColumns);
-            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
-                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
-            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                // Upsert data for validation
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
 
-            // Validate data before and after ttl expiration.
-            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+                dataReader.setValidationColumns(rowKeyColumns);
+                dataReader.setRowKeyColumns(rowKeyColumns);
+                dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                        schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+            }
         }
     }
 
@@ -857,7 +862,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
         GlobalViewIndexOptions globalViewIndexOptions =
                 SchemaBuilder.GlobalViewIndexOptions.withDefaults();
-        globalViewIndexOptions.setLocal(false);
 
         TenantViewOptions tenantViewOptions = new TenantViewOptions();
         tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
@@ -873,101 +877,105 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         testCaseWhenAllCFMatchAndAllDefault
                 .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
 
-        // Define the test schema.
-        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
-        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
-                .withGlobalViewIndexOptions(globalViewIndexOptions)
-                .withTenantViewOptions(tenantViewOptions)
-                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
-
-        // Define the test data.
-        final List<String> outerCol4s = Lists.newArrayList();
-        DataSupplier dataSupplier = new DataSupplier() {
-
-            @Override public List<Object> getValues(int rowIndex) {
-                Random rnd = new Random();
-                String id = String.format(ID_FMT, rowIndex);
-                String zid = String.format(ZID_FMT, rowIndex);
-                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-
-                // Store the col4 data to be used later in a where clause
-                outerCol4s.add(col4);
-                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
-
-                return Lists
-                        .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
-            }
-        };
-
-        // Create a test data reader/writer for the above schema.
-        DataWriter dataWriter = new BasicDataWriter();
-        DataReader dataReader = new BasicDataReader();
-
-        List<String> columns =
-                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
-        List<String> nonCoveredColumns =
-                Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
-        List<String> rowKeyColumns = Lists.newArrayList("COL6");
-        String tenantConnectUrl =
-                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
-        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
-            writeConnection.setAutoCommit(true);
-            dataWriter.setConnection(writeConnection);
-            dataWriter.setDataSupplier(dataSupplier);
-            dataWriter.setUpsertColumns(columns);
-            dataWriter.setRowKeyColumns(rowKeyColumns);
-            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
-
-            // Upsert data for validation
-            upsertData(dataWriter, DEFAULT_NUM_ROWS);
-
-            dataReader.setValidationColumns(rowKeyColumns);
-            dataReader.setRowKeyColumns(rowKeyColumns);
-            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
-                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
-            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
-
-            // Validate data before and after ttl expiration.
-            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+        for (boolean isGlobalIndexLocal : Lists.newArrayList(true, false)) {
+            globalViewIndexOptions.setLocal(isGlobalIndexLocal);
 
-            // Now update the above data but not modifying the covered columns.
-            // Ensure/validate that empty columns for the index are still updated.
+            // Define the test schema.
+            final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+            schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                    .withGlobalViewIndexOptions(globalViewIndexOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
 
-            // Data supplier where covered and included (col4 and col6) columns are not updated.
-            DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+            // Define the test data.
+            final List<String> outerCol4s = Lists.newArrayList();
+            DataSupplier dataSupplier = new DataSupplier() {
 
                 @Override public List<Object> getValues(int rowIndex) {
                     Random rnd = new Random();
                     String id = String.format(ID_FMT, rowIndex);
                     String zid = String.format(ZID_FMT, rowIndex);
+                    String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                    // Store the col4 data to be used later in a where clause
+                    outerCol4s.add(col4);
                     String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                     String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                     String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
                     String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
 
-                    return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+                    return Lists
+                            .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
                 }
             };
 
-            // Upsert data for validation with non covered columns
-            dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
-            dataWriter.setUpsertColumns(nonCoveredColumns);
-            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            DataReader dataReader = new BasicDataReader();
 
-            List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
-            dataReader.setValidationColumns(rowKeyColumns1);
-            dataReader.setRowKeyColumns(rowKeyColumns1);
-            dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
-                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+            List<String> columns =
+                    Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+            List<String> nonCoveredColumns =
+                    Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("COL6");
+            String tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+            try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
 
-            // Validate data before and after ttl expiration.
-            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+                // Upsert data for validation
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
 
+                dataReader.setValidationColumns(rowKeyColumns);
+                dataReader.setRowKeyColumns(rowKeyColumns);
+                dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                        schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+                // Now update the above data but not modifying the covered columns.
+                // Ensure/validate that empty columns for the index are still updated.
+
+                // Data supplier where covered and included (col4 and col6) columns are not updated.
+                DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+
+                    @Override public List<Object> getValues(int rowIndex) {
+                        Random rnd = new Random();
+                        String id = String.format(ID_FMT, rowIndex);
+                        String zid = String.format(ZID_FMT, rowIndex);
+                        String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                        String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                        String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                        String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                        return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+                    }
+                };
+
+                // Upsert data for validation with non covered columns
+                dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+                dataWriter.setUpsertColumns(nonCoveredColumns);
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+                dataReader.setValidationColumns(rowKeyColumns1);
+                dataReader.setRowKeyColumns(rowKeyColumns1);
+                dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
+                        schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+            }
         }
+
     }
 
 
@@ -2369,6 +2377,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
         props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
         try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
 
             dataReader.setConnection(readConnection);
@@ -2486,7 +2495,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 final Statement statement = deleteConnection.createStatement()) {
             deleteConnection.setAutoCommit(true);
 
-            final String deleteIfExpiredStatement = String.format("select * from  %s", viewName);
+            final String deleteIfExpiredStatement = String.format("select /*+NO_INDEX*/ count(*) from  %s", viewName);
             Preconditions.checkNotNull(deleteIfExpiredStatement);
 
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
@@ -2505,6 +2514,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
             scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
             scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+
             PhoenixResultSet
                     rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
             while (rs.next());
@@ -2526,7 +2536,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 final Statement statement = deleteConnection.createStatement()) {
             deleteConnection.setAutoCommit(true);
 
-            final String deleteIfExpiredStatement = String.format("select * from %s", indexName);
+            final String deleteIfExpiredStatement = String.format("select count(*) from %s", indexName);
             Preconditions.checkNotNull(deleteIfExpiredStatement);
 
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
@@ -2596,4 +2606,19 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
         return testCases;
     }
+
+    private void runValidations(long phoenixTTL,
+            org.apache.phoenix.thirdparty.com.google.common.collect.Table<String, String, Object> table,
+            DataReader dataReader, SchemaBuilder schemaBuilder)
+            throws Exception {
+
+    // Insert for the first time and validate them.
+        validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table,
+                dataReader, schemaBuilder);
+
+        // Update the above rows and validate the same.
+        validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, table,
+                dataReader, schemaBuilder);
+
+    }
 }
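
The read-validation path in this test now turns on request-level metrics (the QueryServices.COLLECT_REQUEST_LEVEL_METRICS property added above), which makes the extra server round trips introduced by paged scans observable per query. A hedged sketch of reading those metrics, assuming the PhoenixRuntime.getRequestReadMetricInfo accessor is available on this branch:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Map;

    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.util.PhoenixRuntime;

    class ReadMetricsSketch {
        // Drain a query and print its per-table read metrics. The accessor
        // name is an assumption; it is not part of this diff.
        static void dumpReadMetrics(Connection conn, String query) throws Exception {
            try (Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(query)) {
                while (rs.next()) {
                    // drain; a paged scan may take several round trips here
                }
                Map<String, Map<MetricType, Long>> metrics =
                        PhoenixRuntime.getRequestReadMetricInfo(rs);
                metrics.forEach((table, m) -> System.out.println(table + " -> " + m));
            }
        }
    }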
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 945c1c4..5c54854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
@@ -58,4 +60,8 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
         throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
     }
+
+    public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+        return ((BaseRegionScanner)delegate).getNewRegionScanner(scan);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 31e6444..f449d8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -26,24 +26,18 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanOptions;
-import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
 import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.filter.PagedFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
@@ -53,6 +47,7 @@ import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;
 
 abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionObserver {
 
@@ -77,10 +72,7 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
     public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
-    public static final String SERVER_PAGING = "_ServerPaging";
-    // The number of rows to be scanned in one RPC call
-    public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs";
-
+    public static final String SERVER_PAGE_SIZE_MS = "_ServerPageSizeMs";
     // Index verification type done by the index tool
     public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
     public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify";
@@ -241,6 +233,12 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
             // last possible moment. You need to swap the start/stop and make the
             // start exclusive and the stop inclusive.
             ScanUtil.setupReverseScan(scan);
+            if (!(scan.getFilter() instanceof PagedFilter)) {
+                byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
+                if (pageSizeMsBytes != null) {
+                    scan.setFilter(new PagedFilter(scan.getFilter(), getPageSizeMsForFilter(scan)));
+                }
+            }
         }
     }
 
@@ -341,11 +339,14 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
     public final RegionScanner postScannerOpen(
             final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
             final RegionScanner s) throws IOException {
-       try {
+        try {
             if (!isRegionObserverFor(scan)) {
                 return s;
             }
-            return new RegionScannerHolder(c, scan, s);
+            // Make sure PagedRegionScanner wraps only the lowest region scanner, i.e., the HBase region scanner.
+            // We assume here that every Phoenix region scanner extends BaseRegionScanner
+            return new RegionScannerHolder(c, scan, s instanceof BaseRegionScanner ? s :
+                    new PagedRegionScanner(c.getEnvironment().getRegion(), s, scan));
         } catch (Throwable t) {
             // If the exception is NotServingRegionException then throw it as
            // StaleRegionBoundaryCacheException to handle it by the Phoenix client, otherwise HBase
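
With the old attribute pair removed, a scan opts into server paging through the single SERVER_PAGE_SIZE_MS attribute: preScannerOpen wraps the scan's existing filter in a PagedFilter when the attribute is present, and postScannerOpen wraps bare HBase scanners in a PagedRegionScanner. A hedged example of setting the attribute from the client side, assuming the value is encoded as a long the way the old _AggregatePageSizeInMs attribute was (the actual encoding lives in ScanUtil, which is only summarized in the diffstat above):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    class PagedScanExample {
        // Hypothetical helper: ask the server to yield control roughly
        // every 60 seconds of scan work.
        static Scan pagedScan(byte[] startRow, byte[] stopRow) {
            Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
                    Bytes.toBytes(60000L));
            return scan;
        }
    }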
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index d071405..8f76ace 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
 import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.filter.PagedFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.compile.ScanRanges;
@@ -97,6 +99,7 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputReposito
 import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 /**
  * This is an abstract region scanner which is used to scan index or data table rows locally. From the data table rows,
@@ -156,7 +159,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
     protected Map<byte[], NavigableSet<byte[]>> familyMap;
     protected IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     protected boolean verify = false;
-    protected boolean useSkipScanFilter;
+    protected boolean isRawFilterSupported;
 
     public GlobalIndexRegionScanner(final RegionScanner innerScanner,
                                     final Region region,
@@ -244,7 +247,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                     new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
             nextStartKey = null;
             minTimestamp = scan.getTimeRange().getMin();
-            useSkipScanFilter = HbaseCompatCapabilities.isRawFilterSupported();
+            isRawFilterSupported = HbaseCompatCapabilities.isRawFilterSupported();
         }
     }
 
@@ -1055,7 +1058,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         Scan indexScan = new Scan();
         indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
         scanRanges.initializeScan(indexScan);
-        if (useSkipScanFilter) {
+        if (isRawFilterSupported) {
             SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
             indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
         }
@@ -1347,6 +1350,36 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         return indexMutations.size();
     }
 
+    static boolean adjustScanFilter(Scan scan) {
+        // For rebuilds we use count(*) as the query for regular tables, which ends up setting a FirstKeyOnlyFilter on the scan.
+        // That filter doesn't give us all the columns and skips to the next row as soon as it finds one column,
+        // but for rebuilds we need all columns and all versions.
+
+        Filter filter = scan.getFilter();
+        if (filter instanceof PagedFilter) {
+            PagedFilter pageFilter = (PagedFilter) filter;
+            Filter delegateFilter = pageFilter.getDelegateFilter();
+            if (!HbaseCompatCapabilities.isRawFilterSupported() &&
+                    (delegateFilter == null || delegateFilter instanceof FirstKeyOnlyFilter)) {
+                scan.setFilter(null);
+                return true;
+            }
+            if (delegateFilter instanceof FirstKeyOnlyFilter) {
+                pageFilter.setDelegateFilter(null);
+            } else if (delegateFilter != null) {
+                // Override the filter so that we get all versions
+                pageFilter.setDelegateFilter(new AllVersionsIndexRebuildFilter(delegateFilter));
+            }
+        } else if (filter instanceof FirstKeyOnlyFilter) {
+            scan.setFilter(null);
+            return true;
+        } else if (filter != null) {
+            // Override the filter so that we get all versions
+            scan.setFilter(new AllVersionsIndexRebuildFilter(filter));
+        }
+        return false;
+    }
+
     protected RegionScanner getLocalScanner() throws IOException {
         // override the filter to skip scan and open new scanner
         // when lower bound of timerange is passed or newStartKey was populated
@@ -1355,26 +1388,18 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
             Scan incrScan = new Scan(scan);
             incrScan.setTimeRange(minTimestamp, scan.getTimeRange().getMax());
             incrScan.setRaw(true);
-            incrScan.setMaxVersions();
+            incrScan.readAllVersions();
             incrScan.getFamilyMap().clear();
             incrScan.setCacheBlocks(false);
             for (byte[] family : scan.getFamilyMap().keySet()) {
                 incrScan.addFamily(family);
             }
-            // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
-            // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
-            // For rebuilds we need all columns and all versions
-            if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
-                incrScan.setFilter(null);
-            } else if (scan.getFilter() != null) {
-                // Override the filter so that we get all versions
-                incrScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
-            }
+            adjustScanFilter(incrScan);
             if(nextStartKey != null) {
                 incrScan.setStartRow(nextStartKey);
             }
             List<KeyRange> keys = new ArrayList<>();
-            try(RegionScanner scanner = region.getScanner(incrScan)) {
+            try (RegionScanner scanner = new PagedRegionScanner(region, region.getScanner(incrScan), incrScan)) {
                 List<Cell> row = new ArrayList<>();
                 int rowCount = 0;
                 // collect row keys that have been modified in the given time-range
@@ -1383,6 +1408,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                     ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
                     hasMoreIncr = scanner.nextRaw(row);
                     if (!row.isEmpty()) {
+                        if (isDummy(row)) {
+                            continue;
+                        }
                         keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0))));
                         rowCount++;
                     }
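
The rebuild path above also shows the consumer side of the paging contract: the local scanner is wrapped in a PagedRegionScanner, and dummy rows are skipped rather than treated as data, since they exist only to keep the RPC within its deadline. Condensed from the getLocalScanner() loop above, with the row-count bookkeeping elided (keys, region, and incrScan as in the diff):

    try (RegionScanner scanner =
            new PagedRegionScanner(region, region.getScanner(incrScan), incrScan)) {
        List<Cell> row = new ArrayList<>();
        boolean hasMoreIncr;
        do {
            hasMoreIncr = scanner.nextRaw(row);
            // Dummy rows only keep the scan alive across page boundaries;
            // only real rows contribute keys to the rebuild.
            if (!row.isEmpty() && !isDummy(row)) {
                keys.add(PVarbinary.INSTANCE.getKeyRange(CellUtil.cloneRow(row.get(0))));
            }
            row.clear();
        } while (hasMoreIncr);
    }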
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 428cee5..5f82866 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -22,10 +22,11 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
 import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -66,7 +67,6 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -171,7 +171,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
     
             if (j != null) {
                 innerScanner =
-                        new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
+                        new HashJoinRegionScanner(innerScanner, scan, p, j, ScanUtil.getTenantId(scan),
                                 c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
             }
     
@@ -180,22 +180,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
             if (limitBytes != null) {
                 limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
             }
-            long pageSizeInMs = Long.MAX_VALUE;
-            if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
-                byte[] pageSizeFromScan =
-                        scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
-                if (pageSizeFromScan != null) {
-                    pageSizeInMs = Bytes.toLong(pageSizeFromScan);
-                } else {
-                    pageSizeInMs = c.getEnvironment().getConfiguration().getLong(GROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
-                                    QueryServicesOptions.DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
-                }
-            }
+            long pageSizeMs = getPageSizeMsForRegionScanner(scan);
             if (keyOrdered) { // Optimize by taking advantage that the rows are
                               // already in the required group by key order
-                return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
+                return new OrderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs);
            } else { // Otherwise, collect them all up in an in-memory map
-                return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeInMs);
+                return new UnorderedGroupByRegionScanner(c, scan, innerScanner, expressions, aggregators, limit, pageSizeMs);
             }
         }
     }
@@ -411,18 +401,18 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
         private final ServerAggregators aggregators;
         private final long limit;
         private final List<Expression> expressions;
-        private final long pageSizeInMs;
+        private final long pageSizeMs;
         private RegionScanner regionScanner = null;
         private final GroupByCache groupByCache;
 
         private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
                                               final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
-                                              final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+                                              final ServerAggregators aggregators, final long limit, final long pageSizeMs) {
             super(scanner);
             this.region = c.getEnvironment().getRegion();
             this.aggregators = aggregators;
             this.limit = limit;
-            this.pageSizeInMs = pageSizeInMs;
+            this.pageSizeMs = pageSizeMs;
             this.expressions = expressions;
             RegionCoprocessorEnvironment env = c.getEnvironment();
             Configuration conf = env.getConfiguration();
@@ -478,6 +468,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
                         // ones returned
                         hasMore = delegate.nextRaw(results);
                         if (!results.isEmpty()) {
+                            if (isDummy(results)) {
+                                getDummyResult(resultsToReturn);
+                                return true;
+                            }
                             result.setKeyValues(results);
                             ImmutableBytesPtr key =
                                     TupleUtil.getConcatenatedValue(result, expressions);
@@ -486,8 +480,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
                             aggregators.aggregate(rowAggregators, result);
                         }
                         now = EnvironmentEdgeManager.currentTimeMillis();
-                    } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeInMs);
-                    if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeInMs) {
+                    } while (hasMore && groupByCache.size() < limit && (now - startTime) < pageSizeMs);
+                    if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
                         // Return a dummy result as we have processed a page worth of rows
                         // but we are not ready to aggregate
                         getDummyResult(resultsToReturn);
@@ -528,18 +522,18 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
         private final ServerAggregators aggregators;
         private final long limit;
         private final List<Expression> expressions;
-        private final long pageSizeInMs;
+        private final long pageSizeMs;
         private long rowCount = 0;
         private ImmutableBytesPtr currentKey = null;
 
         private OrderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
                                             final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
-                                            final ServerAggregators aggregators, final long limit, final long pageSizeInMs) {
+                                            final ServerAggregators aggregators, final long limit, final long pageSizeMs) {
             super(scanner);
             this.scan = scan;
             this.aggregators = aggregators;
             this.limit = limit;
-            this.pageSizeInMs = pageSizeInMs;
+            this.pageSizeMs = pageSizeMs;
             this.expressions = expressions;
             region = c.getEnvironment().getRegion();
             minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
@@ -582,6 +576,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
                         // ones returned
                         hasMore = delegate.nextRaw(kvs);
                         if (!kvs.isEmpty()) {
+                            if (isDummy(kvs)) {
+                                getDummyResult(results);
+                                return true;
+                            }
                             result.setKeyValues(kvs);
                             key = TupleUtil.getConcatenatedValue(result, expressions);
                             aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
@@ -601,12 +599,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im
                         // Do rowCount + 1 b/c we don't have to wait for a complete
                         // row in the case of a DISTINCT with a LIMIT
                         now = EnvironmentEdgeManager.currentTimeMillis();
-                    } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeInMs);
+                    } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
                 }
             } finally {
                 if (acquiredLock) region.closeRegionOperation();
             }
-            if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeInMs) {
+            if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
                 // Return a dummy result as we have processed a page worth of rows
                 // but we are not ready to aggregate
                 getDummyResult(results);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 404a7ad..647b5f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -26,9 +26,11 @@ import java.util.Queue;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -52,9 +54,14 @@ import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
 public class HashJoinRegionScanner implements RegionScanner {
 
     private final RegionScanner scanner;
@@ -72,20 +79,21 @@ public class HashJoinRegionScanner implements RegionScanner {
     private final boolean useQualifierAsListIndex;
     private final boolean useNewValueColumnQualifier;
     private final boolean addArrayCell;
+    private final long pageSizeMs;
 
     @SuppressWarnings("unchecked")
-    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector,
+    public HashJoinRegionScanner(RegionScanner scanner, Scan scan, TupleProjector projector,
                                  HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
                                  RegionCoprocessorEnvironment env, boolean useQualifierAsIndex,
                                  boolean useNewValueColumnQualifier)
         throws IOException {
 
-        this(env, scanner, null, null, projector, joinInfo,
+        this(env, scanner, scan, null, null, projector, joinInfo,
              tenantId, useQualifierAsIndex, useNewValueColumnQualifier);
     }
 
     @SuppressWarnings("unchecked")
-    public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner,
+    public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner, Scan scan,
                                  final Set<KeyValueColumnExpression> arrayKVRefs,
                                  final Expression[] arrayFuncRefs, TupleProjector projector,
                                  HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
@@ -137,6 +145,7 @@ public class HashJoinRegionScanner implements RegionScanner {
         this.useNewValueColumnQualifier = useNewValueColumnQualifier;
         this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 &&
                              arrayKVRefs != null && arrayKVRefs.size() > 0);
+        this.pageSizeMs = getPageSizeMsForRegionScanner(scan);
     }
 
     private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
@@ -288,9 +297,23 @@ public class HashJoinRegionScanner implements RegionScanner {
     @Override
     public boolean nextRaw(List<Cell> result) throws IOException {
         try {
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
             while (shouldAdvance()) {
                 hasMore = scanner.nextRaw(result);
+                if (isDummy(result)) {
+                    return true;
+                }
+                if (result.isEmpty()) {
+                    return hasMore;
+                }
+                Cell cell = result.get(0);
                 processResults(result, false);
+                if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+                    byte[] rowKey = CellUtil.cloneRow(cell);
+                    result.clear();
+                    getDummyResult(rowKey, result);
+                    return true;
+                }
                 result.clear();
             }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index abe428b..904b135 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -18,27 +18,16 @@
 package org.apache.phoenix.coprocessor;
 
 
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
-import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
-import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
-import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS;
-import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
-import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,15 +35,11 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -62,8 +47,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
-import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
 import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.hbase.index.parallel.Task;
@@ -72,7 +55,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -172,7 +154,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
         Scan indexScan = prepareIndexScan(expectedIndexMutationMap);
         try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
             for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
-                if (!useSkipScanFilter && !expectedIndexMutationMap.containsKey(result.getRow())) {
+                if (!isRawFilterSupported && !expectedIndexMutationMap.containsKey(result.getRow())) {
                         continue;
                 }
                 ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
@@ -340,6 +322,9 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
                     hasMore = localScanner.nextRaw(row);
                     if (!row.isEmpty()) {
                         lastCell = row.get(0); // lastCell is any cell from the last visited row
+                        if (isDummy(row)) {
+                            break;
+                        }
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 6d3d38d..2de3cc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -354,6 +355,9 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
                     hasMore = localScanner.nextRaw(row);
                     if (!row.isEmpty()) {
                         lastCell = row.get(0); // lastCell is any cell from the last visited row
+                        if (isDummy(row)) {
+                            break;
+                        }
                         indexMutationCount += populateIndexMutationFromIndexRow(row, indexMutationMap);
                         rowCount++;
                     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index acd8a44..6333a8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -23,13 +23,16 @@ import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -365,6 +368,9 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
                     hasMore = innerScanner.nextRaw(row);
                     if (!row.isEmpty()) {
                         lastCell = row.get(0); // lastCell is any cell from the last visited row
+                        if (isDummy(row)) {
+                            break;
+                        }
                         Put put = null;
                         Delete del = null;
                         for (Cell cell : row) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
new file mode 100644
index 0000000..2b8b6b6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.filter.PagedFilter;
+
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter;
+
+/**
+ *  PagedRegionScanner works with PagedFilter to ensure that the time between two rows returned by the HBase region
+ *  scanner does not exceed the page size (in ms) configured on PagedFilter. When the page size is reached (because
+ *  there are too many cells/rows to be filtered out), PagedFilter stops the HBase region scanner and sets its state
+ *  to STOPPED. In this case, the HBase region scanner's next() returns false and PagedFilter#isStopped() returns true.
+ *  PagedRegionScanner is responsible for detecting that PagedFilter has stopped the scanner, closing the current
+ *  HBase region scanner, starting a new one to resume the scan, and returning a dummy result to signal to the
+ *  Phoenix client that it should skip the dummy result and call ResultScanner#next() to resume the scan.
+ */
+public class PagedRegionScanner extends BaseRegionScanner {
+    protected Region region;
+    protected Scan scan;
+    protected PagedFilter pageFilter;
+
+    public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) {
+        super(scanner);
+        this.region = region;
+        this.scan = scan;
+        pageFilter = getPhoenixPagedFilter(scan);
+        if (pageFilter != null) {
+            pageFilter.init();
+        }
+    }
+
+    private boolean next(List<Cell> results, boolean raw) throws IOException {
+        try {
+            boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
+            if (pageFilter == null) {
+                return hasMore;
+            }
+            if (!hasMore) {
+                // There are no more rows from the HBase region scanner. We need to check whether
+                // PagedFilter has stopped the region scanner.
+                if (pageFilter.isStopped()) {
+                    // Close the current region scanner, start a new one and return a dummy result
+                    delegate.close();
+                    byte[] rowKey = pageFilter.getRowKeyAtStop();
+                    scan.withStartRow(rowKey, true);
+                    delegate = region.getScanner(scan);
+                    if (results.isEmpty()) {
+                        getDummyResult(rowKey, results);
+                    }
+                    pageFilter.init();
+                    return true;
+                }
+                return false;
+            } else {
+                // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
+                // start a new page on the next next() call.
+                pageFilter.resetStartTime();
+                return true;
+            }
+        } catch (Exception e) {
+            // Guard against an NPE here: pageFilter may be null when the underlying
+            // scanner throws before any paging state exists.
+            if (pageFilter != null) {
+                pageFilter.init();
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        return next(results, false);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results) throws IOException {
+        return next(results, true);
+    }
+
+    @Override
+    public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+        return new PagedRegionScanner(region, region.getScanner(scan), scan);
+    }
+}
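The dummy result produced here is a server-to-client signal rather than data, so a consumer has
to keep calling next() when it sees one. A toy model of that contract, with Row and Scanner as
simplified stand-ins (the real client checks ScanUtil.isDummy on HBase Results):

    public class ResumeLoopSketch {
        interface Row { boolean isDummy(); }
        interface Scanner { Row next(); } // null means the scan is exhausted

        // Drains the scanner, skipping dummy placeholders that only mean
        // "the server paged out; the scan position is already advanced, ask again".
        static int countRealRows(Scanner scanner) {
            int count = 0;
            for (Row r = scanner.next(); r != null; r = scanner.next()) {
                if (!r.isDummy()) {
                    count++;
                }
            }
            return count;
        }

        public static void main(String[] args) {
            Row real = () -> false;
            Row dummy = () -> true;
            Row[] rows = { real, dummy, dummy, real };
            int[] i = { 0 };
            Scanner s = () -> i[0] < rows.length ? rows[i[0]++] : null;
            System.out.println(countRealRows(s)); // 2
        }
    }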
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index 5e18241..74e3818 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -18,15 +18,17 @@
 package org.apache.phoenix.coprocessor;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -41,35 +43,41 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 /**
  * Coprocessor that checks whether the row is expired based on the TTL spec.
  */
-public class PhoenixTTLRegionObserver extends BaseRegionObserver {
+public class PhoenixTTLRegionObserver extends BaseScannerRegionObserver implements RegionCoprocessor {
     private static final Logger LOG = LoggerFactory.getLogger(PhoenixTTLRegionObserver.class);
     private MetricsPhoenixTTLSource metricSource;
 
-    @Override public void start(CoprocessorEnvironment e) throws IOException {
-        super.start(e);
-        metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource();
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+        return Optional.of(this);
     }
 
-    @Override public void stop(CoprocessorEnvironment e) throws IOException {
-        super.stop(e);
+    @Override
+    public void start(CoprocessorEnvironment e) throws IOException {
+        metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource();
     }
 
     @Override
-    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
-            RegionScanner s) throws IOException {
+    protected boolean isRegionObserverFor(Scan scan) {
+        return ScanUtil.isMaskTTLExpiredRows(scan) || ScanUtil.isDeleteTTLExpiredRows(scan);
+    }
 
-        if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) {
-            return s;
-        } else if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) {
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+                                              final RegionScanner s) throws IOException, SQLException {
+        if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) {
             throw new IOException("Both mask and delete expired rows property cannot be set");
         } else if (ScanUtil.isMaskTTLExpiredRows(scan)) {
             metricSource.incrementMaskExpiredRequestCount();
@@ -99,10 +107,11 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
     /**
      * A region scanner that checks the TTL expiration of rows
      */
-    private static class PhoenixTTLRegionScanner implements RegionScanner {
+    private static class PhoenixTTLRegionScanner extends BaseRegionScanner {
         private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR =
                 "MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID";
 
+        private final RegionCoprocessorEnvironment env;
         private final RegionScanner scanner;
         private final Scan scan;
         private final byte[] emptyCF;
@@ -119,9 +128,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
         private long numRowsScanned;
         private long numRowsDeleted;
         private boolean reported = false;
+        private long pageSizeMs;
 
         public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan scan,
                 RegionScanner scanner) throws IOException {
+            super(scanner);
+            this.env = env;
             this.scan = scan;
             this.scanner = scanner;
             byte[] requestIdBytes = scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR);
@@ -145,6 +157,7 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
             now = maxTimestamp != HConstants.LATEST_TIMESTAMP ?
                             maxTimestamp :
                             EnvironmentEdgeManager.currentTimeMillis();
+            pageSizeMs = getPageSizeMsForRegionScanner(scan);
         }
 
         @Override public int getBatch() {
@@ -186,10 +199,6 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
             return scanner.getRegionInfo();
         }
 
-        @Override public boolean isFilterDone() throws IOException {
-            return scanner.isFilterDone();
-        }
-
         @Override public boolean reseek(byte[] row) throws IOException {
             return scanner.reseek(row);
         }
@@ -204,17 +213,30 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
 
         private boolean doNext(List<Cell> result, boolean raw) throws IOException {
             try {
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 boolean hasMore;
                 do {
                     hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
                     if (result.isEmpty()) {
                         break;
                     }
+                    if (isDummy(result)) {
+                        return true;
+                    }
+
+                    // Note that MaskIfExpiredRequest and DeleteIfExpiredRequest cannot both be set
+                    // at the same time. Case MaskIfExpiredRequest: if the row has not expired, return it.
                     numRowsScanned++;
                     if (maskIfExpired && checkRowNotExpired(result)) {
                         break;
                     }
 
+                    // Case DeleteIfExpiredRequest: if the row was deleted, return, so that it counts
+                    // towards the aggregate deleted count.
                     if (deleteIfExpired && deleteRowIfExpired(result)) {
                         numRowsDeleted++;
                         break;
@@ -226,6 +248,12 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
                     if (maskIfExpired) {
                         numRowsExpired++;
                     }
+                    if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+                        byte[] rowKey = CellUtil.cloneRow(result.get(0));
+                        result.clear();
+                        getDummyResult(rowKey, result);
+                        return true;
+                    }
                     result.clear();
                 } while (hasMore);
                 return hasMore;
@@ -303,5 +331,9 @@ public class PhoenixTTLRegionObserver extends BaseRegionObserver {
             return true;
         }
 
+        @Override
+        public RegionScanner getNewRegionScanner(Scan scan) throws IOException {
+            return new PhoenixTTLRegionScanner(env, scan, ((BaseRegionScanner)delegate).getNewRegionScanner(scan));
+        }
     }
 }
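As the comments in doNext() note, the mask and delete modes are mutually exclusive for a scan.
A compact sketch of the resulting per-row decision; the names are illustrative, and the real
code additionally updates counters and commits the delete mutation:

    public class TtlDecisionSketch {
        enum Outcome { RETURN_ROW, SKIP_EXPIRED, DELETE_EXPIRED }

        static Outcome decide(boolean expired, boolean maskIfExpired, boolean deleteIfExpired) {
            if (maskIfExpired && deleteIfExpired) {
                throw new IllegalStateException("mask and delete cannot both be set");
            }
            if (!expired) {
                return Outcome.RETURN_ROW; // live rows pass through unchanged
            }
            return deleteIfExpired ? Outcome.DELETE_EXPIRED : Outcome.SKIP_EXPIRED;
        }

        public static void main(String[] args) {
            System.out.println(decide(true, true, false));  // SKIP_EXPIRED
            System.out.println(decide(true, false, true));  // DELETE_EXPIRED
            System.out.println(decide(false, true, false)); // RETURN_ROW
        }
    }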
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d96dfe3..e9a394d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
@@ -79,7 +79,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
@@ -455,7 +454,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
 
         if (j != null) {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
+            theScanner = new HashJoinRegionScanner(theScanner, scan, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
         }
         return new UngroupedAggregateRegionScanner(c, theScanner, region, scan, env, this);
     }
@@ -640,7 +639,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+    private RegionScanner rebuildIndices(RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
         boolean oldCoproc = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
@@ -649,29 +648,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
             return new IndexerRegionScanner(innerScanner, region, scan, env, this);
         }
+        RegionScanner scanner;
         if (!scan.isRaw()) {
             Scan rawScan = new Scan(scan);
             rawScan.setRaw(true);
             rawScan.readAllVersions();
             rawScan.getFamilyMap().clear();
-            // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
-            // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
-            // For rebuilds we need all columns and all versions
-            if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
-                rawScan.setFilter(null);
-            } else if (scan.getFilter() != null) {
-                // Override the filter so that we get all versions
-                rawScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
-            }
+            adjustScanFilter(rawScan);
             rawScan.setCacheBlocks(false);
             for (byte[] family : scan.getFamilyMap().keySet()) {
                 rawScan.addFamily(family);
             }
+            scanner = ((BaseRegionScanner)innerScanner).getNewRegionScanner(rawScan);
             innerScanner.close();
-            RegionScanner scanner = region.getScanner(rawScan);
-            return getRegionScanner(scanner, region, scan, env, oldCoproc);
+        } else {
+            if (adjustScanFilter(scan)) {
+                scanner = ((BaseRegionScanner) innerScanner).getNewRegionScanner(scan);
+                innerScanner.close();
+            } else {
+                scanner = innerScanner;
+            }
         }
-        return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
+        return getRegionScanner(scanner, region, scan, env, oldCoproc);
     }
 
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 9de75d2..c7769d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -32,9 +32,10 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -124,11 +125,11 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
 
-    private long pageSizeInMs = Long.MAX_VALUE;
-    private int maxBatchSize = 0;
-    private final Scan scan;
-    private final RegionScanner innerScanner;
-    private final Region region;
+    private long pageSizeMs;
+    private int maxBatchSize = 0;
+    private Scan scan;
+    private RegionScanner innerScanner;
+    private Region region;
     private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
     private final RegionCoprocessorEnvironment env;
     private final boolean useQualifierAsIndex;
@@ -176,17 +177,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         this.innerScanner = innerScanner;
         Configuration conf = env.getConfiguration();
-        if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
-            byte[] pageSizeFromScan =
-                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
-            if (pageSizeFromScan != null) {
-                pageSizeInMs = Bytes.toLong(pageSizeFromScan);
-            } else {
-                pageSizeInMs =
-                        conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
-                                QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
-            }
-        }
+        pageSizeMs = getPageSizeMsForRegionScanner(scan);
         ts = scan.getTimeRange().getMax();
         boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -598,6 +589,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                         // since this is an indication of whether or not there are more values after the
                         // ones returned
                         hasMore = innerScanner.nextRaw(results);
+                        if (isDummy(results)) {
+                            if (!hasAny) {
+                                resultsToReturn.addAll(results);
+                                return true;
+                            }
+                            break;
+                        }
                         if (!results.isEmpty()) {
                             lastCell = results.get(0);
                             result.setKeyValues(results);
@@ -638,8 +636,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                             aggregators.aggregate(rowAggregators, result);
                             hasAny = true;
                         }
-                    } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
-
+                    } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
                     if (!mutations.isEmpty()) {
                         annotateAndCommit(mutations);
                     }
@@ -653,9 +650,11 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
             } catch (DataExceedsCapacityException e) {
                 throw new DoNotRetryIOException(e.getMessage(), e);
             } catch (Throwable e) {
-                LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region "
+                LOGGER.error("Exception in UngroupedAggregateRegionScanner for region "
                         + region.getRegionInfo().getRegionNameAsString(), e);
                 throw e;
+            } finally {
+                region.closeRegionOperation();
             }
             Cell keyValue;
             if (hasAny) {
@@ -665,8 +664,6 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                 resultsToReturn.add(keyValue);
             }
             return hasMore;
-        } finally {
-            region.closeRegionOperation();
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
index b5c941a..4230867 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/DelegateFilter.java
@@ -92,4 +92,14 @@ public class DelegateFilter extends FilterBase {
     public String toString() {
         return delegate.toString();
     }
+
+    @Override
+    public void setReversed(boolean reversed) {
+        delegate.setReversed(reversed);
+    }
+
+    @Override
+    public boolean isReversed() {
+        return delegate.isReversed();
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
new file mode 100644
index 0000000..8d785c3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+/**
+ * This filter wraps the scan's delegate filter and stops the scan once scanning the current page has
+ * taken longer than pageSizeMs, recording the row key at which the scan should later resume.
+ */
+public class PagedFilter extends FilterBase implements Writable {
+    private enum State {
+        INITIAL, STARTED, TIME_TO_STOP, STOPPED
+    }
+    State state;
+    private long pageSizeMs;
+    private long startTime;
+    private byte[] rowKeyAtStop;
+    private Filter delegate = null;
+
+    public PagedFilter() {
+        init();
+    }
+
+    public PagedFilter(Filter delegate, long pageSizeMs) {
+        init();
+        this.delegate = delegate;
+        this.pageSizeMs = pageSizeMs;
+    }
+
+    public Filter getDelegateFilter() {
+        return delegate;
+    }
+
+    public void setDelegateFilter(Filter delegate) {
+        this.delegate = delegate;
+    }
+
+    public byte[] getRowKeyAtStop() {
+        if (rowKeyAtStop != null) {
+            return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length);
+        }
+        return null;
+    }
+
+    public boolean isStopped() {
+        return state == State.STOPPED;
+    }
+
+    public void init() {
+        state = State.INITIAL;
+        rowKeyAtStop = null;
+    }
+
+    public void resetStartTime() {
+        if (state == State.STARTED) {
+            init();
+        }
+    }
+
+    @Override
+    public void reset() throws IOException {
+        if (state == State.INITIAL) {
+            startTime = EnvironmentEdgeManager.currentTimeMillis();
+            state = State.STARTED;
+        } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+            state = State.TIME_TO_STOP;
+        }
+        if (delegate != null) {
+            delegate.reset();
+            return;
+        }
+        super.reset();
+    }
+
+    @Override
+    public Cell getNextCellHint(Cell currentKV) throws IOException {
+        if (delegate != null) {
+            return delegate.getNextCellHint(currentKV);
+        }
+        return super.getNextCellHint(currentKV);
+    }
+
+    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+        if (state == State.TIME_TO_STOP) {
+            if (rowKeyAtStop == null) {
+                rowKeyAtStop = new byte[length];
+                Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length);
+            }
+            return true;
+        }
+        if (delegate != null) {
+            return delegate.filterRowKey(buffer, offset, length);
+        }
+        return super.filterRowKey(buffer, offset, length);
+    }
+
+    @Override
+    public boolean filterRowKey(Cell cell) throws IOException {
+        if (state == State.TIME_TO_STOP) {
+            if (rowKeyAtStop == null) {
+                rowKeyAtStop = CellUtil.cloneRow(cell);
+            }
+            return true;
+        }
+        if (delegate != null) {
+            return delegate.filterRowKey(cell);
+        }
+        return super.filterRowKey(cell);
+    }
+
+    @Override
+    public boolean filterAllRemaining() throws IOException {
+        if (state == State.TIME_TO_STOP && rowKeyAtStop != null) {
+            state = State.STOPPED;
+            return true;
+        }
+        if (delegate != null) {
+            return delegate.filterAllRemaining();
+        }
+        return super.filterAllRemaining();
+    }
+
+    @Override
+    public boolean hasFilterRow() {
+        return true;
+    }
+
+    @Override
+    public boolean filterRow() throws IOException {
+        if (state == State.TIME_TO_STOP) {
+            return true;
+        }
+        if (delegate != null) {
+            return delegate.filterRow();
+        }
+        return super.filterRow();
+    }
+
+    @Override
+    public Cell transformCell(Cell v) throws IOException {
+        if (delegate != null) {
+            return delegate.transformCell(v);
+        }
+        return super.transformCell(v);
+    }
+
+    @Override
+    public void filterRowCells(List<Cell> kvs) throws IOException {
+        if (delegate != null) {
+            delegate.filterRowCells(kvs);
+            return;
+        }
+        super.filterRowCells(kvs);
+    }
+
+    @Override
+    public void setReversed(boolean reversed) {
+        if (delegate != null) {
+            delegate.setReversed(reversed);
+        }
+        super.setReversed(reversed);
+    }
+
+    @Override
+    public boolean isReversed() {
+        if (delegate != null) {
+            return delegate.isReversed();
+        }
+        return super.isReversed();
+    }
+
+    @Override
+    public boolean isFamilyEssential(byte[] name) throws IOException {
+        if (delegate != null) {
+            return delegate.isFamilyEssential(name);
+        }
+        return super.isFamilyEssential(name);
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+        if (delegate != null) {
+            return delegate.filterKeyValue(v);
+        }
+        return super.filterKeyValue(v);
+    }
+
+    @Override
+    public Filter.ReturnCode filterCell(Cell c) throws IOException {
+        if (delegate != null) {
+            return delegate.filterCell(c);
+        }
+        return super.filterCell(c);
+    }
+
+    public static PagedFilter parseFrom(final byte [] pbBytes) throws DeserializationException {
+        try {
+            return (PagedFilter) Writables.getWritable(pbBytes, new PagedFilter());
+        } catch (IOException e) {
+            throw new DeserializationException(e);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(pageSizeMs);
+        if (delegate != null) {
+            out.writeUTF(delegate.getClass().getName());
+            byte[] b = delegate.toByteArray();
+            out.writeInt(b.length);
+            out.write(b);
+        } else {
+            out.writeUTF("");
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        pageSizeMs = in.readLong();
+        String className = in.readUTF();
+        if (className.isEmpty()) {
+            return;
+        }
+        int length = in.readInt();
+        byte[] b = new byte[length];
+        in.readFully(b);
+        try {
+            // Deserialize the delegate via its static parseFrom(byte[]) factory,
+            // the standard HBase filter deserialization hook.
+            Class<?> cls = Class.forName(className);
+            Method m = cls.getDeclaredMethod("parseFrom", byte[].class);
+            delegate = (Filter) m.invoke(null, b);
+        } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+                | InvocationTargetException e) {
+            throw new DoNotRetryIOException(e);
+        }
+    }
+
+    @Override
+    public byte[] toByteArray() throws IOException {
+        return Writables.getBytes(this);
+    }
+}
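On the way in, PagedFilter is meant to wrap whatever filter the query has already set on the
scan. A sketch of that wiring under the assumption that a ScanUtil-style helper performs it;
the helper name withPaging is hypothetical, and only the PagedFilter constructor comes from
this commit:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.phoenix.filter.PagedFilter;

    public class PagedScanSketch {
        // Wraps the scan's existing filter (possibly null) in a PagedFilter so the
        // server stops scanning once a page has taken longer than pageSizeMs.
        static Scan withPaging(Scan scan, long pageSizeMs) {
            Filter existing = scan.getFilter();
            scan.setFilter(new PagedFilter(existing, pageSizeMs));
            return scan;
        }
    }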
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 7683e4f..9b2bd37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -17,15 +17,15 @@
  */
 package org.apache.phoenix.index;
 
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -55,7 +54,9 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.filter.PagedFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
@@ -92,7 +93,7 @@ import org.slf4j.LoggerFactory;
  * the verified version that is masked by the unverified version(s).
  *
  */
-public class GlobalIndexChecker extends BaseRegionObserver implements RegionCoprocessor, RegionObserver {
+public class GlobalIndexChecker extends BaseScannerRegionObserver implements RegionCoprocessor {
     private static final Logger LOG =
         LoggerFactory.getLogger(GlobalIndexChecker.class);
     private GlobalIndexCheckerSource metricsSource;
@@ -118,13 +119,11 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
      * An instance of this class is created for each scanner on an index
      * and used to verify individual rows and rebuild them if they are not valid
      */
-    private static class GlobalIndexScanner implements RegionScanner {
+    private class GlobalIndexScanner extends BaseRegionScanner {
         private RegionScanner scanner;
-        private RegionScanner deleteRowScanner;
         private long ageThreshold;
         private Scan scan;
         private Scan indexScan;
-        private Scan deleteRowScan;
         private Scan singleRowIndexScan;
         private Scan buildIndexScan = null;
         private Table dataHTable = null;
@@ -142,11 +141,13 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
         private boolean restartScanDueToPageFilterRemoval = false;
         private boolean hasMore;
         private String indexName;
+        private long pageSizeMs;
 
         public GlobalIndexScanner(RegionCoprocessorEnvironment env,
                                   Scan scan,
                                   RegionScanner scanner,
                                   GlobalIndexCheckerSource metricsSource) throws IOException {
+            super(scanner);
             this.env = env;
             this.scan = scan;
             this.scanner = scanner;
@@ -170,6 +171,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                         "repairIndexRows: IndexMaintainer is not included in scan attributes for " +
                                 region.getRegionInfo().getTable().getNameAsString());
             }
+            pageSizeMs = getPageSizeMsForRegionScanner(scan);
         }
 
         @Override
@@ -184,6 +186,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
 
         public boolean next(List<Cell> result, boolean raw) throws IOException {
             try {
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 do {
                     if (raw) {
                         hasMore = scanner.nextRaw(result);
@@ -193,9 +196,19 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                     if (result.isEmpty()) {
                         break;
                     }
+                    if (isDummy(result)) {
+                        return true;
+                    }
+                    Cell cell = result.get(0);
                     if (verifyRowAndRepairIfNecessary(result)) {
                         break;
                     }
+                    if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+                        byte[] rowKey = CellUtil.cloneRow(cell);
+                        result.clear();
+                        getDummyResult(rowKey, result);
+                        return true;
+                    }
                     // skip this row as it is invalid
                     // if there is no more row, then result will be an empty list
                 } while (hasMore);
@@ -244,11 +257,6 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
         }
 
         @Override
-        public boolean isFilterDone() throws IOException {
-            return scanner.isFilterDone();
-        }
-
-        @Override
         public boolean reseek(byte[] row) throws IOException {
             return scanner.reseek(row);
         }
@@ -296,6 +304,12 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
         private PageFilter removePageFilter(Scan scan) {
             Filter filter = scan.getFilter();
             if (filter != null) {
+                if (filter instanceof PagedFilter) {
+                    filter = ((PagedFilter) filter).getDelegateFilter();
+                    if (filter == null) {
+                        return null;
+                    }
+                }
                 if (filter instanceof PageFilter) {
                     scan.setFilter(null);
                     return (PageFilter) filter;
@@ -315,7 +329,6 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                 }
                 buildIndexScan = new Scan();
                 indexScan = new Scan(scan);
-                deleteRowScan = new Scan();
                 singleRowIndexScan = new Scan(scan);
                 byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
                 dataHTable =
@@ -365,7 +378,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                 if (restartScanDueToPageFilterRemoval) {
                     scanner.close();
                     indexScan.withStartRow(indexRowKey, false);
-                    scanner = region.getScanner(indexScan);
+                    scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
                     hasMore = true;
                     // Set restartScanDueToPageFilterRemoval to false as we do not restart the scan unnecessarily next time
                     restartScanDueToPageFilterRemoval = false;
@@ -383,7 +396,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                 deleteRowIfAgedEnough(indexRowKey, ts, false);
                 // Open a new scanner starting from the row after the current row
                 indexScan.withStartRow(indexRowKey, false);
-                scanner = region.getScanner(indexScan);
+                scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
                 hasMore = true;
                 // Skip this unverified row (i.e., do not return it to the client). Just returning an empty
                 // row is sufficient to do that.
@@ -393,12 +406,15 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
             // code == RebuildReturnCode.INDEX_ROW_EXISTS.getValue()
             // Open a new scanner starting from the current row
             indexScan.withStartRow(indexRowKey, true);
-            scanner = region.getScanner(indexScan);
+            scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
             hasMore = scanner.next(row);
             if (row.isEmpty()) {
                 // This means the index row has been deleted before opening the new scanner.
                 return;
             }
+            if (isDummy(row)) {
+                return;
+            }
             // Check if the index row still exist after rebuild
             if  (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
                     indexRowKey, 0, indexRowKey.length) != 0) {
@@ -410,7 +426,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                 // The row is "unverified". Rewind the scanner and let the row be scanned again
                 // so that it can be repaired
                 scanner.close();
-                scanner = region.getScanner(indexScan);
+                scanner = ((BaseRegionScanner)delegate).getNewRegionScanner(indexScan);
                 hasMore = true;
                 row.clear();
                 return;
@@ -435,7 +451,7 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                 singleRowIndexScan.withStartRow(indexRowKey, true);
                 singleRowIndexScan.withStopRow(indexRowKey, true);
                 singleRowIndexScan.setTimeRange(minTimestamp, ts);
-                RegionScanner singleRowScanner = region.getScanner(singleRowIndexScan);
+                RegionScanner singleRowScanner = ((BaseRegionScanner)delegate).getNewRegionScanner(singleRowIndexScan);
                 row.clear();
                 singleRowScanner.next(row);
                 singleRowScanner.close();
@@ -447,6 +463,9 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
                     // possibly by compaction
                     return;
                 }
+                if (isDummy(row)) {
+                    return;
+                }
                 if (verifyRowAndRemoveEmptyColumn(row)) {
                     // The index row status is "verified". This row is good to return to the client. We are done here.
                     return;
@@ -592,11 +611,13 @@ public class GlobalIndexChecker extends BaseRegionObserver implements RegionCopr
     }
 
     @Override
-    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
-                                         Scan scan, RegionScanner s) throws IOException {
-        if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
-            return s;
-        }
+    protected boolean isRegionObserverFor(Scan scan) {
+        return scan.getAttribute(CHECK_VERIFY_COLUMN) != null;
+    }
+
+    @Override
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+                                              final RegionScanner s) throws IOException, SQLException {
         return new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource);
     }
 
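Because PagedFilter now sits in front of whatever filter the client set, code that looks for a
particular filter type, as removePageFilter does above for PageFilter, has to look through the
wrapper first. A generic form of that unwrap, sketched under the same assumption of a single
wrapping level:

    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.phoenix.filter.PagedFilter;

    public class FilterUnwrapSketch {
        // Returns the filter of the requested type, unwrapping one level of
        // PagedFilter if present; null when no such filter is set.
        static <T extends Filter> T find(Filter filter, Class<T> type) {
            if (filter instanceof PagedFilter) {
                filter = ((PagedFilter) filter).getDelegateFilter();
            }
            return type.isInstance(filter) ? type.cast(filter) : null;
        }
    }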
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 305ec3b..2a541c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -19,6 +19,9 @@
 package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -157,13 +160,14 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
 
     final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
     if (j != null) {
-        innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs,
+        innerScanner = new HashJoinRegionScanner(env, innerScanner, scan, arrayKVRefs, arrayFuncRefs,
                                                  p, j, tenantId, useQualifierAsIndex,
                                                  useNewValueColumnQualifier);
     }
     if (scanOffset != null) {
       innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
-              new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
+              new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme),
+                      scanOffset, getPageSizeMsForRegionScanner(scan)),
           scan.getAttribute(QueryConstants.LAST_SCAN) != null);
     }
     boolean spoolingEnabled =
@@ -219,7 +223,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
       PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
       ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
       return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
-              thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize);
+              thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize, getPageSizeMsForRegionScanner(scan));
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -373,11 +377,13 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
           if (isFilterDone()) {
             return false;
           }
-
-          for (int i = 0; i < tuple.size(); i++) {
-            results.add(tuple.getValue(i));
+          if (isDummy(tuple)) {
+            getDummyResult(results);
+          } else {
+            for (int i = 0; i < tuple.size(); i++) {
+              results.add(tuple.getValue(i));
+            }
           }
-
           tuple = iterator.next();
           return !isFilterDone();
         } catch (Throwable t) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index 5c5a6d3..3eecfc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -23,6 +23,10 @@ import java.util.List;
 import org.apache.phoenix.compile.ExplainPlanAttributes
     .ExplainPlanAttributesBuilder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
 
 /**
  * Iterates through tuples up to a limit
@@ -32,17 +36,27 @@ import org.apache.phoenix.schema.tuple.Tuple;
 public class OffsetResultIterator extends DelegateResultIterator {
     private int rowCount;
     private int offset;
+    private long pageSizeMs = Long.MAX_VALUE;
 
     public OffsetResultIterator(ResultIterator delegate, Integer offset) {
         super(delegate);
         this.offset = offset == null ? -1 : offset;
     }
 
+    public OffsetResultIterator(ResultIterator delegate, Integer offset, long pageSizeMs) {
+        this(delegate, offset);
+        this.pageSizeMs = pageSizeMs;
+    }
+
     @Override
     public Tuple next() throws SQLException {
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (rowCount < offset) {
-            if (super.next() == null) { return null; }
+            Tuple tuple = super.next();
+            if (tuple == null) { return null; }
             rowCount++;
+            if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+                return getDummyTuple(tuple);
+            }
         }
         return super.next();
     }
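
A dummy tuple returned here means the offset rows were still being skipped when the page time expired. A hedged caller-side sketch of how such tuples are meant to be consumed (nextRealTuple is illustrative scaffolding, not part of the patch):

    import java.sql.SQLException;

    import org.apache.phoenix.iterate.ResultIterator;
    import org.apache.phoenix.schema.tuple.Tuple;
    import org.apache.phoenix.util.ScanUtil;

    final class OffsetPagingSketch {
        // Dummy tuples signal "still skipping offset rows": keep polling
        // instead of treating them as real rows or as end-of-results.
        static Tuple nextRealTuple(ResultIterator offsetIterator) throws SQLException {
            Tuple tuple = offsetIterator.next();
            while (tuple != null && ScanUtil.isDummy(tuple)) {
                tuple = offsetIterator.next(); // heartbeat while skipping continues
            }
            return tuple; // a real row, or null when the scan is done
        }
    }
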
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index bb07cff..670ced7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -19,13 +19,14 @@ package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkPositionIndex;
+import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Queue;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
@@ -36,9 +37,9 @@ import org.apache.phoenix.compile.ExplainPlanAttributes
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
-import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
@@ -146,7 +147,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
     private final long estimatedByteSize;
     
     private PeekingResultIterator resultIterator;
+    private boolean resultIteratorReady = false;
+    private Tuple dummyTuple = null;
     private long byteSize;
+    private long pageSizeMs;
 
     protected ResultIterator getDelegate() {
         return delegate;
@@ -154,7 +158,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
     
     public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
             boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) {
-        this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0);
+        this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0, Long.MAX_VALUE);
     }
 
     public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
@@ -163,8 +167,14 @@ public class OrderedResultIterator implements PeekingResultIterator {
     }
 
     public OrderedResultIterator(ResultIterator delegate,
+                                 List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
+                                 long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
+        this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, estimatedRowSize, Long.MAX_VALUE);
+    }
+
+    public OrderedResultIterator(ResultIterator delegate,
             List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
-            long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
+            long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize, long pageSizeMs) {
         checkArgument(!orderByExpressions.isEmpty());
         this.delegate = delegate;
         this.orderByExpressions = orderByExpressions;
@@ -188,6 +198,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
         assert(limit == null || Long.MAX_VALUE / estimatedEntrySize >= limit + this.offset);
 
         this.estimatedByteSize = limit == null ? 0 : (limit + this.offset) * estimatedEntrySize;
+        this.pageSizeMs = pageSizeMs;
     }
 
     public Integer getLimit() {
@@ -245,11 +256,17 @@ public class OrderedResultIterator implements PeekingResultIterator {
     
     @Override
     public Tuple next() throws SQLException {
-        return getResultIterator().next();
+        getResultIterator();
+        if (!resultIteratorReady) {
+            return dummyTuple;
+        }
+        return resultIterator.next();
     }
     
     private PeekingResultIterator getResultIterator() throws SQLException {
-        if (resultIterator != null) {
+        if (resultIteratorReady) {
+            // The results have already been ordered, so the result iterator is ready
+            // to iterate over them
             return resultIterator;
         }
         
@@ -257,11 +274,17 @@ public class OrderedResultIterator implements PeekingResultIterator {
         List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
         final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
         try{
-            final SizeAwareQueue<ResultEntry> queueEntries =
-                    PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled,
-                        thresholdBytes);
-            resultIterator = new RecordPeekingResultIterator(queueEntries);
+            if (resultIterator == null) {
+                resultIterator = new RecordPeekingResultIterator(PhoenixQueues.newResultEntrySortedQueue(comparator,
+                        limit, spoolingEnabled, thresholdBytes));
+            }
+            final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries();
+            long startTime = EnvironmentEdgeManager.currentTimeMillis();
             for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+                if (isDummy(result)) {
+                    dummyTuple = result;
+                    return resultIterator;
+                }
                 int pos = 0;
                 ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[numSortKeys];
                 for (Expression expression : expressions) {
@@ -271,7 +294,12 @@ public class OrderedResultIterator implements PeekingResultIterator {
                     sortKeys[pos++] = evaluated && sortKey.getLength() > 0 ? sortKey : null;
                 }
                 queueEntries.add(new ResultEntry(sortKeys, result));
+                if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+                    dummyTuple = getDummyTuple(result);
+                    return resultIterator;
+                }
             }
+            resultIteratorReady = true;
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
             ServerUtil.createIOException(e.getMessage(), e);
@@ -337,6 +365,10 @@ public class OrderedResultIterator implements PeekingResultIterator {
             this.queueEntries = queueEntries;
         }
 
+        public SizeAwareQueue<ResultEntry> getQueueEntries() {
+            return queueEntries;
+        }
+
         @Override
         public Tuple next() throws SQLException {
             ResultEntry entry = queueEntries.poll();
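
Taken together, these changes make the sort resumable: the sorted queue survives across next() calls, so no comparison work is repeated after a page boundary. An illustrative driver, with OrderedPagingSketch as hypothetical scaffolding:

    import java.sql.SQLException;

    import org.apache.phoenix.iterate.OrderedResultIterator;
    import org.apache.phoenix.schema.tuple.Tuple;
    import org.apache.phoenix.util.ScanUtil;

    final class OrderedPagingSketch {
        static Tuple firstOrderedTuple(OrderedResultIterator ordered) throws SQLException {
            Tuple t = ordered.next();       // fills the sorted queue until pageSizeMs elapses
            while (t != null && ScanUtil.isDummy(t)) {
                t = ordered.next();         // resumes filling the same queue: no re-sorting
            }
            return t;                       // first tuple in fully sorted order, or null
        }
    }
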
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index a00a6a9..83e2290 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -20,7 +20,11 @@ package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.hbase.Cell;
@@ -54,6 +58,7 @@ import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -118,6 +123,7 @@ public abstract class RegionScannerFactory {
       private RegionInfo regionInfo = env.getRegionInfo();
       private byte[] actualStartKey = getActualStartKey();
       private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
+      final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
 
       // Get the actual scan start row of local index. This will be used to compare the row
       // key of the results less than scan start row when there are references.
@@ -129,7 +135,11 @@ public abstract class RegionScannerFactory {
       @Override
       public boolean next(List<Cell> results) throws IOException {
         try {
-          return s.next(results);
+          boolean next = s.next(results);
+          if (isDummy(results)) {
+            return true;
+          }
+          return next;
         } catch (Throwable t) {
           ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
           return false; // impossible
@@ -170,6 +180,9 @@ public abstract class RegionScannerFactory {
       public boolean nextRaw(List<Cell> result) throws IOException {
         try {
           boolean next = s.nextRaw(result);
+          if (isDummy(result)) {
+            return true;
+          }
           Cell arrayElementCell = null;
           if (result.size() == 0) {
             return next;
@@ -182,7 +195,7 @@ public abstract class RegionScannerFactory {
             if(actualStartKey!=null) {
               next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
                   null, arrayElementCell);
-              if (result.isEmpty()) {
+              if (result.isEmpty() || isDummy(result)) {
                 return next;
               }
             }
@@ -290,8 +303,15 @@ public abstract class RegionScannerFactory {
           ScannerContext scannerContext, Cell arrayElementCell) throws IOException {
         boolean next = true;
         Cell firstCell = result.get(0);
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
             firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) {
+          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+            byte[] rowKey = CellUtil.cloneRow(result.get(0));
+            result.clear();
+            getDummyResult(rowKey, result);
+            return true;
+          }
           result.clear();
           if(scannerContext == null) {
             next = s.nextRaw(result);
@@ -301,6 +321,9 @@ public abstract class RegionScannerFactory {
           if (result.isEmpty()) {
             return next;
           }
+          if (isDummy(result)) {
+            return true;
+          }
           if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
             int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
             arrayElementCell = result.get(arrayElementCellPosition);
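
The recurring shape in this file is a time-budgeted loop that bails out with a dummy result carrying the resume row key, returning true so HBase keeps the scanner open. A hedged standalone sketch of that shape (the class and its abstract hooks are hypothetical):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.phoenix.util.EnvironmentEdgeManager;
    import org.apache.phoenix.util.ScanUtil;

    abstract class PagedLoopSketch {
        abstract boolean hasWorkLeft();
        abstract void doOneStep(List<Cell> results) throws IOException;

        boolean pagedLoop(List<Cell> results, long pageSizeMs) throws IOException {
            long startTime = EnvironmentEdgeManager.currentTimeMillis();
            while (hasWorkLeft()) {
                if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
                    byte[] resumeKey = CellUtil.cloneRow(results.get(0)); // resume point
                    results.clear();
                    ScanUtil.getDummyResult(resumeKey, results);
                    return true; // heartbeat: the client retries from resumeKey
                }
                doOneStep(results); // one unit of real scan work
            }
            return false; // work finished within the page budget
        }
    }
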
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index a5a40e2..caf91c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -23,16 +23,20 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
@@ -63,6 +67,9 @@ public class RegionScannerResultIterator extends BaseResultIterator {
                 if (!hasMore && results.isEmpty()) {
                     return null;
                 }
+                if (isDummy(results)) {
+                    return new ResultTuple(Result.create(results));
+                }
                 // We instantiate a new tuple because in all cases currently we hang on to it
                 // (i.e. to compute and hold onto the TopN).
                 Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index c1081ee..61924c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -156,7 +156,7 @@ public class ScanningResultIterator implements ResultIterator {
     public Tuple next() throws SQLException {
         try {
             Result result = scanner.next();
-            while (result != null && isDummy(result)) {
+            while (result != null && (result.isEmpty() || isDummy(result))) {
                 result = scanner.next();
             }
             if (result == null) {
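
On the client side the placeholder rows terminate here: the loop now also skips empty results, treating them like dummies, and each extra next() call effectively serves as the heartbeat that keeps the scan alive. A hedged sketch of the resulting behavior (ClientSkipSketch is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.phoenix.util.ScanUtil;

    final class ClientSkipSketch {
        // Placeholder rows never reach the application.
        static Result nextRealResult(ResultScanner scanner) throws IOException {
            Result result = scanner.next();
            while (result != null && (result.isEmpty() || ScanUtil.isDummy(result))) {
                result = scanner.next(); // server is still paging through its work
            }
            return result; // real row, or null at end of scan
        }
    }
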
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index c0f9c2d..5d35311 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,7 +26,6 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -37,10 +36,10 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -55,6 +54,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -139,7 +139,15 @@ public class TableResultIterator implements ResultIterator {
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
         ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection());
-        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);
+        long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps()
+                .getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1L);
+        if (pageSizeMs == -1) {
+            // Use half of the HBase RPC timeout as the server page size so that the HBase
+            // region server can send a heartbeat message to the client before the client times out
+            pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps()
+                    .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
+        }
+        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(pageSizeMs));
     }
 
     @Override
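
To make the defaulting concrete, with the stock HBase hbase.rpc.timeout of 60000 ms and no explicit phoenix.server.page.size.ms, the budgets work out as follows (illustrative arithmetic only, no extra configuration implied):

    long rpcTimeoutMs = 60000L;                            // HBase default hbase.rpc.timeout
    long pageSizeMs = (long) (rpcTimeoutMs * 0.5);         // 30000 ms sent as SERVER_PAGE_SIZE_MS
    long scannerLoopBudgetMs = (long) (pageSizeMs * 0.6);  // 18000 ms (region scanner loop)
    long perNextBudgetMs = (long) (pageSizeMs * 0.3);      //  9000 ms (PagedFilter, per next())
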
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index efcb3ab..77aa683 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -327,11 +327,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
-    // The number of rows to be scanned in one RPC call
-    public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
-    public static final String GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.grouped.aggregate_page_size_in_ms";
     // Flag indicating that server side masking of ttl expired rows is enabled.
     public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
+    // The time limit on the amount of work to be done in one RPC call
+    public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms";
 
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index a1c89f6..2424451 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -338,8 +338,6 @@ public class QueryServicesOptions {
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
     public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
-    public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
-    public static final long DEFAULT_GROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
index 2bdd799..e280de7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.schema.tuple;
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
@@ -137,6 +138,12 @@ public class EncodedColumnQualiferCellsList implements List<Cell> {
         if (e == null) {
             throw new NullPointerException();
         }
+        if (isDummy(e)) {
+            array[0] = e;
+            firstNonNullElementIdx = 0;
+            numNonNullElements = 1;
+            return true;
+        }
         int columnQualifier =
                 encodingScheme.decode(e.getQualifierArray(), e.getQualifierOffset(),
                     e.getQualifierLength());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index 3419e3c..825728f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -35,7 +35,6 @@ import org.apache.phoenix.util.PhoenixKeyValueUtil;
 public class ResultTuple extends BaseTuple {
     private final Result result;
     public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList()));
-    
     public ResultTuple(Result result) {
         this.result = result;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index e3b7a4e..ad3dd9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -68,6 +68,7 @@ import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.PagedFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -89,6 +90,8 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.slf4j.Logger;
@@ -677,7 +680,7 @@ public class ScanUtil {
 
     // Start/stop row must be swapped if scan is being done in reverse
     public static void setupReverseScan(Scan scan) {
-        if (isReversed(scan)) {
+        if (isReversed(scan) && !scan.isReversed()) {
             byte[] newStartRow = getReversedRow(scan.getStartRow());
             byte[] newStopRow = getReversedRow(scan.getStopRow());
             scan.setStartRow(newStopRow);
@@ -761,6 +764,12 @@ public class ScanUtil {
         if (filter == null) {
             return;
         }
+        if (filter instanceof PagedFilter) {
+            filter = ((PagedFilter) filter).getDelegateFilter();
+            if (filter == null) {
+                return;
+            }
+        }
         if (filter instanceof FilterList) {
             FilterList filterList = (FilterList)filter;
             for (Filter childFilter : filterList.getFilters()) {
@@ -1206,6 +1215,15 @@ public class ScanUtil {
             if (!ScanUtil.isDeleteTTLExpiredRows(scan)) {
                 scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
             }
+            if (ScanUtil.isLocalIndex(scan)) {
+                byte[] actualStartRow = scan.getAttribute(SCAN_ACTUAL_START_ROW) != null ?
+                        scan.getAttribute(SCAN_ACTUAL_START_ROW) :
+                        HConstants.EMPTY_BYTE_ARRAY;
+                ScanUtil.setLocalIndexAttributes(scan, 0,
+                        actualStartRow,
+                        HConstants.EMPTY_BYTE_ARRAY,
+                        scan.getStartRow(), scan.getStopRow());
+            }
             addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
         }
     }
@@ -1222,22 +1240,85 @@ public class ScanUtil {
         getDummyResult(EMPTY_BYTE_ARRAY, result);
     }
 
+    public static Tuple getDummyTuple(byte[] rowKey) {
+        List<Cell> result = new ArrayList<Cell>(1);
+        getDummyResult(rowKey, result);
+        return new ResultTuple(Result.create(result));
+    }
+
+    public static Tuple getDummyTuple() {
+        List<Cell> result = new ArrayList<Cell>(1);
+        getDummyResult(result);
+        return new ResultTuple(Result.create(result));
+    }
+
+    public static Tuple getDummyTuple(Tuple tuple) {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        tuple.getKey(ptr);
+        return getDummyTuple(ptr.copyBytes());
+    }
+
+    public static boolean isDummy(Cell cell) {
+        return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    }
+
     public static boolean isDummy(Result result) {
-        // Check if the result is a dummy result
         if (result.rawCells().length != 1) {
             return false;
         }
         Cell cell = result.rawCells()[0];
-        return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+        return isDummy(cell);
     }
 
     public static boolean isDummy(List<Cell> result) {
-        // Check if the result is a dummy result
         if (result.size() != 1) {
             return false;
         }
         Cell cell = result.get(0);
-        return CellUtil.matchingColumn(cell, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+        return isDummy(cell);
+    }
+
+    public static boolean isDummy(Tuple tuple) {
+        if (tuple instanceof ResultTuple) {
+            return isDummy(((ResultTuple) tuple).getResult());
+        }
+        return false;
+    }
+
+    public static PagedFilter getPhoenixPagedFilter(Scan scan) {
+        Filter filter = scan.getFilter();
+        if (filter instanceof PagedFilter) {
+            // instanceof is false for null, so no separate null check is needed
+            return (PagedFilter) filter;
+        }
+        return null;
+    }
+
+    /**
+     *
+     * The server page size, expressed in ms, is the maximum time we want the Phoenix server code
+     * to spend on each iteration of a ResultScanner. Each ResultScanner#next() can be translated
+     * into one or more HBase RegionScanner#next() calls issued in a loop by a Phoenix RegionScanner
+     * object. To ensure that the total time spent by the Phoenix server code does not exceed the
+     * configured page size value, SERVER_PAGE_SIZE_MS, the loop time in a Phoenix region scanner is
+     * limited to 0.6 * SERVER_PAGE_SIZE_MS, and each HBase RegionScanner#next() call, whose time is
+     * controlled by PagedFilter, is limited to 0.3 * SERVER_PAGE_SIZE_MS.
+     */
+    private static long getPageSizeMs(Scan scan) {
+        long pageSizeMs = Long.MAX_VALUE;
+        byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
+        if (pageSizeMsBytes != null) {
+            pageSizeMs = Bytes.toLong(pageSizeMsBytes);
+        }
+        return pageSizeMs;
+    }
+
+    public static long getPageSizeMsForRegionScanner(Scan scan) {
+        return (long) (getPageSizeMs(scan) * 0.6);
+    }
+
+    public static long getPageSizeMsForFilter(Scan scan) {
+        return (long) (getPageSizeMs(scan) * 0.3);
     }
 
     /**
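
PagedFilter itself enforces the per-next() budget derived above. Its internals are not shown in this excerpt, so the following is only a hedged sketch of the idea, under the assumption that a deadline check in filterAllRemaining() is enough to end the current RegionScanner#next() early; the class name is hypothetical:

    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    // Minimal sketch, not the real PagedFilter: once the per-next() budget
    // elapses, stop the current RegionScanner#next() so the enclosing
    // Phoenix scanner can emit a dummy row as a heartbeat.
    public class DeadlineFilterSketch extends FilterBase {
        private final long pageSizeMs;
        private long startTime = -1;

        public DeadlineFilterSketch(long pageSizeMs) {
            this.pageSizeMs = pageSizeMs;
        }

        @Override
        public boolean filterAllRemaining() {
            if (startTime == -1) {
                startTime = EnvironmentEdgeManager.currentTimeMillis();
            }
            return EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs;
        }
    }
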
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 4894727..797ad7e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -640,11 +640,8 @@ public abstract class BaseTest {
         // This results in processing one row at a time in each next operation of the aggregate region
         // scanner, i.e., one-row pages. In other words, a 0ms page size allows only one row to be
         // processed per page; a 0ms page is equivalent to a one-row page
-        if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
-            conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
-        }
-        if (conf.getLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
-            conf.setLong(QueryServices.GROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
+        if (conf.getLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0) == 0) {
+            conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
         }
         return conf;
     }
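
Finally, a hedged example of setting the new knob explicitly: 0 forces one-row pages (as the test setup above does), while larger values trade heartbeat frequency for fewer early returns:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    Configuration conf = HBaseConfiguration.create();
    conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0L);       // one-row pages (tests)
    // conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 60000L); // explicit 60 s budget
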