You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2022/03/15 22:15:14 UTC

[phoenix] branch 5.1 updated: PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 859f46b  PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)
859f46b is described below

commit 859f46b8e9cd81cd643ca4413196852267de6be5
Author: kadirozde <37...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:12:08 2022 -0700

    PHOENIX-6501 Use batching when joining data table rows with uncovered global index rows (#1399)
---
 .../end2end/index/GlobalIndexCheckerIT.java        |  80 ----
 .../index/UncoveredGlobalIndexRegionScannerIT.java | 396 ++++++++++++++++++++
 .../coprocessor/BaseScannerRegionObserver.java     |   4 +-
 .../UncoveredGlobalIndexRegionScanner.java         | 405 +++++++++++++++++++++
 .../phoenix/iterate/RegionScannerFactory.java      |  60 +--
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +-
 .../java/org/apache/phoenix/util/IndexUtil.java    |  33 +-
 8 files changed, 861 insertions(+), 122 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 8eb0aac..95f0860 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -477,86 +477,6 @@ public class GlobalIndexCheckerIT extends BaseTest {
     }
 
     @Test
-    public void testUncoveredGlobalIndex() throws Exception {
-        if (async) {
-            return;
-        }
-        String dataTableName = generateUniqueName();
-        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            String indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
-                    dataTableName + " (val1) include (val2)" + this.indexDDLOptions);
-            // Verify that without hint, the index table is not selected
-            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
-                    "SELECT val3 from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
-
-            //Verify that with index hint, we will read from the index table even though val3 is not included by the index table
-            String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from "
-                    + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
-            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
-            ResultSet rs = conn.createStatement().executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("bcde", rs.getString(1));
-            assertFalse(rs.next());
-            conn.createStatement().execute("DROP INDEX " + indexTableName + " on " + dataTableName);
-            // Create an index does not include any columns
-            indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
-                    dataTableName + " (val1)" + this.indexDDLOptions);
-            conn.commit();
-
-            // Verify that without hint, the index table is not selected
-            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
-                    "SELECT id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
-            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
-            //Verify that we will read from the index table
-            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
-            rs = conn.createStatement().executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("b", rs.getString(1));
-            assertFalse(rs.next());
-
-            // Add another row and run a group by query where the uncovered index should be used
-            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
-            conn.commit();
-            // Verify that without hint, the index table is not selected
-            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
-                    "SELECT count(*) from " + dataTableName + " where val1 > '0' GROUP BY val1");
-            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(*) from " + dataTableName + " where val1 > '0' GROUP BY val1";
-            //Verify that we will read from the index table
-            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
-            rs = conn.createStatement().executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals(2, rs.getInt(1));
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertFalse(rs.next());
-            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
-            //Verify that we will read from the index table
-            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
-            rs = conn.createStatement().executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            // Run an order by query where the uncovered index should be used
-            // Verify that without hint, the index table is not selected
-            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
-                    "SELECT val3 from " + dataTableName + " where val1 > '0' ORDER BY val1");
-            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
-            //Verify that we will read from the index table
-            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
-            rs = conn.createStatement().executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("abcd", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("cdef", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bcde", rs.getString(1));
-            assertFalse(rs.next());
-        }
-    }
-
-    @Test
     public void testSimulateConcurrentUpdates() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
new file mode 100644
index 0000000..93d4d80
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @After
+    public void unsetFailForTesting() throws Exception {
+        boolean refCountLeaked = isAnyStoreRefCountLeaked();
+        assertFalse("refCount leaked", refCountLeaked);
+    }
+
+    private void populateTable(String tableName) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) { // closed even if a statement fails
+            conn.createStatement().execute("create table " + tableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))");
+            conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+        }
+    }
+
+    @Test
+    public void testUncoveredIndexWithPhoenixRowTimestamp() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexTableName = generateUniqueName();
+            Timestamp initial = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1);
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))");
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            Timestamp before = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            // Sleep 1ms to get a different row timestamp
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+            Timestamp after = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1);
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1, PHOENIX_ROW_TIMESTAMP()) ");
+
+            String timeZoneID = Calendar.getInstance().getTimeZone().getID();
+            // Write a time range query to get the row with val1 = 'bc'
+            String query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+                    + "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+                    + " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
+                    + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
+                    + timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
+                    + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Count the number of index rows
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            // Add one more row with val1 = 'bc' and check this does not change the result of the previous
+            // query
+            // Sleep 1ms to get a different row timestamp
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('c', 'bc', 'ccc', 'cccc')");
+            conn.commit();
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Write a time range query to get the last row with val1 = 'bc'
+            query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+                    +"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+                    " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
+                    + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            assertFalse(rs.next());
+            // Verify that we can execute the same query without using the index
+            String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the data table
+            rs = conn.createStatement().executeQuery("EXPLAIN " + noIndexQuery);
+            String explainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(explainPlan.contains("FULL SCAN OVER " + dataTableName));
+            rs = conn.createStatement().executeQuery(noIndexQuery);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            after = rs.getTimestamp(3);
+            assertFalse(rs.next());
+            // Sleep 1ms to get a different row timestamp
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('d', 'de', 'def', 'defg')");
+            conn.commit();
+
+            query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+                    + " val1, val2, PHOENIX_ROW_TIMESTAMP()  from " + dataTableName
+                    + " WHERE val1 = 'de'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("de", rs.getString(1));
+            assertEquals("def", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            assertFalse(rs.next());
+            // Add a new index where the index row key starts with PHOENIX_ROW_TIMESTAMP()
+            indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (PHOENIX_ROW_TIMESTAMP()) ");
+            // Add one more row
+            // Sleep 1ms to get a different row timestamp
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg', 'efgh')");
+            conn.commit();
+            // Write a query to get all the rows in the order of their timestamps
+            query = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ "
+                    + " val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE "
+                    + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial
+                    + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("abc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("de", rs.getString(1));
+            assertEquals("def", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("ae", rs.getString(1));
+            assertEquals("efg", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void assertIndexTableNotSelected(Connection conn, String dataTableName, String indexTableName, String sql)
+            throws Exception {
+        try {
+            assertExplainPlan(conn, sql, dataTableName, indexTableName);
+            throw new RuntimeException("The index table should not be selected without an index hint");
+        } catch (AssertionError error){
+            //expected
+        }
+    }
+
+    @Test
+    public void testUncoveredGlobalIndex() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2)");
+            // Verify that without hint, the index table is not selected
+            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+                    "SELECT val3 from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
+
+            //Verify that with index hint, we will read from the index table even though val3 is not included by the index table
+            String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from "
+                    + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertFalse(rs.next());
+            conn.createStatement().execute("DROP INDEX " + indexTableName + " on " + dataTableName);
+            // Create an index that does not include any columns
+            indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1)");
+            conn.commit();
+
+            // Verify that without hint, the index table is not selected
+            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+                    "SELECT id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')");
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ id from " + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde')";
+            //Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+
+            // Add another row and run a group by query where the uncovered index should be used
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
+            conn.commit();
+            // Verify that without hint, the index table is not selected
+            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+                    "SELECT count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1");
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1";
+            //Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'";
+            //Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            // Run an order by query where the uncovered index should be used
+            // Verify that without hint, the index table is not selected
+            assertIndexTableNotSelected(conn, dataTableName, indexTableName,
+                    "SELECT val3 from " + dataTableName + " where val1 > '0' ORDER BY val1");
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1";
+            //Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abcd", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("cdef", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testSkipScanFilter() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + dataTableName
+                    + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, "
+                    + "v2 INTEGER, v3 INTEGER "
+                    + "CONSTRAINT pk PRIMARY KEY (k1,k2)) "
+                    + " COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+            TestUtil.addCoprocessor(conn, dataTableName, ScanFilterRegionObserver.class);
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON "
+                    + dataTableName + "(v1) include (v2)");
+            final int nIndexValues = 97;
+            final Random RAND = new Random(7);
+            final int batchSize = 100;
+            for (int i = 0; i < 10000; i++) {
+                conn.createStatement().execute(
+                        "UPSERT INTO " + dataTableName + " VALUES (" + i + ", 1, "
+                                + (RAND.nextInt() % nIndexValues) + ", "
+                                + RAND.nextInt() + ", 1)");
+                if ((i % batchSize) == 0) {
+                    conn.commit();
+                }
+            }
+            conn.commit();
+            String selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName
+                    + ")*/  SUM(v3) from " + dataTableName + " GROUP BY v1";
+
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            int sum = 0;
+            while (rs.next()) {
+                sum += rs.getInt(1);
+            }
+            assertEquals(10000, sum);
+            // UncoveredGlobalIndexRegionScanner uses the skip scan filter to retrieve data table
+            // rows. Verify that the skip scan filter is used
+            assertEquals(10000, ScanFilterRegionObserver.count.get());
+        }
+    }
+
+    @Test
+    public void testCount() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + dataTableName
+                    + "(k1 BIGINT NOT NULL, k2 BIGINT NOT NULL, v1 INTEGER, "
+                    + "v2 INTEGER, v3 BIGINT "
+                    + "CONSTRAINT pk PRIMARY KEY (k1,k2)) "
+                    + " VERSIONS=1, IMMUTABLE_ROWS=TRUE");
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON "
+                    + dataTableName + "(v1) include (v2)");
+            final int nIndexValues = 9;
+            final Random RAND = new Random(7);
+            final int batchSize = 1000;
+            for (int i = 0; i < 100000; i++) {
+                conn.createStatement().execute(
+                        "UPSERT INTO " + dataTableName + " VALUES (" + i + ", 1, "
+                                + (RAND.nextInt() % nIndexValues) + ", "
+                                + RAND.nextInt() + ", " + RAND.nextInt()+ ")");
+                if ((i % batchSize) == 0) {
+                    conn.commit();
+                }
+            }
+            conn.commit();
+            String selectSql = "SELECT /*+ NO_INDEX */  Count(v3) from " + dataTableName + " where v1 = 5";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            long count = rs.getLong(1); // baseline count from the NO_INDEX query
+            selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName
+                    + ")*/  Count(v3) from " + dataTableName + " where v1 = 5";
+            //Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(count, rs.getLong(1)); // compare as long, matching the baseline read
+        }
+    }
+
+    public static class ScanFilterRegionObserver extends SimpleRegionObserver {
+        public static final AtomicInteger count = new AtomicInteger(0);
+
+        @Override
+        public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                   final Scan scan) {
+            if (scan.getFilter() instanceof SkipScanFilter) {
+                List<List<KeyRange>> slots = ((SkipScanFilter)scan.getFilter()).getSlots();
+                for (List<KeyRange> ranges : slots) {
+                    count.addAndGet(ranges.size());
+                }
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b2ef71c..2e5fbc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -73,6 +73,7 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
     public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
+    public static final String INDEX_PAGE_ROWS = "_IndexPageRows";
     public static final String SERVER_PAGE_SIZE_MS = "_ServerPageSizeMs";
     // Index verification type done by the index tool
     public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
@@ -387,7 +388,8 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final Region dataRegion, final IndexMaintainer indexMaintainer,
             final byte[][] viewConstants, final TupleProjector projector,
-            final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
+            final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex)
+            throws IOException {
 
         RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment());
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
new file mode 100644
index 0000000..f5206a7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.hbase.index.parallel.*;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an index table region scanner which scans index table rows locally and then extracts
+ * data table row keys from them. Using the data table row keys, the data table rows are scanned
+ * using the HBase client available to region servers.
+ */
+public class UncoveredGlobalIndexRegionScanner extends BaseRegionScanner {
+    public static final String NUM_CONCURRENT_INDEX_THREADS_CONF_KEY = "index.threads.max";
+    public static final int DEFAULT_CONCURRENT_INDEX_THREADS = 16;
+    public static final String INDEX_ROW_COUNTS_PER_TASK_CONF_KEY = "index.row.count.per.task";
+    public static final int DEFAULT_INDEX_ROW_COUNTS_PER_TASK = 2048;
+
+    /**
+     * The states of processing a page of index rows.
+     *
+     * INITIAL: no page is in progress; next() allocates fresh buffers for a new page.
+     * SCANNING_INDEX: index rows are being read from the inner scanner into the page.
+     * SCANNING_DATA: the data table rows for the buffered index rows still have to be fetched.
+     * SCANNING_DATA_INTERRUPTED: set by a data table scan task that ran past pageSizeMs;
+     * scanDataTableRows() turns it back into SCANNING_DATA so that the data scan is retried.
+     * READY: the data table rows are available and next() returns one joined row per call.
+     */
+    private enum State {
+        INITIAL, SCANNING_INDEX, SCANNING_DATA, SCANNING_DATA_INTERRUPTED, READY
+    }
+    State state = State.INITIAL;
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(UncoveredGlobalIndexRegionScanner.class);
+    protected final byte[][] viewConstants;
+    protected final RegionCoprocessorEnvironment env;
+    protected byte[][] regionEndKeys;
+    protected final Table dataHTable;
+    protected final int pageSizeInRows;
+    protected final int rowCountPerTask;
+    protected final Scan scan;
+    protected final Scan dataTableScan;
+    protected final RegionScanner innerScanner;
+    protected final Region region;
+    protected final IndexMaintainer indexMaintainer;
+    protected final TupleProjector tupleProjector;
+    protected final ImmutableBytesWritable ptr;
+    protected final TaskRunner pool;
+    protected String exceptionMessage;
+    protected final HTableFactory hTableFactory;
+    protected List<List<Cell>> indexRows = null;
+    protected Map<ImmutableBytesPtr, Result> dataRows = null;
+    protected Iterator<List<Cell>> indexRowIterator = null;
+    protected Map<byte[], byte[]> indexToDataRowKeyMap = null;
+    protected int indexRowCount = 0;
+    protected final long pageSizeMs;
+    protected byte[] lastIndexRowKey = null;
+
+    /**
+     * Creates a scanner that reads index rows from the given inner scanner and joins them with
+     * rows read from the data table.
+     *
+     * The page size in rows is taken from the INDEX_PAGE_ROWS scan attribute when it is set and
+     * from the INDEX_PAGE_SIZE_IN_ROWS configuration otherwise. The constructor also opens the
+     * data table named by the PHYSICAL_DATA_TABLE_NAME scan attribute and retrieves the end keys
+     * of its regions.
+     *
+     * @param innerScanner the scanner the index rows are read from
+     * @param region the region used for region operations and region info
+     * @param scan the index table scan; its attributes and time range are read by this scanner
+     * @param env the coprocessor environment providing the configuration
+     * @param dataTableScan the scan that is copied for every data table scan
+     * @param tupleProjector the projector used to serialize the data table row into one cell
+     * @param indexMaintainer used to build data table row keys from index row keys
+     * @param viewConstants passed to the index maintainer when building data table row keys
+     * @param ptr the scratch pointer handed to the tuple projection
+     * @param pageSizeMs the time budget in milliseconds for one call of next()
+     * @throws IOException if the data table or its region end keys cannot be obtained
+     */
+    public UncoveredGlobalIndexRegionScanner(final RegionScanner innerScanner,
+                                             final Region region,
+                                             final Scan scan,
+                                             final RegionCoprocessorEnvironment env,
+                                             final Scan dataTableScan,
+                                             final TupleProjector tupleProjector,
+                                             final IndexMaintainer indexMaintainer,
+                                             final byte[][] viewConstants,
+                                             final ImmutableBytesWritable ptr,
+                                             final long pageSizeMs)
+            throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+
+        // A page size set on the scan takes precedence over the configured one
+        byte[] pageSizeFromScan =
+                scan.getAttribute(BaseScannerRegionObserver.INDEX_PAGE_ROWS);
+        if (pageSizeFromScan != null) {
+            pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+        } else {
+            pageSizeInRows = (int)
+                    config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
+                            QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+        }
+
+        this.indexMaintainer = indexMaintainer;
+        this.viewConstants = viewConstants;
+        this.scan = scan;
+        this.dataTableScan = dataTableScan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ptr = ptr;
+        this.tupleProjector = tupleProjector;
+        this.pageSizeMs = pageSizeMs;
+        hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
+        rowCountPerTask = config.getInt(INDEX_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_ROW_COUNTS_PER_TASK);
+
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("Uncovered Global Index",
+                        env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_THREADS_CONF_KEY,
+                        DEFAULT_CONCURRENT_INDEX_THREADS).setCoreTimeout(
+                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
+        // Both the connection and the region locator are closeable; release them as soon as
+        // the region end keys have been read
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(
+                             env.getConfiguration());
+             org.apache.hadoop.hbase.client.RegionLocator regionLocator =
+                     connection.getRegionLocator(dataHTable.getName())) {
+            regionEndKeys = regionLocator.getEndKeys();
+        }
+    }
+
+    /**
+     * @return the MVCC read point reported by the wrapped index region scanner
+     */
+    @Override
+    public long getMvccReadPoint() {
+        return this.innerScanner.getMvccReadPoint();
+    }
+    /**
+     * @return the region info of the region this scanner was created with
+     */
+    @Override
+    public RegionInfo getRegionInfo() {
+        return this.region.getRegionInfo();
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    /**
+     * Closes the inner scanner, shuts down the table factory, closes the data table and stops
+     * the task pool. Every step is attempted even if an earlier one throws, so that a failure
+     * while closing one resource does not leave the remaining ones open.
+     *
+     * @throws IOException if closing the inner scanner or the data table fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            innerScanner.close();
+        } finally {
+            try {
+                hTableFactory.shutdown();
+            } finally {
+                try {
+                    if (dataHTable != null) {
+                        dataHTable.close();
+                    }
+                } finally {
+                    this.pool.stop("UncoveredGlobalIndexRegionScanner is closing");
+                }
+            }
+        }
+    }
+
+    /**
+     * @return the maximum result size reported by the wrapped index region scanner
+     */
+    @Override
+    public long getMaxResultSize() {
+        return this.innerScanner.getMaxResultSize();
+    }
+
+    /**
+     * @return the batch value reported by the wrapped index region scanner
+     */
+    @Override
+    public int getBatch() {
+        return this.innerScanner.getBatch();
+    }
+
+    /**
+     * Reads the data table rows for the given row keys with one skip scan over point lookups
+     * and stores every returned row in dataRows keyed by its row key. The scan is a copy of
+     * dataTableScan restricted to the time range of the index scan.
+     *
+     * If the time elapsed since startTime reaches pageSizeMs after a row has been stored, the
+     * state is set to SCANNING_DATA_INTERRUPTED and the scan stops early.
+     *
+     * @param dataRowKeys the data table row keys to look up
+     * @param startTime the time in milliseconds at which the current next() call started
+     * @throws IOException if the data table scan fails
+     */
+    private void scanDataRows(Set<byte[]> dataRowKeys, long startTime) throws IOException {
+        List<KeyRange> pointLookups = new ArrayList<>(dataRowKeys.size());
+        for (byte[] rowKey : dataRowKeys) {
+            pointLookups.add(PVarbinary.INSTANCE.getKeyRange(rowKey));
+        }
+        ScanRanges pointLookupRanges = ScanRanges.createPointLookup(pointLookups);
+        Scan pointLookupScan = new Scan(dataTableScan);
+        pointLookupScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        pointLookupRanges.initializeScan(pointLookupScan);
+        pointLookupScan.setFilter(
+                new SkipScanFilter(pointLookupRanges.getSkipScanFilter(), false));
+        try (ResultScanner rows = dataHTable.getScanner(pointLookupScan)) {
+            Result row;
+            while ((row = rows.next()) != null) {
+                dataRows.put(new ImmutableBytesPtr(row.getRow()), row);
+                long elapsedMs = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                if (elapsedMs < pageSizeMs) {
+                    continue;
+                }
+                // Out of time for this page: give up and let the caller retry the data scan
+                LOGGER.info("One of the scan tasks in UncoveredGlobalIndexRegionScanner"
+                        + " for region " + region.getRegionInfo().getRegionNameAsString()
+                        + " could not complete on time (in " + pageSizeMs+ " ms) and"
+                        + " will be resubmitted");
+                state = State.SCANNING_DATA_INTERRUPTED;
+                break;
+            }
+        } catch (Throwable t) {
+            exceptionMessage = "scanDataRows fails for at least one task";
+            ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+        }
+    }
+
+    /**
+     * Adds one task to the batch that scans the data table rows for the given row keys.
+     * The task fails with an IOException without scanning when its thread has been interrupted.
+     *
+     * @param tasks the batch the new task is added to
+     * @param dataRowKeys the data table row keys the task looks up
+     * @param startTime the time in milliseconds at which the current next() call started
+     */
+    private void addTasksForScanningDataTableRowsInParallel(TaskBatch<Boolean> tasks,
+                                                            final Set<byte[]> dataRowKeys,
+                                                            final long startTime) {
+        Task<Boolean> scanTask = new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                //in HBase 1.x we could check if the coproc environment was closed or aborted,
+                //but in HBase 2.x the coproc environment can't check region server services
+                if (Thread.currentThread().isInterrupted()) {
+                    exceptionMessage = "Pool closed, not retrieving data table rows for "
+                            + region.getRegionInfo().getRegionNameAsString();
+                    throw new IOException(exceptionMessage);
+                }
+                scanDataRows(dataRowKeys, startTime);
+                return Boolean.TRUE;
+            }
+        };
+        tasks.add(scanTask);
+    }
+
+
+    /**
+     * Submits the task batch to the pool, waits for it, and fails if any task produced no result.
+     *
+     * @param tasks the tasks to run
+     * @throws IOException if a task has a null result; the cause is taken from its future
+     */
+    protected void submitTasks(TaskBatch<Boolean> tasks) throws IOException {
+        final Pair<List<Boolean>, List<Future<Boolean>>> outcome;
+        try {
+            LOGGER.debug("Waiting on index tasks to complete...");
+            outcome = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(
+                    "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        }
+        final List<Boolean> taskResults = outcome.getFirst();
+        for (int taskIndex = 0; taskIndex < taskResults.size(); taskIndex++) {
+            if (taskResults.get(taskIndex) != null) {
+                continue;
+            }
+            // A null result means that the task at this position failed
+            Throwable cause = ServerUtil.getExceptionFromFailedFuture(
+                    outcome.getSecond().get(taskIndex));
+            throw new IOException(exceptionMessage == null ? "" : exceptionMessage, cause);
+        }
+    }
+
+    /**
+     * Fetches the data table rows for the index rows of the current page. The data row keys are
+     * sorted, split into per-task sets and scanned by the tasks of one batch.
+     *
+     * The state becomes READY when there is nothing to fetch or when no task was interrupted,
+     * and SCANNING_DATA when a task set SCANNING_DATA_INTERRUPTED.
+     *
+     * @param startTime the time in milliseconds at which the current next() call started
+     * @throws IOException if a data table scan task fails
+     */
+    private void scanDataTableRows(long startTime)
+            throws IOException {
+        if (indexToDataRowKeyMap.isEmpty()) {
+            state = State.READY;
+            return;
+        }
+        TreeSet<byte[]> sortedDataRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        sortedDataRowKeys.addAll(indexToDataRowKeyMap.values());
+        List<Set<byte[]>> perTaskRowKeys = IndexRepairRegionScanner.getPerTaskDataRowKeys(
+                sortedDataRowKeys, regionEndKeys, rowCountPerTask);
+        TaskBatch<Boolean> batch = new TaskBatch<>(perTaskRowKeys.size());
+        for (Set<byte[]> taskRowKeys : perTaskRowKeys) {
+            addTasksForScanningDataTableRowsInParallel(batch, taskRowKeys, startTime);
+        }
+        submitTasks(batch);
+        state = (state == State.SCANNING_DATA_INTERRUPTED) ? State.SCANNING_DATA : State.READY;
+    }
+
+    /**
+     * A page of index rows are scanned and then their corresponding data table rows are retrieved
+     * from the data table regions in parallel. These data rows are then joined with index rows.
+     * The join is for adding uncovered columns to index rows.
+     *
+     * This implementation conforms to server paging such that if the server side operation takes
+     * more than pageSizeMs, a dummy result is returned to signal the client that there is more
+     * work to do on the server side. This is done to prevent RPC timeouts.
+     *
+     * Each call advances the page state machine as far as it can: index rows are buffered until
+     * pageSizeInRows rows have been read or the inner scanner has no more rows, the data table
+     * rows for the buffered page are then fetched, and after that one joined row is returned per
+     * call. An index row whose data table row was not found is returned as an empty result with
+     * a return value of true. When the READY state is not reached within the call, a dummy
+     * result built from the last scanned index row key is returned instead.
+     *
+     * @param result the list the cells of the next row (or of a dummy result) are added to
+     * @return boolean to indicate if there are more rows to scan
+     * @throws IOException if scanning the index region or the data table fails
+     */
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        boolean hasMore;
+        region.startRegionOperation();
+        try {
+            synchronized (innerScanner) {
+                if (state == State.READY && !indexRowIterator.hasNext()) {
+                    state = State.INITIAL;
+                }
+                if (state == State.INITIAL) {
+                    indexRowCount = 0;
+                    indexRows = new ArrayList();
+                    dataRows = Maps.newConcurrentMap();
+                    indexToDataRowKeyMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                    state = State.SCANNING_INDEX;
+                }
+                if (state == State.SCANNING_INDEX) {
+                    do {
+                        List<Cell> row = new ArrayList<Cell>();
+                        hasMore = innerScanner.nextRaw(row);
+                        if (!row.isEmpty()) {
+                            if (isDummy(row)) {
+                                result.addAll(row);
+                                // We got a dummy result from lower layers. This means that
+                                // the scan took more than pageSizeMs. Just return true here.
+                                // The client will drop this dummy result and continue to scan.
+                                // Then the lower layer scanner will continue
+                                // wherever it stopped due to this dummy result
+                                return true;
+                            }
+                            lastIndexRowKey = CellUtil.cloneRow(row.get(0));
+                            indexToDataRowKeyMap.put(lastIndexRowKey, indexMaintainer.buildDataRowKey(
+                                    new ImmutableBytesWritable(lastIndexRowKey), viewConstants));
+                            indexRows.add(row);
+                            indexRowCount++;
+                            if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime)
+                                    >= pageSizeMs) {
+                                getDummyResult(lastIndexRowKey, result);
+                                // We do not need to change the state, State.SCANNING_INDEX,
+                                // since we will continue scanning the index table after
+                                // the client drops the dummy result and then calls the next
+                                // method on its ResultScanner within ScanningResultIterator
+                                return true;
+                            }
+                        }
+                    } while (hasMore && indexRowCount < pageSizeInRows);
+                    state = State.SCANNING_DATA;
+                }
+                if (state == State.SCANNING_DATA) {
+                    scanDataTableRows(startTime);
+                    indexRowIterator = indexRows.iterator();
+                }
+                if (state == State.READY) {
+                    if (indexRowIterator.hasNext()) {
+                        List<Cell> indexRow = indexRowIterator.next();
+                        result.addAll(indexRow);
+                        try {
+                            Result dataRow = dataRows.get(new ImmutableBytesPtr(
+                                    indexToDataRowKeyMap.get(CellUtil.cloneRow(indexRow.get(0)))));
+                            if (dataRow != null) {
+                                IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow),
+                                        tupleProjector, ptr);
+                            } else {
+                                // The data row does not exist, so skip this index row. This can happen
+                                // if the index row is replicated but the data row has not been yet
+                                result.clear();
+                            }
+                        } catch (Throwable e) {
+                            LOGGER.error("Exception in UncoveredGlobalIndexRegionScanner for region "
+                                    + region.getRegionInfo().getRegionNameAsString(), e);
+                            throw e;
+                        }
+                        return true;
+                    } else {
+                        return false;
+                    }
+                } else {
+                    getDummyResult(lastIndexRowKey, result);
+                    return true;
+                }
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Exception in UncoveredGlobalIndexRegionScanner for region "
+                    + region.getRegionInfo().getRegionNameAsString(), e);
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 3724c44..281eaad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -20,11 +20,14 @@ package org.apache.phoenix.iterate;
 
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
-import static org.apache.phoenix.util.ScanUtil.getDummyResult;
-import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
-import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.hbase.Cell;
@@ -51,10 +54,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -115,20 +114,20 @@ public abstract class RegionScannerFactory {
    * @param viewConstants
    */
   public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
-      final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
+      final RegionScanner regionScanner, final Set<KeyValueColumnExpression> arrayKVRefs,
       final Expression[] arrayFuncRefs, final int offset, final Scan scan,
       final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
       final Region dataRegion, final IndexMaintainer indexMaintainer,
       PhoenixTransactionContext tx,
       final byte[][] viewConstants, final KeyValueSchema kvSchema,
       final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
-      final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {
+      final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) throws IOException {
     return new RegionScanner() {
-
+      private RegionScanner s = regionScanner;
       private RegionInfo regionInfo = env.getRegionInfo();
       private byte[] actualStartKey = getActualStartKey();
       private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
-      final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
+      final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
       Expression extraWhere = null;
       long extraLimit = -1;
 
@@ -159,6 +158,19 @@ public abstract class RegionScannerFactory {
               if (limitBytes != null) {
                   extraLimit = Bytes.toLong(limitBytes);
               }
+              if (ScanUtil.isUncoveredGlobalIndex(scan) && tupleProjector != null) {
+                  PTable.ImmutableStorageScheme storageScheme = indexMaintainer.getIndexStorageScheme();
+                  // Build the scan that fetches the data columns from the data table. The
+                  // columns must be added to this scan and not to the index table scan.
+                  Scan dataTableScan = new Scan();
+                  for (int i = 0; i < dataColumns.length; i++) {
+                      if (storageScheme == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                          dataTableScan.addFamily(dataColumns[i].getFamily());
+                      } else {
+                          dataTableScan.addColumn(dataColumns[i].getFamily(),
+                                  dataColumns[i].getQualifier());
+                      }
+                  }
+                  s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
+                          dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs);
+              }
           }
       }
 
@@ -173,7 +185,7 @@ public abstract class RegionScannerFactory {
       public boolean next(List<Cell> results) throws IOException {
         try {
           boolean next = s.next(results);
-          if (isDummy(results)) {
+          if (ScanUtil.isDummy(results)) {
             return true;
           }
           return next;
@@ -217,7 +229,7 @@ public abstract class RegionScannerFactory {
       public boolean nextRaw(List<Cell> result) throws IOException {
         try {
           boolean next = s.nextRaw(result);
-          if (isDummy(result)) {
+          if (ScanUtil.isDummy(result)) {
             return true;
           }
           if (result.size() == 0) {
@@ -225,19 +237,21 @@ public abstract class RegionScannerFactory {
           }
           if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan))
                   && !ScanUtil.isAnalyzeTable(scan)) {
-            if(actualStartKey!=null) {
-              next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
-                  null);
-              if (result.isEmpty() || isDummy(result)) {
-                return next;
+            if (ScanUtil.isLocalIndex(scan)) {
+              if (actualStartKey != null) {
+                next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+                        null);
+                if (result.isEmpty() || ScanUtil.isDummy(result)) {
+                  return next;
+                }
               }
-            }
             /* In the following, c is only used when data region is null.
             dataRegion will never be null in case of non-coprocessor call,
             therefore no need to refactor
              */
-            IndexUtil.wrapResultUsingOffset(env, result, scan, offset, dataColumns,
-                   tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+              IndexUtil.wrapResultUsingOffset(env, result, scan, offset, dataColumns,
+                      tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+            }
 
             if (extraWhere != null) {
                 Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
@@ -361,7 +375,7 @@ public abstract class RegionScannerFactory {
           if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
             byte[] rowKey = CellUtil.cloneRow(result.get(0));
             result.clear();
-            getDummyResult(rowKey, result);
+            ScanUtil.getDummyResult(rowKey, result);
             return true;
           }
           result.clear();
@@ -373,7 +387,7 @@ public abstract class RegionScannerFactory {
           if (result.isEmpty()) {
             return next;
           }
-          if (isDummy(result)) {
+          if (ScanUtil.isDummy(result)) {
             return true;
           }
           firstCell = result.get(0);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 78f153c..33d0430 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -334,6 +334,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
+    // The number of index rows to be scanned in one RPC call
+    String INDEX_PAGE_SIZE_IN_ROWS = "phoenix.index.page_size_in_rows";
     // Flag indicating that server side masking of ttl expired rows is enabled.
     public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
     // The time limit on the amount of work to be done in one RPC call
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3f24900..379eadd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -340,7 +340,8 @@ public class QueryServicesOptions {
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
     public static final boolean DEFAULT_PHOENIX_SERVER_PAGING_ENABLED = true;
-    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32 * 1024;
+    public static final long DEFAULT_INDEX_PAGE_SIZE_IN_ROWS = 32 * 1024;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index f1449ff..fee9dba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.util;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
@@ -577,7 +576,21 @@ public class IndexUtil {
         whereNode.toSQL(indexResolver, buf);
         return QueryUtil.getViewStatement(index.getSchemaName().getString(), index.getTableName().getString(), buf.toString());
     }
-    
+    /**
+     * Serializes the projected values of the given tuple into a single byte array and appends
+     * them to the result as one cell. The new cell uses the row key and the timestamp of the
+     * first cell of the result together with VALUE_COLUMN_FAMILY and VALUE_COLUMN_QUALIFIER.
+     *
+     * @param result the row the new cell is appended to; its first cell is read
+     * @param tuple the tuple whose values are serialized
+     * @param tupleProjector supplies the schema, expressions and value bit set for serialization
+     * @param ptr the scratch pointer handed to the serialization
+     */
+    public static void addTupleAsOneCell(List<Cell> result,
+                                 Tuple tuple,
+                                 TupleProjector tupleProjector,
+                                 ImmutableBytesWritable ptr) {
+        // One byte[] that captures all of the values from the data table
+        final byte[] projectedValues = tupleProjector.getSchema().toBytes(
+                tuple, tupleProjector.getExpressions(), tupleProjector.getValueBitSet(), ptr);
+        final Cell rowKeySource = result.get(0);
+        result.add(PhoenixKeyValueUtil.newKeyValue(
+                rowKeySource.getRowArray(), rowKeySource.getRowOffset(),
+                rowKeySource.getRowLength(), VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER,
+                rowKeySource.getTimestamp(), projectedValues, 0, projectedValues.length));
+    }
     public static void wrapResultUsingOffset(final RegionCoprocessorEnvironment environment,
             List<Cell> result, final Scan scan, final int offset, ColumnReference[] dataColumns,
             TupleProjector tupleProjector, Region dataRegion, IndexMaintainer indexMaintainer,
@@ -611,13 +624,6 @@ public class IndexUtil {
                         joinResult = table.get(get);
                     }
                 }
-            } else if (ScanUtil.isUncoveredGlobalIndex(scan)) {
-                byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
-                try (Table table = ServerUtil.ConnectionFactory.getConnection(
-                        ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION, environment).
-                        getTable(TableName.valueOf(dataTableName))) {
-                    joinResult = table.get(get);
-                }
             }
 
             // at this point join result has data from the data table. We now need to take this result and
@@ -625,14 +631,7 @@ public class IndexUtil {
             // TODO: handle null case (but shouldn't happen)
             Tuple joinTuple = new ResultTuple(joinResult);
             // This will create a byte[] that captures all of the values from the data table
-            byte[] value =
-                    tupleProjector.getSchema().toBytes(joinTuple, tupleProjector.getExpressions(),
-                        tupleProjector.getValueBitSet(), ptr);
-            Cell keyValue =
-                    PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(),
-                        firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY,
-                        VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
-            result.add(keyValue);
+            addTupleAsOneCell(result, joinTuple, tupleProjector, ptr);
         }
         
         ListIterator<Cell> itr = result.listIterator();