You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/11/30 18:54:00 UTC

(phoenix) branch 5.1 updated: Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)

This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 0120f95489 Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)
0120f95489 is described below

commit 0120f954891741a3473e8e6cd720ff7042e91ec7
Author: tkhurana <kh...@gmail.com>
AuthorDate: Thu Nov 30 10:53:54 2023 -0800

    Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746)
    
    * PHOENIX-7024 Fix issues in Server Paging
    
    * PHOENIX-6909 Paged rows counter metric
---
 .../org/apache/phoenix/end2end/BaseOrderByIT.java  |  20 +-
 .../org/apache/phoenix/end2end/ServerPagingIT.java | 369 +++++++++++++++++++++
 .../phoenix/iterate/PhoenixQueryTimeoutIT.java     |  14 +-
 .../phoenix/coprocessor/PagedRegionScanner.java    |  19 +-
 .../UncoveredGlobalIndexRegionScanner.java         |   3 +
 .../coprocessor/UncoveredIndexRegionScanner.java   |  30 +-
 .../UncoveredLocalIndexRegionScanner.java          |   3 +
 .../org/apache/phoenix/filter/PagedFilter.java     |  84 ++---
 .../phoenix/iterate/OffsetResultIterator.java      |  19 +-
 .../phoenix/iterate/ScanningResultIterator.java    |  14 +-
 .../phoenix/monitoring/GlobalClientMetrics.java    |   4 +-
 .../org/apache/phoenix/monitoring/MetricType.java  |   4 +-
 .../phoenix/monitoring/ScanMetricsHolder.java      |   8 +
 .../java/org/apache/phoenix/util/ScanUtil.java     |  12 +-
 14 files changed, 515 insertions(+), 88 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
index a0ef9bdb72..dcd7463887 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
@@ -44,16 +44,25 @@ import java.util.Properties;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryBuilder;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
 import org.junit.Test;
 
 
 public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
+    Properties props;
+    @Before
+    public void setup() {
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+    }
+
     @Test
     public void testMultiOrderByExpr() throws Exception {
         String tenantId = getOrganizationId();
@@ -63,7 +72,7 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
                     Lists.newArrayList("ENTITY_ID", "B_STRING"))
             .setFullTableName(tableName)
             .setOrderByClause("B_STRING, ENTITY_ID");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             ResultSet rs = executeQuery(conn, queryBuilder);
             assertTrue (rs.next());
@@ -98,7 +107,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
                     Lists.newArrayList("ENTITY_ID", "B_STRING"))
             .setFullTableName(tableName)
             .setOrderByClause("B_STRING || ENTITY_ID DESC");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             ResultSet rs = executeQuery(conn, queryBuilder);
             assertTrue (rs.next());
@@ -126,7 +134,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testOrderByDifferentColumns() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
             String tableName = generateUniqueName();
@@ -201,7 +208,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testAggregateOrderBy() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
         String ddl = "create table " + tableName + " (ID VARCHAR NOT NULL PRIMARY KEY, VAL1 VARCHAR, VAL2 INTEGER)";
@@ -262,7 +268,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testAggregateOptimizedOutOrderBy() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
         String ddl = "create table " + tableName + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, VAL1 VARCHAR, VAL2 INTEGER, CONSTRAINT pk PRIMARY KEY(K1,K2))";
@@ -342,7 +347,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testNullsLastWithDesc() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -606,7 +610,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
     }
 
     private void doTestOrderByReverseOptimization(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -712,7 +715,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
     }
 
     private void doTestOrderByReverseOptimizationWithNullsLast(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -958,7 +960,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testOrderByNullable() throws SQLException {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String sql = "CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL," +
                     "city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY" +
@@ -1000,7 +1001,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testPhoenix6999() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         String tableName = "TBL_" + generateUniqueName();
         String descTableName = "TBL_" + generateUniqueName();
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
new file mode 100644
index 0000000000..d4368ee1d7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ServerPagingIT extends ParallelStatsDisabledIT {
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged) throws SQLException {
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
+            assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey());
+            Map<MetricType, Long> metricValues = entry.getValue();
+            Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER);
+            assertNotNull(pagedRowsCntr);
+            if (isPaged) {
+                assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr > 0);
+            } else {
+                assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr == 0);
+            }
+        }
+        assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
+    }
+
+    @Test
+    public void testOrderByNonAggregation() throws Exception {
+        final String tablename = generateUniqueName();
+        final String tenantId = getOrganizationId();
+
+        final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00");
+        final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00");
+        final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00");
+        final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00");
+        final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00");
+        final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00");
+        final String F1 = "A";
+        final String F2 = "B";
+        final String F3 = "C";
+        final String R1 = "R1";
+        final String R2 = "R2";
+        byte[][] splits = new byte[][] {
+                ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)),
+                ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)),
+        };
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tablename +
+                    "   (organization_id char(15) not null," +
+                    "    date date not null," +
+                    "    feature char(1) not null," +
+                    "    unique_users integer not null,\n" +
+                    "    transactions bigint,\n" +
+                    "    region varchar,\n" +
+                    "    CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", feature, unique_users))";
+            StringBuilder buf = new StringBuilder(ddl);
+            if (splits != null) {
+                buf.append(" SPLIT ON (");
+                for (int i = 0; i < splits.length; i++) {
+                    buf.append("'").append(Bytes.toString(splits[i])).append("'").append(",");
+                }
+                buf.setCharAt(buf.length()-1, ')');
+            }
+            ddl = buf.toString();
+            conn.createStatement().execute(ddl);
+
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tablename +
+                            " (" +
+                            "    ORGANIZATION_ID, " +
+                            "    \"DATE\", " +
+                            "    FEATURE, " +
+                            "    UNIQUE_USERS, " +
+                            "    TRANSACTIONS, " +
+                            "    REGION) " +
+                            "VALUES (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D1);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 10);
+            stmt.setLong(5, 100L);
+            stmt.setString(6, R2);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D2);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 20);
+            stmt.setLong(5, 200);
+            stmt.setString(6, null);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D3);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 30);
+            stmt.setLong(5, 300);
+            stmt.setString(6, R1);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D4);
+            stmt.setString(3, F2);
+            stmt.setInt(4, 40);
+            stmt.setLong(5, 400);
+            stmt.setString(6, R1);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D5);
+            stmt.setString(3, F3);
+            stmt.setInt(4, 50);
+            stmt.setLong(5, 500);
+            stmt.setString(6, R2);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D6);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 60);
+            stmt.setLong(5, 600);
+            stmt.setString(6, null);
+            stmt.execute();
+            conn.commit();
+        }
+
+        String query = "SELECT \"DATE\", transactions t FROM "+tablename+
+                " WHERE organization_id=? AND unique_users <= 30 ORDER BY t DESC LIMIT 2";
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+            PreparedStatement statement = conn.prepareStatement(query)) {
+            statement.setString(1, tenantId);
+            try (ResultSet rs = statement.executeQuery()) {
+                assertTrue(rs.next());
+                assertEquals(D3.getTime(), rs.getDate(1).getTime());
+                assertTrue(rs.next());
+                assertEquals(D2.getTime(), rs.getDate(1).getTime());
+                assertFalse(rs.next());
+                assertServerPagingMetric(tablename, rs, true);
+            }
+        }
+    }
+
+    @Test
+    public void testLimitOffset() throws SQLException {
+        final String tablename = generateUniqueName();
+        final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+                "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+        String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + "SPLIT ON ('e','i','o')";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            createTestTable(getUrl(), ddl);
+            for (int i = 0; i < 26; i++) {
+                conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "'," + i + ","
+                        + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+            }
+            conn.commit();
+            int limit = 10;
+            // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
+            int offset = 8;
+            ResultSet rs;
+            rs = conn.createStatement()
+                    .executeQuery("SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
+            int i = 0;
+            while (i < limit) {
+                assertTrue(rs.next());
+                assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i], rs.getString(1));
+                i++;
+            }
+            // no paging when serial offset
+            assertServerPagingMetric(tablename, rs, false);
+
+            // Testing query with offset + filter
+            int filterCond = 10;
+            rs = conn.createStatement().executeQuery(
+                    "SELECT t_id from " + tablename + " where k2 > " + filterCond +
+                            " order by t_id limit " + limit + " offset " + offset);
+            i = 0;
+            limit = 5;
+            while (i < limit) {
+                assertTrue(rs.next());
+                assertEquals("Expected string didn't match for i = " + i,
+                        STRINGS[offset + filterCond + i], rs.getString(1));
+                i++;
+            }
+            // no paging when serial offset
+            assertServerPagingMetric(tablename, rs, false);
+
+            limit = 35;
+            rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from "
+                    + tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only");
+            i = 0;
+            while (i++ < STRINGS.length - offset) {
+                assertTrue(rs.next());
+                assertEquals(STRINGS[offset + i - 1], rs.getString(1));
+            }
+            i = 0;
+            while (i++ < limit - STRINGS.length - offset) {
+                assertTrue(rs.next());
+                assertEquals(STRINGS[i - 1], rs.getString(1));
+            }
+            // no paging when serial offset
+            assertServerPagingMetric(tablename, rs, false);
+            limit = 1;
+            offset = 1;
+            rs = conn.createStatement()
+                    .executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset);
+            assertTrue(rs.next());
+            assertEquals(25, rs.getInt(1));
+            assertFalse(rs.next());
+            // because of descending order the offset is implemented on client
+            // so this generates a parallel scan and paging happens
+            assertServerPagingMetric(tablename, rs, true);
+        }
+    }
+
+    @Test
+    public void testGroupBy() throws SQLException {
+        final String tablename = generateUniqueName();
+        final String indexName = generateUniqueName();
+
+        String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
+        String indexDDl = "CREATE INDEX IF NOT EXISTS " +  indexName + " ON " + tablename + "(k2)";
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            createTestTable(getUrl(), ddl);
+            createTestTable(getUrl(), indexDDl);
+            for (int i = 0; i < 8; i++) {
+                conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1'," + i + ","
+                        + (i + 1) + ")");
+            }
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT count(*) FROM " + tablename + " where t_id = 'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6");
+            while (rs.next()) {
+                Assert.assertEquals(1, rs.getInt(1));
+            }
+            Assert.assertFalse(rs.next());
+            assertServerPagingMetric(indexName, rs, true);
+        }
+    }
+
+    @Test
+    public void testUncoveredQuery() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX "
+                    + indexTableName + " on " + dataTableName + " (val1) include (val2) ");
+            String selectSql;
+            int limit = 10;
+            //Verify that with index hint, we will read from the index table even though val3 is not included by the index table
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3,val2 from "
+                    + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + limit;
+            assertExplainPlanWithLimit(conn, selectSql, dataTableName, indexTableName, limit);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertFalse(rs.next());
+            assertServerPagingMetric(indexTableName, rs, true);
+
+            // Add another row and run a group by query where the uncovered index should be used
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
+            conn.commit();
+
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            // Run an order by query where the uncovered index should be used
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abcd", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("cdef", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void populateTable(String tableName) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("create table " + tableName +
+                " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10)," +
+                " val3 varchar(10))");
+        conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+        conn.commit();
+        conn.close();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index b73da96d98..e807e75047 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -27,6 +27,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -119,11 +120,16 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
             fail("Expected query to timeout with a 1 ms timeout");
         } catch (SQLException e) {
             //OPERATION_TIMED_OUT Exception expected
-            if (e.getErrorCode() == IO_EXCEPTION.getErrorCode() && e.getCause() instanceof SQLException) {
-                assertEquals(OPERATION_TIMED_OUT.getErrorCode(), ((SQLException) e.getCause()).getErrorCode());
-            } else {
-                assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode());
+            Throwable t = e;
+            // SQLTimeoutException can be wrapped inside outer exceptions like PhoenixIOException
+            while (t != null && !(t instanceof SQLTimeoutException)) {
+                t = t.getCause();
             }
+            if (t == null) {
+                fail("Expected query to fail with SQLTimeoutException");
+            }
+            assertEquals(OPERATION_TIMED_OUT.getErrorCode(),
+                    ((SQLTimeoutException)t).getErrorCode());
         } finally {
             BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
index 9e4e0d625d..03460af96a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java
@@ -24,7 +24,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.PagedFilter;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.phoenix.util.ScanUtil.getDummyResult;
 import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter;
@@ -42,6 +46,9 @@ public class PagedRegionScanner extends BaseRegionScanner {
     protected Region region;
     protected Scan scan;
     protected PagedFilter pageFilter;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PagedRegionScanner.class);
+
 	public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) {
 	    super(scanner);
 	    this.region = region;
@@ -54,6 +61,9 @@ public class PagedRegionScanner extends BaseRegionScanner {
 
     private boolean next(List<Cell> results, boolean raw) throws IOException {
 	    try {
+            if (pageFilter != null) {
+                pageFilter.init();
+            }
             boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
             if (pageFilter == null) {
                 return hasMore;
@@ -65,19 +75,20 @@ public class PagedRegionScanner extends BaseRegionScanner {
                     // Close the current region scanner, start a new one and return a dummy result
                     delegate.close();
                     byte[] rowKey = pageFilter.getRowKeyAtStop();
-                    scan.withStartRow(rowKey, true);
+                    boolean isInclusive = pageFilter.isNextRowInclusive();
+                    scan.withStartRow(rowKey, isInclusive);
                     delegate = region.getScanner(scan);
                     if (results.isEmpty()) {
-                        getDummyResult(rowKey, results);
+                        LOGGER.info("Page filter stopped, generating dummy key {} inclusive={}",
+                                Bytes.toStringBinary(rowKey), isInclusive);
+                        ScanUtil.getDummyResult(rowKey, results);
                     }
-                    pageFilter.init();
                     return true;
                 }
                 return false;
             } else {
                 // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
                 // start a new page on the next next() call.
-                pageFilter.resetStartTime();
                 return true;
             }
         } catch (Exception e) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index 5fb1244e91..ddcb6fff7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -123,6 +123,9 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
 
     protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
         Scan dataScan = prepareDataTableScan(dataRowKeys);
+        if (dataScan == null) {
+            return;
+        }
         try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
             for (Result result = resultScanner.next(); (result != null);
                  result = resultScanner.next()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 7cf87dcf00..c6c45eae02 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -158,17 +158,27 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
         List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
         for (byte[] dataRowKey : dataRowKeys) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+            // If the data table scan was interrupted because of paging we retry the scan
+            // but on retry we should only fetch data table rows which we haven't already
+            // fetched.
+            if (!dataRows.containsKey(new ImmutableBytesPtr(dataRowKey))) {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+            }
+        }
+        if (!keys.isEmpty()) {
+            ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+            Scan dataScan = new Scan(dataTableScan);
+            dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+            scanRanges.initializeScan(dataScan);
+            SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+            dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+            dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
+                    Bytes.toBytes(Long.valueOf(pageSizeMs)));
+            return dataScan;
+        } else {
+            LOGGER.info("All data rows have already been fetched");
+            return null;
         }
-        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
-        Scan dataScan = new Scan(dataTableScan);
-        dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
-        scanRanges.initializeScan(dataScan);
-        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-        dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
-        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
-                Bytes.toBytes(Long.valueOf(pageSizeMs)));
-        return dataScan;
     }
 
     protected boolean scanIndexTableRows(List<Cell> result,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 0d6ddc39d0..d7273aa8a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -67,6 +67,9 @@ public class UncoveredLocalIndexRegionScanner extends UncoveredIndexRegionScanne
 
     protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
         Scan dataScan = prepareDataTableScan(dataRowKeys);
+        if (dataScan == null) {
+            return;
+        }
         try (RegionScanner regionScanner = region.getScanner(dataScan)) {
             boolean hasMore;
             do {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
index 8d785c3ab4..370dbf007d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java
@@ -46,7 +46,11 @@ public class PagedFilter extends FilterBase implements Writable {
     State state;
     private long pageSizeMs;
     private long startTime;
-    private byte[] rowKeyAtStop;
+    // tracks the row which we will visit next. It is not always a full row key and may be
+    // just the row key prefix.
+    private Cell nextHintCell;
+    // tracks the row we last visited
+    private Cell currentCell;
     private Filter delegate = null;
 
     public PagedFilter() {
@@ -68,10 +72,19 @@ public class PagedFilter extends FilterBase implements Writable {
     }
 
     public byte[] getRowKeyAtStop() {
-        if (rowKeyAtStop != null) {
-            return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length);
+        byte[] rowKeyAtStop = null;
+        if (nextHintCell != null) {
+            // if we have already seeked to the next cell use that when we resume the scan
+            rowKeyAtStop = CellUtil.cloneRow(nextHintCell);
+        } else if (currentCell != null) {
+            rowKeyAtStop = CellUtil.cloneRow(currentCell);
         }
-        return null;
+        return rowKeyAtStop;
+    }
+
+    public boolean isNextRowInclusive() {
+        // since this can be a key prefix we have to set inclusive to true when resuming scan
+        return nextHintCell != null;
     }
 
     public boolean isStopped() {
@@ -80,21 +93,19 @@ public class PagedFilter extends FilterBase implements Writable {
 
     public void init() {
         state = State.INITIAL;
-        rowKeyAtStop = null;
-    }
-
-    public void resetStartTime() {
-        if (state == State.STARTED) {
-            init();
-        }
+        currentCell = null;
+        nextHintCell = null;
     }
 
     @Override
     public void reset() throws IOException {
-        if (state == State.INITIAL) {
-            startTime = EnvironmentEdgeManager.currentTimeMillis();
-            state = State.STARTED;
-        } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        // reset can be called multiple times for the same row, sometimes even before we have
+        // scanned a single row. The order in which it is called is not very predictable.
+        // So we need to ensure that we have seen at least one row before we page.
+        // The currentCell != null check ensures that.
+        if (state == State.STARTED && currentCell != null
+                && currentTime - startTime >= pageSizeMs) {
             state = State.TIME_TO_STOP;
         }
         if (delegate != null) {
@@ -107,33 +118,19 @@ public class PagedFilter extends FilterBase implements Writable {
     @Override
     public Cell getNextCellHint(Cell currentKV) throws IOException {
         if (delegate != null) {
-            return delegate.getNextCellHint(currentKV);
+            Cell ret = delegate.getNextCellHint(currentKV);
+            // save the hint so that if the filter stops we know where to resume the scan
+            nextHintCell = ret;
+            return ret;
         }
         return super.getNextCellHint(currentKV);
     }
 
-    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
-        if (state == State.TIME_TO_STOP) {
-            if (rowKeyAtStop == null) {
-                rowKeyAtStop = new byte[length];
-                Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length);
-            }
-            return true;
-        }
-        if (delegate != null) {
-            return delegate.filterRowKey(buffer, offset, length);
-        }
-        return super.filterRowKey(buffer, offset, length);
-    }
-
     @Override
     public boolean filterRowKey(Cell cell) throws IOException {
-        if (state == State.TIME_TO_STOP) {
-            if (rowKeyAtStop == null) {
-                rowKeyAtStop = CellUtil.cloneRow(cell);
-            }
-            return true;
-        }
+        currentCell = cell;
+        // now that we have visited the row we need to reset the hint
+        nextHintCell = null;
         if (delegate != null) {
             return delegate.filterRowKey(cell);
         }
@@ -142,10 +139,13 @@ public class PagedFilter extends FilterBase implements Writable {
 
     @Override
     public boolean filterAllRemaining() throws IOException {
-        if (state == State.TIME_TO_STOP && rowKeyAtStop != null) {
+        if (state == State.TIME_TO_STOP) {
             state = State.STOPPED;
             return true;
         }
+        if (state == State.STOPPED) {
+            return true;
+        }
         if (delegate != null) {
             return delegate.filterAllRemaining();
         }
@@ -153,7 +153,14 @@ public class PagedFilter extends FilterBase implements Writable {
     }
 
     @Override
+    /**
+     * This is called once for every row in the beginning.
+     */
     public boolean hasFilterRow() {
+        if (state == State.INITIAL) {
+            startTime = EnvironmentEdgeManager.currentTimeMillis();
+            state = State.STARTED;
+        }
         return true;
     }
 
@@ -211,6 +218,7 @@ public class PagedFilter extends FilterBase implements Writable {
 
     @Override
     public ReturnCode filterKeyValue(Cell v) throws IOException {
+
         if (delegate != null) {
             return delegate.filterKeyValue(v);
         }
@@ -219,6 +227,8 @@ public class PagedFilter extends FilterBase implements Writable {
 
     @Override
     public Filter.ReturnCode filterCell(Cell c) throws IOException {
+        currentCell = c;
+        nextHintCell = null;
         if (delegate != null) {
             return delegate.filterCell(c);
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index 3eecfc82c3..e00a9fbeb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -20,13 +20,10 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
-import static org.apache.phoenix.util.ScanUtil.getDummyResult;
-import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
 
 /**
  * Iterates through tuples up to a limit
@@ -49,14 +46,16 @@ public class OffsetResultIterator extends DelegateResultIterator {
     }
     @Override
     public Tuple next() throws SQLException {
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (rowCount < offset) {
             Tuple tuple = super.next();
             if (tuple == null) { return null; }
-            rowCount++;
-            if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
-                return getDummyTuple(tuple);
+            if (isDummy(tuple)) {
+                // while rowCount < offset absorb the dummy and call next on the underlying scanner
+                continue;
             }
+            rowCount++;
+            // no page timeout check at this level because we cannot correctly resume
+            // scans for OFFSET queries until the offset is reached
         }
         return super.next();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 8189fbcef9..6b9c84e2c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COU
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
 
@@ -52,8 +53,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -81,6 +81,8 @@ public class ScanningResultIterator implements ResultIterator {
     private final boolean isMapReduceContext;
     private final long maxQueryEndTime;
 
+    private long dummyRowCounter = 0;
+
     public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) {
         this.scanner = scanner;
         this.scanMetricsHolder = scanMetricsHolder;
@@ -99,13 +101,13 @@ public class ScanningResultIterator implements ResultIterator {
     }
 
     private void changeMetric(CombinableMetric metric, Long value) {
-        if(value != null) {
+        if (value != null) {
             metric.change(value);
         }
     }
 
     private void changeMetric(GlobalClientMetrics metric, Long value) {
-        if(value != null) {
+        if (value != null) {
             metric.update(value);
         }
     }
@@ -139,6 +141,7 @@ public class ScanningResultIterator implements ResultIterator {
                     scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
             changeMetric(scanMetricsHolder.getCountOfRowsFiltered(),
                     scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
+            changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
 
             changeMetric(GLOBAL_SCAN_BYTES,
                     scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
@@ -165,6 +168,8 @@ public class ScanningResultIterator implements ResultIterator {
             changeMetric(GLOBAL_HBASE_COUNT_ROWS_FILTERED,
                     scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME));
 
+            changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter);
+
             scanMetricsUpdated = true;
         }
 
@@ -175,6 +180,7 @@ public class ScanningResultIterator implements ResultIterator {
         try {
             Result result = scanner.next();
             while (result != null && (result.isEmpty() || isDummy(result))) {
+                dummyRowCounter += 1;
                 long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
                 if (timeOutForScan < 0) {
                     throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index ad19e05aa5..4a2dacd688 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -30,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME;
@@ -109,7 +110,7 @@ public enum GlobalClientMetrics {
     GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER),
     GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
     GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER),
-
+    GLOBAL_PAGED_ROWS_COUNTER(PAGED_ROWS_COUNTER),
     GLOBAL_HBASE_COUNT_RPC_CALLS(COUNT_RPC_CALLS),
     GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS(COUNT_REMOTE_RPC_CALLS),
     GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS(COUNT_MILLS_BETWEEN_NEXTS),
@@ -124,7 +125,6 @@ public enum GlobalClientMetrics {
     GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
     GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER);
 
-
     private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class);
     private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
     private MetricType metricType;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 75907761e7..fae7c0a5fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -67,6 +67,8 @@ public enum MetricType {
     PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE),
     CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
     CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
+    PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE),
+
     // hbase metrics
     COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
     COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
@@ -118,7 +120,7 @@ public enum MetricType {
 
     public static String getMetricColumnsDetails() {
         StringBuilder buffer=new StringBuilder();
-        for(MetricType metric:MetricType.values()){
+        for (MetricType metric:MetricType.values()) {
             if (metric.logLevel() != LogLevel.OFF) {
                 buffer.append(metric.columnName());
                 buffer.append(" ");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index dd6603e152..8b3276a437 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -28,6 +28,7 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
 
 import java.io.IOException;
 import java.util.Map;
@@ -49,6 +50,8 @@ public class ScanMetricsHolder {
     private final CombinableMetric countOfRemoteRPCRetries;
     private final CombinableMetric countOfRowsScanned;
     private final CombinableMetric countOfRowsFiltered;
+    private final CombinableMetric countOfRowsPaged;
+
     private  Map<String, Long> scanMetricMap;
     private Object scan;
 
@@ -78,6 +81,7 @@ public class ScanMetricsHolder {
         countOfRemoteRPCRetries = readMetrics.allotMetric(COUNT_REMOTE_RPC_RETRIES, tableName);
         countOfRowsScanned = readMetrics.allotMetric(COUNT_ROWS_SCANNED, tableName);
         countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName);
+        countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
     }
 
     public CombinableMetric getCountOfRemoteRPCcalls() {
@@ -128,6 +132,10 @@ public class ScanMetricsHolder {
         return scanMetricMap;
     }
 
+    public CombinableMetric getCountOfRowsPaged() {
+        return countOfRowsPaged;
+    }
+
     public void setScanMetricMap(Map<String, Long> scanMetricMap) {
         this.scanMetricMap = scanMetricMap;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 98e3a039ca..6122fec9b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -484,7 +484,7 @@ public class ScanUtil {
                 // But if last field of rowKey is variable length and also DESC, the trailing 0xFF
                 // is not removed when stored in HBASE, so for such case, we should not set
                 // lastInclusiveUpperSingleKey back to false.
-                if(sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
+                if (sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
                     lastInclusiveUpperSingleKey &= (fieldIndex + slotSpan[i]) < schema.getMaxFields()-1;
                 }
             }
@@ -686,10 +686,10 @@ public class ScanUtil {
     public static void setupLocalIndexScan(Scan scan) {
         byte[] prefix = scan.getStartRow().length == 0 ? new byte[scan.getStopRow().length]: scan.getStartRow();
         int prefixLength = scan.getStartRow().length == 0? scan.getStopRow().length: scan.getStartRow().length;
-        if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+        if(scan.getAttribute(SCAN_START_ROW_SUFFIX) != null) {
             scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
         }
-        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX) != null) {
             scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
         }
     }
@@ -711,7 +711,7 @@ public class ScanUtil {
      * @param newScan
      */
     public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
-        if(ScanUtil.isLocalIndex(newScan)) {
+        if (ScanUtil.isLocalIndex(newScan)) {
              newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
              newScan.setStartRow(regionStartKey);
              newScan.setStopRow(regionEndKey);
@@ -787,7 +787,7 @@ public class ScanUtil {
     public static int getRowKeyPosition(int[] slotSpan, int slotPosition) {
         int offset = 0;
 
-        for(int i = 0; i < slotPosition; i++) {
+        for (int i = 0; i < slotPosition; i++) {
             offset += slotSpan[i];
         }
 
@@ -1298,7 +1298,7 @@ public class ScanUtil {
 
     public static boolean isDummy(Tuple tuple) {
         if (tuple instanceof ResultTuple) {
-            isDummy(((ResultTuple) tuple).getResult());
+            return isDummy(((ResultTuple) tuple).getResult());
         }
         return false;
     }