You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/03/09 02:43:17 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-4845 Support using Row Value Constructors in OFFSET clause for paging in tables where the sort order of PK columns varies

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 53813ba  PHOENIX-4845 Support using Row Value Constructors in OFFSET clause for paging in tables where the sort order of PK columns varies
53813ba is described below

commit 53813baf84ee8ffc946682a85b953995800d6a12
Author: Daniel Wong <da...@salesforce.com>
AuthorDate: Wed May 15 15:23:50 2019 -0700

    PHOENIX-4845 Support using Row Value Constructors in OFFSET clause for paging in tables where the sort order of PK columns varies
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 .../org/apache/phoenix/end2end/QueryMoreIT.java    |  26 +-
 .../apache/phoenix/end2end/QueryWithOffsetIT.java  |   6 +-
 .../end2end/RowValueConstructorOffsetIT.java       | 943 +++++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g          |   7 +-
 .../org/apache/phoenix/compile/CompiledOffset.java |  41 +
 .../apache/phoenix/compile/ListJarsQueryPlan.java  |   5 +
 .../org/apache/phoenix/compile/OffsetCompiler.java |  39 +-
 .../apache/phoenix/compile/OrderByCompiler.java    |  22 +-
 .../org/apache/phoenix/compile/QueryCompiler.java  |  56 +-
 .../java/org/apache/phoenix/compile/QueryPlan.java |   2 +
 .../apache/phoenix/compile/RVCOffsetCompiler.java  | 321 +++++++
 .../org/apache/phoenix/compile/ScanRanges.java     |  31 +-
 .../org/apache/phoenix/compile/TraceQueryPlan.java |   5 +
 .../org/apache/phoenix/compile/WhereCompiler.java  |  18 +-
 .../org/apache/phoenix/compile/WhereOptimizer.java |  29 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   5 +-
 .../org/apache/phoenix/execute/BaseQueryPlan.java  |  14 +-
 .../apache/phoenix/execute/DelegateQueryPlan.java  |   3 +
 .../org/apache/phoenix/execute/HashJoinPlan.java   |   5 +-
 .../java/org/apache/phoenix/execute/ScanPlan.java  |  13 +-
 .../apache/phoenix/execute/SortMergeJoinPlan.java  |   5 +
 .../java/org/apache/phoenix/execute/UnionPlan.java |   5 +
 .../phoenix/iterate/BaseResultIterators.java       |  11 +
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |   5 +
 .../apache/phoenix/optimize/QueryOptimizer.java    |  28 +-
 .../java/org/apache/phoenix/parse/HintNode.java    |   5 +
 .../java/org/apache/phoenix/parse/OffsetNode.java  |  59 +-
 .../org/apache/phoenix/parse/ParseNodeFactory.java |  10 +-
 .../apache/phoenix/parse/ParseNodeRewriter.java    |  17 +-
 ...lueConstructorOffsetInternalErrorException.java |  38 +
 ...onstructorOffsetNotAllowedInQueryException.java |  38 +
 ...alueConstructorOffsetNotCoercibleException.java |  38 +
 .../phoenix/compile/RVCOffsetCompilerTest.java     | 159 ++++
 .../org/apache/phoenix/parse/QueryParserTest.java  |  36 +
 .../phoenix/query/ParallelIteratorsSplitTest.java  |   3 +
 .../java/org/apache/phoenix/util/TestUtil.java     |  16 +
 36 files changed, 1949 insertions(+), 115 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 7c45f1a..61d19cc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -401,10 +401,9 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         try (Statement stmt = conn.createStatement()) {
             final ResultSet rs = stmt.executeQuery("SELECT entity_id, score\n" + 
                     "FROM " + fullTableName + "\n" + 
-                    "WHERE organization_id = 'org1'\n" + 
-                    "AND (score, entity_id) < (2, '04')\n" + 
-                    "ORDER BY score DESC, entity_id DESC\n" + 
-                    "LIMIT 3");
+                    "ORDER BY ORGANIZATION_ID, score DESC, entity_id DESC\n" +
+                    "LIMIT 3\n" +
+                    "OFFSET  (ORGANIZATION_ID, SCORE, ENTITY_ID)=('org1', 2, '04')");
             assertTrue(rs.next());
             assertEquals("03", rs.getString(1));
             assertEquals(2.0, rs.getDouble(2), 0.001);
@@ -415,11 +414,11 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         }
         try (Statement stmt = conn.createStatement()) {
             final ResultSet rs = stmt.executeQuery("SELECT entity_id, score\n" + 
-                    "FROM " + fullTableName + "\n" + 
-                    "WHERE organization_id = 'org1'\n" + 
-                    "AND (organization_id, score, entity_id) < ('org1', 2, '04')\n" + 
-                    "ORDER BY score DESC, entity_id DESC\n" + 
-                    "LIMIT 3");
+                    "FROM " + fullTableName + "\n" +
+                    "WHERE ORGANIZATION_ID='org1'\n" +
+                    "ORDER BY organization_id, score DESC, entity_id DESC\n" +
+                    "LIMIT 3\n" +
+                    "OFFSET (ORGANIZATION_ID, SCORE, ENTITY_ID)=('org1', 2, '04')");
             assertTrue(rs.next());
             assertEquals("03", rs.getString(1));
             assertEquals(2.0, rs.getDouble(2), 0.001);
@@ -536,12 +535,9 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
             final ResultSet
                     rs =
                     stmt.executeQuery("SELECT score, entity_id \n" + "FROM " + fullTableName + "\n"
-                            + "WHERE organization_id = 'org1'\n"
-                            + "AND (score, entity_id) < ('b', '4')\n"
-                            + "ORDER BY score DESC, entity_id\n" + "LIMIT 3");
-            assertTrue(rs.next());
-            assertEquals("b", rs.getString(1));
-            assertEquals("3", rs.getString(2));
+                            + "ORDER BY organization_id, score DESC, entity_id\n"
+                            + "LIMIT 3\n"
+                            + "OFFSET (ORGANIZATION_ID,SCORE,ENTITY_ID)=('org1','b','4')\n");
             assertTrue(rs.next());
             assertEquals("a", rs.getString(1));
             assertEquals("2", rs.getString(2));
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index 7374c8e..a1d562f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -49,7 +49,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
-    
+
     private static final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
             "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
     private final boolean isSalted;
@@ -215,7 +215,7 @@ public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
         ResultSetMetaData md = rs.getMetaData();
         assertEquals(5, md.getColumnCount());
     }
-    
+
     private void initTableValues(Connection conn) throws SQLException {
         for (int i = 0; i < 26; i++) {
             conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + STRINGS[i] + "'," + i + ","
@@ -230,4 +230,6 @@ public class QueryWithOffsetIT extends ParallelStatsDisabledIT {
         conn.createStatement().execute(query);
     }
 
+
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorOffsetIT.java
new file mode 100644
index 0000000..7309a3a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorOffsetIT.java
@@ -0,0 +1,943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+// Integration tests for Row Value Constructor (RVC) based OFFSET clauses
+public class RowValueConstructorOffsetIT extends ParallelStatsDisabledIT {
+
+    private static final String SIMPLE_DDL = "CREATE TABLE %s (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+            + "k2 INTEGER NOT NULL,\n" + "v1 INTEGER,\n" + "v2 VARCHAR,\n"
+            + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) "; // %s is replaced with the table name via String.format
+
+    private static final String DATA_DDL = "CREATE TABLE %s (k1 TINYINT NOT NULL,\n" + "k2 TINYINT NOT NULL,\n"
+            + "k3 TINYINT NOT NULL,\n" + "v1 INTEGER,\n" + "CONSTRAINT pk PRIMARY KEY (k1, k2, k3)) "; // trailing space lets tests append table options such as SALT_BUCKETS
+
+    private static final String TABLE_NAME = "T_" + generateUniqueName(); // table built from SIMPLE_DDL in init()
+
+    private static final String TABLE_ROW_KEY = "t_id, k1, k2"; // PK column list of SIMPLE_DDL, in declaration order
+
+    private static final String GOOD_TABLE_ROW_KEY_VALUE = "'a', 1, 2"; // one literal per TABLE_ROW_KEY column: VARCHAR, INTEGER, INTEGER
+
+    private static final String DATA_TABLE_NAME = "T_" + generateUniqueName(); // table built from DATA_DDL and populated in init()
+
+    private static final String DATA_ROW_KEY = "k1, k2, k3"; // PK column list of DATA_DDL, in declaration order
+
+    private static final String GOOD_DATA_ROW_KEY_VALUE = "2, 3, 0"; // a key inside the 0..3 x 0..3 x 0..3 grid that init() loads
+
+    private static final String INDEX_NAME =  "INDEX_" + TABLE_NAME; // index on TABLE_NAME created in init()
+
+    private static final String DATA_INDEX_NAME =  "INDEX_" + DATA_TABLE_NAME; // index on DATA_TABLE_NAME created in init()
+
+    private static final String DATA_INDEX_ROW_KEY = "k2, k1, k3"; // NOTE(review): presumably the row key order of the (k2 DESC,k1) data index with k3 appended - confirm
+
+    private static Connection conn = null; // shared by all tests; opened in init(), closed in cleanup()
+
+    @BeforeClass // One-time setup: opens the shared connection, creates both tables, loads the data table and creates one index per table.
+    public static void init() throws SQLException {
+        conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+
+        String dataTableDDL = String.format(DATA_DDL, DATA_TABLE_NAME);
+
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(dataTableDDL);
+        }
+
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(String.format(SIMPLE_DDL, TABLE_NAME));
+        }
+
+        conn.commit();
+
+        String upsertDML = String.format("UPSERT INTO %s VALUES(?,?,?,?)", DATA_TABLE_NAME);
+
+        int nRows = 0; // running row counter, also stored as v1 so that v1 == 16*k1 + 4*k2 + k3
+        try (PreparedStatement ps = conn.prepareStatement(upsertDML)) {
+            for (int k1 = 0; k1 < 4; k1++) { // 4 x 4 x 4 = 64 rows covering every (k1, k2, k3) in 0..3
+                ps.setInt(1, k1);
+                for (int k2 = 0; k2 < 4; k2++) {
+                    ps.setInt(2, k2);
+                    for (int k3 = 0; k3 < 4; k3++) {
+                        ps.setInt(3, k3);
+                        ps.setInt(4, nRows);
+                        int result = ps.executeUpdate();
+                        assertEquals(1, result); // each upsert must affect exactly one row
+                        nRows++;
+                    }
+                }
+            }
+            conn.commit();
+        }
+
+        String createIndex = "CREATE INDEX IF NOT EXISTS " + INDEX_NAME + " ON " + TABLE_NAME + " (k2 DESC,k1)"; // k2 is sorted descending in the index
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(createIndex);
+        }
+
+        String createDataIndex = "CREATE INDEX IF NOT EXISTS " + DATA_INDEX_NAME + " ON " + DATA_TABLE_NAME
+                + " (k2 DESC,k1)"; // same column list as the index on TABLE_NAME
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(createDataIndex);
+        }
+        conn.commit();
+    }
+
+    @AfterClass // One-time teardown: closes the connection shared by all tests.
+    public static void cleanup() {
+        try {
+            if (conn != null) { // conn stays null if init() never assigned it
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e); // rethrown unchecked (cause preserved) because this method declares no checked exceptions
+        }
+    }
+
+    // Test that RVC Offset values must be coercible to the types of the table's PK columns
+    @Test
+    public void testRVCOffsetNotCoercible() throws SQLException {
+        //'ab' is supplied for k1, an INTEGER column, so the statement must be rejected
+        String failureSql = String.format("SELECT %s FROM %s OFFSET (%s)=('a', 'ab', 2)",
+                TABLE_ROW_KEY,TABLE_NAME,TABLE_ROW_KEY);
+        try (Statement statement = conn.createStatement()){
+            statement.execute(failureSql);
+            fail("Should not allow non coercible values to PK in RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that ORDER BY on a non-PK column (v1) is rejected when combined with an RVC Offset
+    @Test
+    public void testRVCOffsetNotAllowNonPKOrderBy() throws SQLException {
+        String failureSql = String.format("SELECT %s, v1 FROM %s ORDER BY v1 OFFSET (%s)=(%s)",
+                TABLE_ROW_KEY,TABLE_NAME,TABLE_ROW_KEY,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not allow no PK order by with RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+
+    }
+
+    // Test that ORDER BY on only part of the PK (k1 alone) is rejected when combined with an RVC Offset
+    @Test
+    public void testRVCOffsetNotAllowPartialPKOrderBy() throws SQLException {
+        String failureSql = String.format("SELECT %s FROM %s ORDER BY k1 OFFSET (%s)=(%s)",
+                TABLE_ROW_KEY,TABLE_NAME, TABLE_ROW_KEY, GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()){
+            statement.execute(failureSql);
+            fail("Should not allow partial PK order by with RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that ORDER BY on the PK columns with a different sort direction (t_id DESC) is rejected with an RVC Offset
+    @Test
+    public void testRVCOffsetSamePKDifferentSortOrderBy() throws SQLException {
+        String failureSql = String.format("SELECT %s FROM %s ORDER BY t_id DESC, k1, k2 OFFSET (%s)=(%s)",
+                TABLE_ROW_KEY,TABLE_NAME, TABLE_ROW_KEY,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()){
+            statement.execute(failureSql);
+            fail("Should not allow different PK order by with RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that an RVC Offset is not allowed on a query that joins two tables
+    @Test
+    public void testRVCOffsetNotAllowedInJoins() throws SQLException {
+        String tableName2 = "T_" + generateUniqueName(); // second table with the same schema, used as the join partner
+        createTestTable(getUrl(), String.format(SIMPLE_DDL,tableName2));
+
+        String failureSql = String.format("SELECT T1.k1,T2.k2 FROM %s AS T1, %s AS T2 WHERE T1.t_id=T2.t_id OFFSET (T1.t_id, T1.k1, T1.k2)=('a', 1, 2)",
+                TABLE_NAME, tableName2);
+        try (Statement statement = conn.createStatement()){
+            statement.execute(failureSql);
+            fail("Should not have JOIN in RVC Offset");
+        } catch (SQLException e) {
+            return; // expected: any SQLException is accepted as the rejection
+        }
+    }
+
+    // Test that an RVC Offset is not allowed inside a subquery (the inner SELECT carries the OFFSET)
+    @Test
+    public void testRVCOffsetNotAllowedInSubQuery() throws SQLException {
+        String failureSql = String.format("SELECT B.k2 FROM (SELECT %s FROM %s OFFSET (%s)=(%s)) AS B",
+                TABLE_ROW_KEY,TABLE_NAME,TABLE_ROW_KEY,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()){
+            statement.execute(failureSql);
+            fail("Should not have subquery with RVC Offset");
+        } catch (SQLException e) {
+            return; // expected: any SQLException is accepted as the rejection
+        }
+    }
+
+    // Test that an RVC Offset is not allowed on top of a subquery (the outer SELECT carries the OFFSET)
+    @Test
+    public void testRVCOffsetNotAllowedOnSubQuery() throws SQLException {
+        //Note: a subselect is often rewritten to a flat query, in which case the offset would still be viable; the inner ORDER BY K1 LIMIT 2 is what should force the failure here
+        String failureSql = String.format("SELECT * FROM (SELECT T_ID,K1,K2 AS COL3 FROM %s ORDER BY K1 LIMIT 2) AS B OFFSET (%s)=(%s)",
+                TABLE_NAME,TABLE_ROW_KEY,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have subquery with RVC Offset");
+        } catch (SQLException e) {
+            return; // expected: any SQLException is accepted as the rejection
+        }
+    }
+
+    // Test that the RVC Offset right-hand side must consist of literals and cannot contain a column reference
+    @Test
+    public void testRVCOffsetLiteral() throws SQLException {
+        // k2 on the right-hand side is a column reference, not a literal, so the statement must be rejected
+        String failureSql = String.format("SELECT * FROM %s OFFSET (%s)=('a', 1, k2)",TABLE_NAME,TABLE_ROW_KEY);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have allowed column in RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that an RVC Offset is rejected on an aggregate query (count(*))
+    @Test
+    public void testRVCOffsetAggregate() {
+        String failureSql = String.format("SELECT count(*) FROM %s  OFFSET (%s)=(%s)",TABLE_NAME,TABLE_ROW_KEY,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have allowed aggregate with RVC Offset");
+        } catch (SQLException e) {
+            return; // expected: any SQLException is accepted as the rejection
+        }
+    }
+
+    // Test that an RVC Offset whose RHS has fewer expressions than the PK has columns fails
+    @Test
+    public void testRVCOffsetPartialKey() throws SQLException {
+        String failureSql = String.format("SELECT * FROM %s  OFFSET (%s)=('a', 1)",TABLE_NAME,TABLE_ROW_KEY); // two values for a three-column key
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have allowed partial Key RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that an RVC Offset whose RHS has more expressions than the PK has columns fails
+    @Test
+    public void testRVCOffsetMoreThanKey() throws SQLException {
+        String failureSql = String.format("SELECT * FROM %s OFFSET (%s)=('a', 1, 2, 3)",TABLE_NAME,TABLE_ROW_KEY); // four values for a three-column key
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have allowed more than pk columns in Key RVC Offset");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test that an RVC Offset whose LHS (k1,k2) is not the table's full row key (t_id, k1, k2) fails
+    @Test
+    public void testRVCOffsetLHSDoesNotMatchTable() throws SQLException {
+        String failureSql = String.format("SELECT * FROM %s LIMIT 2 OFFSET (k1,k2)=(%s)",TABLE_NAME,GOOD_TABLE_ROW_KEY_VALUE);
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(failureSql);
+            fail("Should not have allowed the LHS to not be the same as the pk");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    // Test the simple case: offset into the data table by row key and check that the rows following that key are returned
+    @Test
+    public void testSimpleRVCOffsetLookup() throws SQLException {
+        String sql = String.format("SELECT * FROM %s LIMIT 3 OFFSET (%s)=(%s)",DATA_TABLE_NAME,DATA_ROW_KEY,GOOD_DATA_ROW_KEY_VALUE);
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            { // first row is (2,3,1): the offset key (2,3,0) itself is not returned
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(1, k3);
+            }
+            assertTrue(rs.next());
+            { // second row is (2,3,2)
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(2, k3);
+            }
+            assertTrue(rs.next());
+            { // third row is (2,3,3)
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(3, k3);
+            }
+            assertFalse(rs.next()); // LIMIT 3 caps the result
+        }
+    }
+
+    @Test // RVC Offset whose key values are supplied through bind parameters instead of literals
+    public void testBindsRVCOffsetLookup() throws SQLException {
+        String sql = String.format("SELECT * FROM %s LIMIT 2 OFFSET (%s)=(?, ?, ?)",DATA_TABLE_NAME,DATA_ROW_KEY);
+        try(PreparedStatement ps = conn.prepareStatement(sql)) {
+            ps.setInt(1, 2); // offset key is (k1,k2,k3) = (2,3,1)
+            ps.setInt(2, 3);
+            ps.setInt(3, 1);
+
+            try (ResultSet rs = ps.executeQuery()) {
+
+                assertTrue(rs.next());
+                { // first row is (2,3,2): the offset key (2,3,1) itself is not returned
+                    int k1 = rs.getInt(1);
+                    int k2 = rs.getInt(2);
+                    int k3 = rs.getInt(3);
+                    assertEquals(2, k1);
+                    assertEquals(3, k2);
+                    assertEquals(2, k3);
+                }
+                assertTrue(rs.next());
+                { // second row is (2,3,3)
+                    int k1 = rs.getInt(1);
+                    int k2 = rs.getInt(2);
+                    int k3 = rs.getInt(3);
+                    assertEquals(2, k1);
+                    assertEquals(3, k2);
+                    assertEquals(3, k3);
+                }
+                assertFalse(rs.next()); // LIMIT 2 caps the result
+            }
+        }
+    }
+
+
+
+    // Test that an RVC Offset combined with a WHERE clause still honors the WHERE clause
+    @Test
+    public void testWhereClauseRVCOffsetLookup() throws SQLException {
+        //The WHERE clause pins the result to (3,3,3) even though the offset key is (2,3,0) and LIMIT is 2
+        String sql = String.format("SELECT * FROM %s WHERE (k1,k2,k3)=(3,3,3) LIMIT 2 OFFSET (%s)=(%s)",DATA_TABLE_NAME,DATA_ROW_KEY,GOOD_DATA_ROW_KEY_VALUE);
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            { // the single matching row
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(3, k1);
+                assertEquals(3, k2);
+                assertEquals(3, k3);
+            }
+            assertFalse(rs.next()); // no second row despite LIMIT 2
+        }
+    }
+
+    @Test // On a salted table, ORDER BY in row key order is accepted with an RVC Offset; any other column order is rejected.
+    public void testSaltedTableRVCOffsetOrderBy() throws SQLException {
+        //Make a salted table
+        String saltedTableName = "T_" + generateUniqueName();
+
+        String saltedDDL = String.format(DATA_DDL + "SALT_BUCKETS=4",saltedTableName);
+
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(saltedDDL);
+        }
+        conn.commit();
+
+        //If we attempt to order by the row key we should not fail
+        String sql = "SELECT * FROM " + saltedTableName + " ORDER BY K1,K2,K3 LIMIT 2 OFFSET (k1,k2,k3)=(2, 3, 1)";
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            // Running the query in the resource header is the whole check: it must not throw.
+        }
+
+        //If we attempt to order by something other than the row key order we should fail
+        sql = "SELECT * FROM " + saltedTableName + " ORDER BY K2,K1,K3 LIMIT 2 OFFSET (k1,k2,k3)=(2, 3, 1)";
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            fail("Should not allow a non row key ORDER BY with RVC Offset on a salted table");
+        } catch(RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected: the statement was rejected
+        }
+    }
+
+    @Test // RVC Offset lookup on a populated salted table, ordered by the row key
+    public void testSaltedTableRVCOffset() throws SQLException {
+        //Make a salted table
+        String saltedTableName = "T_" + generateUniqueName();
+
+        String saltedDDL = String.format(DATA_DDL + "SALT_BUCKETS=4",saltedTableName);
+
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(saltedDDL);
+            conn.commit();
+        }
+
+        String upsertDML = String.format("UPSERT INTO %s VALUES(?,?,?,?)", saltedTableName);
+
+        int nRows = 0; // running row counter, also stored as v1
+        try (PreparedStatement ps = conn.prepareStatement(upsertDML)) {
+            for (int k1 = 0; k1 < 4; k1++) { // 4 x 4 x 4 = 64 rows covering every (k1, k2, k3) in 0..3
+                ps.setInt(1, k1);
+                for (int k2 = 0; k2 < 4; k2++) {
+                    ps.setInt(2, k2);
+                    for (int k3 = 0; k3 < 4; k3++) {
+                        ps.setInt(3, k3);
+                        ps.setInt(4, nRows);
+                        int result = ps.executeUpdate();
+                        assertEquals(1, result); // each upsert must affect exactly one row
+                        nRows++;
+                    }
+                }
+            }
+            conn.commit();
+        }
+
+        String sql = String.format("SELECT * FROM " + saltedTableName + " ORDER BY %s LIMIT 3 OFFSET (%s)=(%s)",DATA_ROW_KEY,DATA_ROW_KEY,GOOD_DATA_ROW_KEY_VALUE);
+
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            { // first row is (2,3,1): the offset key (2,3,0) itself is not returned
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(1, k3);
+            }
+            assertTrue(rs.next());
+            { // second row is (2,3,2)
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(2, k3);
+            }
+            assertTrue(rs.next());
+            { // third row is (2,3,3)
+                int k1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+                assertEquals(2, k1);
+                assertEquals(3, k2);
+                assertEquals(3, k3);
+            }
+            assertFalse(rs.next()); // LIMIT 3 caps the result
+        }
+    }
+
+    @Test // RVC Offset through a view over the data table, using the (k2,k1,k3) column order
+    public void testGlobalViewRVCOffset() throws SQLException {
+        //Make a view
+        String viewName1 = "V_" + generateUniqueName();
+
+        //Simple view: every column and row of the data table
+        String viewDDL = "CREATE VIEW " + viewName1 + " AS SELECT * FROM " + DATA_TABLE_NAME;
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(viewDDL);
+            conn.commit();
+        }
+        String sql = "SELECT  k2,k1,k3 FROM " + viewName1 + " LIMIT 3 OFFSET (k2,k1,k3)=(3, 3, 1)"; // NOTE(review): presumably resolved against the (k2 DESC,k1) data index - confirm
+
+        try(Statement statement = conn.createStatement() ; ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            { // first row is (k2,k1,k3) = (3,3,2): the offset key (3,3,1) itself is not returned
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(3, k2);
+                assertEquals(3, k1);
+                assertEquals(2, k3);
+            }
+            assertTrue(rs.next());
+            { // second row is (3,3,3), the last row with k2 = 3
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(3, k2);
+                assertEquals(3, k1);
+                assertEquals(3, k3);
+            }
+            assertTrue(rs.next());
+            { // third row is (2,0,0): k2 steps down from 3 to 2, i.e. rows come back with k2 descending
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(2, k2);
+                assertEquals(0, k1);
+                assertEquals(0, k3);
+            }
+            assertFalse(rs.next()); // LIMIT 3 caps the result
+        }
+    }
+
+    private void parameterizedTenantTestCase(boolean isSalted) throws SQLException {
+        String multiTenantDataTableName = "T_" + generateUniqueName();
+
+        String multiTenantDDL = "CREATE TABLE %s (tenant_id VARCHAR NOT NULL, k1 TINYINT NOT NULL,\n" + "k2 TINYINT NOT NULL,\n"
+                + "k3 TINYINT NOT NULL,\n" + "v1 INTEGER,\n" + "CONSTRAINT pk PRIMARY KEY (tenant_id, k1, k2, k3)) MULTI_TENANT=true";
+
+        if(isSalted){
+            multiTenantDDL = multiTenantDDL + ", SALT_BUCKETS=4";
+        }
+
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(String.format(multiTenantDDL, multiTenantDataTableName));
+            conn.commit();
+        }
+
+        String tenantId2 = "tenant2";
+
+        //tenant connection
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId2);
+
+        try(Connection tenant2Connection = DriverManager.getConnection(getUrl(), props)) {
+
+            //create tenant view with new pks
+            String viewName = multiTenantDataTableName + "_" + tenantId2;
+            try (Statement statement = tenant2Connection.createStatement()) {
+                statement.execute("CREATE VIEW " + viewName + " ( vk1 INTEGER NOT NULL, vv1 INTEGER, CONSTRAINT PKVIEW PRIMARY KEY(vk1))  AS SELECT * FROM "
+                        + multiTenantDataTableName);
+            }
+            //create tenant view index on tenant view
+            String viewIndexName = viewName + "_Index1";
+            try (Statement statement = tenant2Connection.createStatement()) {
+                statement.execute("CREATE INDEX " + viewIndexName + " ON " + viewName + " ( vv1 ) ");
+            }
+
+            String upsertDML = String.format("UPSERT INTO %s VALUES(?,?,?,?,?,?)", viewName);
+            int tenantRows = 0;
+            try (PreparedStatement tps = tenant2Connection.prepareStatement(upsertDML)) {
+                for (int k1 = 0; k1 < 4; k1++) {
+                    tps.setInt(1, k1);
+                    for (int k2 = 0; k2 < 4; k2++) {
+                        tps.setInt(2, k2);
+                        for (int k3 = 0; k3 < 4; k3++) {
+                            tps.setInt(3, k3);
+                            for (int vk1 = 0; vk1 < 4; vk1++) {
+                                tps.setInt(4, tenantRows);
+                                tps.setInt(5, vk1);
+                                tps.setInt(6, -tenantRows); //vv1
+
+                                int result = tps.executeUpdate();
+                                assertEquals(1, result);
+                                tenantRows++;
+                            }
+                        }
+                    }
+                }
+                tenant2Connection.commit();
+            }
+
+            //tenant view
+            {
+                String sql = "SELECT k1,k2,k3,vk1 FROM " + viewName + "  LIMIT 2 OFFSET (k1,k2,k3,vk1)=(2, 3, 1, 2)";
+                try (Statement statement = tenant2Connection.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+                    assertTrue(rs.next());
+                    {
+                        int k1 = rs.getInt(1);
+                        int k2 = rs.getInt(2);
+                        int k3 = rs.getInt(3);
+                        int vk1 = rs.getInt(4);
+                        assertEquals(2, k1);
+                        assertEquals(3, k2);
+                        assertEquals(1, k3);
+                        assertEquals(3, vk1);
+                    }
+                    assertTrue(rs.next());
+                    {
+                        int k1 = rs.getInt(1);
+                        int k2 = rs.getInt(2);
+                        int k3 = rs.getInt(3);
+                        int vk1 = rs.getInt(4);
+                        assertEquals(2, k1);
+                        assertEquals(3, k2);
+                        assertEquals(2, k3);
+                        assertEquals(0, vk1);
+                    }
+                    assertFalse(rs.next());
+                }
+            }
+
+            //tenant index
+            {
+                String sql = "SELECT vv1,k1,k2,k3,vk1 FROM " + viewName + " ORDER BY vv1 LIMIT 2 OFFSET (vv1,k1,k2,k3,vk1)=(-184,2, 3, 2, 0)";
+                try (Statement statement = tenant2Connection.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+                    assertTrue(rs.next());
+                    {
+                        int vv1 = rs.getInt(1);
+                        int k1 = rs.getInt(2);
+                        int k2 = rs.getInt(3);
+                        int k3 = rs.getInt(4);
+                        int vk1 = rs.getInt(5);
+                        assertEquals(-183, vv1);
+                        assertEquals(2, k1);
+                        assertEquals(3, k2);
+                        assertEquals(1, k3);
+                        assertEquals(3, vk1);
+                    }
+                    assertTrue(rs.next());
+                    {
+                        int vv1 = rs.getInt(1);
+                        int k1 = rs.getInt(2);
+                        int k2 = rs.getInt(3);
+                        int k3 = rs.getInt(4);
+                        int vk1 = rs.getInt(5);
+                        assertEquals(-182, vv1);
+                        assertEquals(2, k1);
+                        assertEquals(3, k2);
+                        assertEquals(1, k3);
+                        assertEquals(2, vk1);
+                    }
+                    assertFalse(rs.next());
+                }
+            }
+        }
+    }
+
+    @Test // Runs the shared tenant RVC OFFSET scenario in parameterizedTenantTestCase with its boolean flag off.
+    public void testTenantRVCOffset() throws SQLException {
+        parameterizedTenantTestCase(false); // NOTE(review): the flag's meaning is defined by the helper; presumably "salted", since testSaltedViewIndexRVCOffset passes true — confirm
+    }
+
+    @Test // Runs the shared tenant RVC OFFSET scenario in parameterizedTenantTestCase with its boolean flag on.
+    public void testSaltedViewIndexRVCOffset() throws SQLException {
+        parameterizedTenantTestCase(true); // NOTE(review): presumably enables salting, going by this test's name — confirm against the helper
+    }
+
+    @Test // Checks RVC OFFSET paging through a tenant view index: ORDER BY vv1 with an OFFSET on (vv1,k1,k2,k3,vk1).
+    public void testViewIndexRVCOffset() throws SQLException {
+        String multiTenantDataTableName = "T_" + generateUniqueName();
+
+        String multiTenantDDL = String.format("CREATE TABLE %s (tenant_id VARCHAR NOT NULL, k1 TINYINT NOT NULL,\n" + "k2 TINYINT NOT NULL,\n"
+                + "k3 TINYINT NOT NULL,\n" + "v1 INTEGER,\n" + "CONSTRAINT pk PRIMARY KEY (tenant_id, k1, k2, k3)) MULTI_TENANT=true",multiTenantDataTableName);
+
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(multiTenantDDL);
+            conn.commit();
+        }
+
+        String tenantId2 = "tenant2";
+
+        //tenant connection
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId2); // the connection below is opened with this tenant id property set
+        try(Connection tenant2Connection = DriverManager.getConnection(getUrl(), props)) {
+            //create tenant view with new pks
+            String viewName = multiTenantDataTableName + "_" + tenantId2;
+            try(Statement statement = tenant2Connection.createStatement()) {
+                statement.execute("CREATE VIEW " + viewName + " ( vk1 INTEGER NOT NULL, vv1 INTEGER, CONSTRAINT PKVIEW PRIMARY KEY(vk1))  AS SELECT * FROM "
+                        + multiTenantDataTableName);
+            }
+
+            //create tenant view index on tenant view
+            String viewIndexName = viewName + "_Index1";
+            try(Statement statement = tenant2Connection.createStatement()) {
+                statement.execute("CREATE INDEX " + viewIndexName + " ON " + viewName + " ( vv1 ) ");
+            }
+
+            String upsertDML = String.format("UPSERT INTO %s VALUES(?,?,?,?,?,?)", viewName); // six binds; presumably k1,k2,k3,v1,vk1,vv1 in view column order — confirm
+            int tenantRows = 0; // running row counter; ends at 4*4*4*4 = 256
+            try(PreparedStatement tps = tenant2Connection.prepareStatement(upsertDML)) {
+                for (int k1 = 0; k1 < 4; k1++) {
+                    tps.setInt(1, k1);
+                    for (int k2 = 0; k2 < 4; k2++) {
+                        tps.setInt(2, k2);
+                        for (int k3 = 0; k3 < 4; k3++) {
+                            tps.setInt(3, k3);
+                            tps.setInt(4, tenantRows); // bound once per k3 iteration, so the four vk1 rows below share this value
+                            for (int vk1 = 0; vk1 < 4; vk1++) {
+                                tps.setInt(5, vk1);
+                                tps.setInt(6, -tenantRows); // vv1 = -(64*k1 + 16*k2 + 4*k3 + vk1), so it is distinct for every row
+
+                                int result = tps.executeUpdate();
+                                assertEquals(1, result);
+                                tenantRows++;
+                            }
+                        }
+                    }
+                }
+                tenant2Connection.commit();
+            }
+
+            //View Index Queries
+            String sql = "SELECT vv1,k1,k2,k3,vk1 FROM " + viewName + " ORDER BY vv1 LIMIT 3 OFFSET (vv1,k1,k2,k3,vk1)=(-196, 3,0,0,1)"; // no upserted row has this exact key: vv1=-196 belongs to (k1,k2,k3,vk1)=(3,0,1,0)
+            try(Statement statement = tenant2Connection.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+                assertTrue(rs.next());
+                {
+                    int vv1 = rs.getInt(1);
+                    int k1 = rs.getInt(2);
+                    int k2 = rs.getInt(3);
+                    int k3 = rs.getInt(4);
+                    int vk1 = rs.getInt(5);
+
+                    assertEquals(-196, vv1); // first page row is the stored vv1=-196 row itself
+                    assertEquals(3, k1);
+                    assertEquals(0, k2);
+                    assertEquals(1, k3);
+                    assertEquals(0, vk1);
+                }
+                assertTrue(rs.next());
+                {
+                    int vv1 = rs.getInt(1);
+                    int k1 = rs.getInt(2);
+                    int k2 = rs.getInt(3);
+                    int k3 = rs.getInt(4);
+                    int vk1 = rs.getInt(5);
+
+                    assertEquals(-195, vv1); // 195 = 64*3 + 16*0 + 4*0 + 3
+                    assertEquals(3, k1);
+                    assertEquals(0, k2);
+                    assertEquals(0, k3);
+                    assertEquals(3, vk1);
+                }
+                assertTrue(rs.next());
+                {
+                    int vv1 = rs.getInt(1);
+                    int k1 = rs.getInt(2);
+                    int k2 = rs.getInt(3);
+                    int k3 = rs.getInt(4);
+                    int vk1 = rs.getInt(5);
+
+                    assertEquals(-194, vv1); // 194 = 64*3 + 16*0 + 4*0 + 2
+                    assertEquals(3, k1);
+                    assertEquals(0, k2);
+                    assertEquals(0, k3);
+                    assertEquals(2, vk1);
+                }
+                assertFalse(rs.next()); // LIMIT 3 caps the page
+            }
+        }
+    }
+
+    @Test // Pages with an RVC OFFSET whose columns are DATA_INDEX_ROW_KEY and asserts the three rows returned.
+    public void testIndexRVCOffset() throws SQLException {
+        String sql = String.format("SELECT %s FROM %s LIMIT 3 OFFSET (%s)=(3, 3, 1)",DATA_INDEX_ROW_KEY,DATA_TABLE_NAME,DATA_INDEX_ROW_KEY); // both constants are defined outside this method; the locals below suggest the key is (k2,k1,k3) — confirm
+        try(Statement statement = conn.createStatement() ; ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            {
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(3, k2);
+                assertEquals(3, k1);
+                assertEquals(2, k3); // first row differs from the offset tuple (3, 3, 1) only in the last column
+            }
+            assertTrue(rs.next());
+            {
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(3, k2);
+                assertEquals(3, k1);
+                assertEquals(3, k3);
+            }
+            assertTrue(rs.next());
+            {
+                int k2 = rs.getInt(1);
+                int k1 = rs.getInt(2);
+                int k3 = rs.getInt(3);
+
+                assertEquals(2, k2); // NOTE(review): k2 steps down from 3 to 2 here; presumably the index orders k2 descending — confirm against the index DDL
+                assertEquals(0, k1);
+                assertEquals(0, k3);
+            }
+            assertFalse(rs.next()); // LIMIT 3 caps the page
+        }
+    }
+
+    @Test // Expects an RVC OFFSET on (k2,k1,k3) to be rejected when the query also selects v1.
+    public void testUncoveredIndexRVCOffsetFails() throws SQLException {
+        // v1 is not in the index (the index itself is defined outside this method)
+        String sql = "SELECT  k2,k1,k3,v1 FROM " + DATA_TABLE_NAME + " LIMIT 3 OFFSET (k2,k1,k3)=(3, 3, 2)";
+        try (Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)){ // rs is never read; executeQuery is expected to throw
+            fail("Should not have allowed uncovered index access with RVC Offset without hinting to index.");
+        } catch (RowValueConstructorOffsetNotCoercibleException e) {
+            return; // expected outcome; any other SQLException propagates and fails the test
+        }
+
+    }
+
+    @Test // Checks RVC OFFSET paging over an index on (v1,k2) built on a table created with SALT_BUCKETS=4.
+    public void testIndexSaltedBaseTableRVCOffset() throws SQLException {
+        String saltedTableName = "T_" + generateUniqueName();
+
+        String saltedDDL = String.format(DATA_DDL + "SALT_BUCKETS=4",saltedTableName); // DATA_DDL is defined outside this method; presumably columns k1,k2,k3,v1 — confirm
+
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(saltedDDL);
+            conn.commit();
+        }
+
+        String indexName = "I_" + generateUniqueName();
+
+        String indexDDL = String.format("CREATE INDEX %s ON %s (v1,k2)",indexName,saltedTableName);
+
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(indexDDL);
+            conn.commit();
+        }
+
+        String upsertDML = String.format("UPSERT INTO %s VALUES(?,?,?,?)", saltedTableName);
+        int nRows = 0; // running row counter; ends at 4*4*4 = 64
+        try(PreparedStatement ps = conn.prepareStatement(upsertDML)) {
+            for (int k1 = 0; k1 < 4; k1++) {
+                ps.setInt(1, k1);
+                for (int k2 = 0; k2 < 4; k2++) {
+                    ps.setInt(2, k2);
+                    for (int k3 = 0; k3 < 4; k3++) {
+                        ps.setInt(3, k3);
+                        ps.setInt(4, nRows); // fourth bind = 16*k1 + 4*k2 + k3, distinct for every row
+                        int result = ps.executeUpdate();
+                        assertEquals(1, result);
+                        nRows++;
+                    }
+                }
+            }
+            conn.commit();
+        }
+
+        // Note: today a salted base table forces its index to be salted as well
+        String sql = "SELECT v1,k2,k1,k3 FROM " + saltedTableName + " LIMIT 3 OFFSET (v1,k2,k1,k3)=(8, 2, 0, 0)"; // matches the upserted row with fourth bind 8 (k1=0,k2=2,k3=0)
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            assertTrue(rs.next());
+            {
+                int v1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k1 = rs.getInt(3);
+                int k3 = rs.getInt(4);
+                assertEquals(9, v1); // the row equal to the offset tuple is not returned; the page starts at the next one
+                assertEquals(2, k2);
+                assertEquals(0, k1);
+                assertEquals(1, k3);
+            }
+            assertTrue(rs.next());
+            {
+                int v1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k1 = rs.getInt(3);
+                int k3 = rs.getInt(4);
+                assertEquals(10, v1);
+                assertEquals(2, k2);
+                assertEquals(0, k1);
+                assertEquals(2, k3);
+            }
+            assertTrue(rs.next());
+            {
+                int v1 = rs.getInt(1);
+                int k2 = rs.getInt(2);
+                int k1 = rs.getInt(3);
+                int k3 = rs.getInt(4);
+                assertEquals(11, v1);
+                assertEquals(2, k2);
+                assertEquals(0, k1);
+                assertEquals(3, k3);
+            }
+            assertFalse(rs.next()); // LIMIT 3 caps the page
+        }
+    }
+
+    @Test // Smoke test: runs RVC OFFSET queries against a six-column-PK table with two indexes; no rows are upserted and no results are asserted.
+    public void testIndexMultiColumnsMultiIndexesRVCOffset() throws SQLException {
+        String ddlTemplate = "CREATE TABLE %s (k1 TINYINT NOT NULL,\n" +
+                "k2 TINYINT NOT NULL,\n" +
+                "k3 TINYINT NOT NULL,\n" +
+                "k4 TINYINT NOT NULL,\n" +
+                "k5 TINYINT NOT NULL,\n" +
+                "k6 TINYINT NOT NULL,\n" +
+                "v1 INTEGER,\n" +
+                "v2 INTEGER,\n" +
+                "v3 INTEGER,\n" +
+                "v4 INTEGER,\n" +
+                "CONSTRAINT pk PRIMARY KEY (k1, k2, k3, k4, k5, k6)) ";
+
+        String longKeyTableName = "T_" + generateUniqueName();
+        String longKeyIndex1Name =  "INDEX_1_" + longKeyTableName;
+        String longKeyIndex2Name =  "INDEX_2_" + longKeyTableName;
+
+        String ddl = String.format(ddlTemplate,longKeyTableName);
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(ddl);
+        }
+
+        String createIndex1 = "CREATE INDEX IF NOT EXISTS " + longKeyIndex1Name + " ON " + longKeyTableName + " (k2 ,v1, k4)";
+        String createIndex2 = "CREATE INDEX IF NOT EXISTS " + longKeyIndex2Name + " ON " + longKeyTableName + " (v1, v3)";
+
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(createIndex1);
+        }
+        try(Statement statement = conn.createStatement()) {
+            statement.execute(createIndex2);
+        }
+
+        String sql = "SELECT  k2,v1,k4 FROM " + longKeyTableName + " LIMIT 3 OFFSET (k2,v1,k4,k1,k3,k5,k6)=(2,-1,4,1,3,5,6)"; // offset lists index 1's columns (k2,v1,k4) followed by the remaining PK columns
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) { // empty body: the test passes as long as executeQuery does not throw
+        }
+        sql = "SELECT  v1,v3 FROM " + longKeyTableName + " LIMIT 3 OFFSET (v1,v3,k1,k2,k3,k4,k5,k6)=(-1,-3,1,2,3,4,5,6)"; // offset lists index 2's columns (v1,v3) followed by all six PK columns
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) { // empty body: the test passes as long as executeQuery does not throw
+        }
+    }
+
+    @Test // Verifies that EXPLAIN output for a query with an RVC OFFSET mentions "With RVC Offset".
+    public void testOffsetExplain() throws SQLException {
+        String sql = "EXPLAIN SELECT * FROM " + DATA_TABLE_NAME + "  LIMIT 2 OFFSET (k1,k2,k3)=(2, 3, 2)";
+        try(Statement statement = conn.createStatement(); ResultSet rs = statement.executeQuery(sql)) {
+            StringBuilder explainStringBuilder = new StringBuilder();
+            while (rs.next()) { // concatenate the first column of every returned row, with no separator
+                String explain = rs.getString(1);
+                explainStringBuilder.append(explain);
+            }
+            assertTrue(explainStringBuilder.toString().contains("With RVC Offset")); // substring match against the concatenated text
+        }
+    }
+}
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 2191007..eca5c9c 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -747,7 +747,7 @@ select_node returns [SelectStatement ret]
     :   u=unioned_selects
         (ORDER BY order=order_by)?
         (LIMIT l=limit)?
-        (OFFSET o=offset (ROW | ROWS)?)?
+        (OFFSET o=offset)?
         (FETCH (FIRST | NEXT) (l=limit)? (ROW | ROWS) ONLY)?
         { ParseContext context = contextStack.peek(); $ret = factory.select(u, order, l, o, getBindCount(), context.isAggregate()); }
     ;
@@ -817,8 +817,9 @@ limit returns [LimitNode ret]
     ;
     
 offset returns [OffsetNode ret]
-	: b=bind_expression { $ret = factory.offset(b); }
-    | l=int_or_long_literal { $ret = factory.offset(l); }
+	: b=bind_expression (ROW | ROWS)? {  try { $ret = factory.offset(b); } catch (SQLException e) { throw new RuntimeException(e); } }
+    | l=int_or_long_literal (ROW | ROWS)? { try { $ret = factory.offset(l); } catch (SQLException e) { throw new RuntimeException(e); } }
+    | LPAREN lhs=one_or_more_expressions RPAREN EQ LPAREN rhs=one_or_more_expressions RPAREN { try { $ret = factory.offset(factory.comparison(CompareOp.EQUAL,factory.rowValueConstructor(lhs),factory.rowValueConstructor(rhs)));  } catch (SQLException e) { throw new RuntimeException(e); } }
     ;
 
 sampling_rate returns [LiteralParseNode ret]
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CompiledOffset.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CompiledOffset.java
new file mode 100644
index 0000000..d479c4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CompiledOffset.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import com.google.common.base.Optional;
+
+/**
+ * Holds the result of compiling an OFFSET clause: an optional integer offset and an optional byte[] offset.
+ */
+public class CompiledOffset {
+    private final Optional<Integer> integerOffset; // Guava Optional; NOTE(review): presumably the row count of a numeric OFFSET — confirm
+    private final Optional<byte[]> byteOffset; // NOTE(review): presumably the compiled key of an RVC OFFSET — confirm against RVCOffsetCompiler
+
+    public CompiledOffset(Optional<Integer> integerOffset, Optional<byte[]> byteOffset) { // stores both references as given: no null check, no array copy
+        this.integerOffset = integerOffset;
+        this.byteOffset = byteOffset;
+    }
+
+    public Optional<Integer> getIntegerOffset() {
+        return integerOffset;
+    }
+
+    public Optional<byte[]> getByteOffset() {
+        return byteOffset; // hands back the same Optional (and array instance) the constructor received
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index d81e6b3..e7db5b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -286,4 +286,9 @@ public class ListJarsQueryPlan implements QueryPlan {
     public List<OrderBy> getOutputOrderBys() {
         return Collections.<OrderBy> emptyList();
     }
+
+    @Override // this plan unconditionally reports itself as applicable
+    public boolean isApplicable() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
index 54be50b..c68a910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
@@ -30,10 +30,13 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 
+import com.google.common.base.Optional;
+
 public class OffsetCompiler {
+
     private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
 
-    public static final PDatum OFFSET_DATUM = new PDatum() {
+    private static final PDatum OFFSET_DATUM = new PDatum() {
         @Override
         public boolean isNullable() {
             return false;
@@ -60,25 +63,43 @@ public class OffsetCompiler {
         }
     };
 
+    private final RVCOffsetCompiler rvcOffsetCompiler = RVCOffsetCompiler.getInstance();
+
     private OffsetCompiler() {}
 
-    public static Integer compile(StatementContext context, FilterableStatement statement) throws SQLException {
+    // eager initialization
+    final private static OffsetCompiler OFFSET_COMPILER = getInstance();
+
+    private static OffsetCompiler getInstance() { // private factory; in the lines visible here it is only called to initialise OFFSET_COMPILER
+        return new OffsetCompiler(); // builds a fresh instance on every call
+    }
+
+    public static OffsetCompiler getOffsetCompiler() { // public accessor for the shared OffsetCompiler instance
+        return OFFSET_COMPILER; // the static final field initialised when the class loads
+    }
+
+    public CompiledOffset compile(StatementContext context, FilterableStatement statement, boolean inJoin, boolean inUnion) throws SQLException {
         OffsetNode offsetNode = statement.getOffset();
-        if (offsetNode == null) { return null; }
-        OffsetParseNodeVisitor visitor = new OffsetParseNodeVisitor(context);
-        offsetNode.getOffsetParseNode().accept(visitor);
-        return visitor.getOffset();
+        if (offsetNode == null) { return new CompiledOffset(Optional.<Integer>absent(), Optional.<byte[]>absent()); }
+        if (offsetNode.isIntegerOffset()) {
+            OffsetParseNodeVisitor visitor = new OffsetParseNodeVisitor(context);
+            offsetNode.getOffsetParseNode().accept(visitor);
+            Integer offset = visitor.getOffset();
+            return new CompiledOffset(Optional.fromNullable(offset), Optional.<byte[]>absent());
+        } else { //Must be a RVC Offset
+            return rvcOffsetCompiler.getRVCOffset(context, statement, inJoin, inUnion, offsetNode);
+        }
     }
 
     private static class OffsetParseNodeVisitor extends TraverseNoParseNodeVisitor<Void> {
         private final StatementContext context;
         private Integer offset;
 
-        public OffsetParseNodeVisitor(StatementContext context) {
+        OffsetParseNodeVisitor(StatementContext context) {
             this.context = context;
         }
 
-        public Integer getOffset() {
+        Integer getOffset() {
             return offset;
         }
 
@@ -87,7 +108,7 @@ public class OffsetCompiler {
             Object offsetValue = node.getValue();
             if (offsetValue != null) {
                 Integer offset = (Integer)OFFSET_DATUM.getDataType().toObject(offsetValue, node.getType());
-                if (offset.intValue() >= 0) {
+                if (offset >= 0) {
                     this.offset = offset;
                 }
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 712663c..d76875e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -39,6 +39,8 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotAllowedInQueryException;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.collect.ImmutableList;
@@ -118,7 +120,7 @@ public class OrderByCompiler {
                                   SelectStatement statement,
                                   GroupBy groupBy,
                                   Integer limit,
-                                  Integer offset,
+                                  CompiledOffset offset,
                                   RowProjector rowProjector,
                                   QueryPlan innerQueryPlan,
                                   Expression whereExpression) throws SQLException {
@@ -192,6 +194,16 @@ public class OrderByCompiler {
             }
             compiler.reset();
         }
+
+        //If we are not ordered we shouldn't be using RVC Offset
+        //I think this makes sense for the pagination case but perhaps we can relax this for
+        //other use cases.
+        //Note If the table is salted we still mark as row ordered in this code path
+        if (offset.getByteOffset().isPresent() && orderByExpressions.isEmpty()) {
+            throw new RowValueConstructorOffsetNotAllowedInQueryException(
+                    "RVC OFFSET requires either forceRowKeyOrder or explict ORDERBY with row key order");
+        }
+
         // we can remove ORDER BY clauses in case of only COUNT(DISTINCT...) clauses
         if (orderByExpressions.isEmpty() || groupBy.isUngroupedAggregate()) {
             return OrderBy.EMPTY_ORDER_BY;
@@ -209,13 +221,19 @@ public class OrderByCompiler {
                         && context.getCurrentTable().getTable().getType() != PTableType.PROJECTED
                         && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY
                         && !statement.getHint().hasHint(Hint.FORWARD_SCAN)) {
+                    if(offset.getByteOffset().isPresent()){
+                        throw new SQLException("Do not allow non-pk ORDER BY with RVC OFFSET");
+                    }
                     return OrderBy.REV_ROW_KEY_ORDER_BY;
                 }
             } else {
                 return OrderBy.FWD_ROW_KEY_ORDER_BY;
             }
         }
-
+        //If we were in row order this would be optimized out above
+        if(offset.getByteOffset().isPresent()){
+            throw new RowValueConstructorOffsetNotCoercibleException("Do not allow non-pk ORDER BY with RVC OFFSET");
+        }
         return new OrderBy(Lists.newArrayList(orderByExpressions.iterator()));
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8afd111..a7b6948 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
@@ -80,6 +81,7 @@ import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -88,6 +90,7 @@ import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.base.Optional;
 
 
 /**
@@ -240,7 +243,9 @@ public class QueryCompiler {
                 statement.getParameters(),
                 false,
                 false,
-                null);
+                null,
+                false,
+                true);
         plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
             plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
             context.getBindManager().getParameterMetaData());
@@ -288,7 +293,9 @@ public class QueryCompiler {
                         binds,
                         asSubquery,
                         !asSubquery,
-                        null);
+                        null,
+                        true,
+                        false);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = table.createProjectedTable(plan.getProjector());
@@ -411,7 +418,7 @@ public class QueryCompiler {
                         binds,
                         asSubquery,
                         !asSubquery && joinTable.isAllLeftJoin(),
-                        null);
+                        null, true, false);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
                 Integer limit = null;
                 Integer offset = null;
@@ -471,7 +478,9 @@ public class QueryCompiler {
                         binds,
                         asSubquery,
                         !asSubquery && type == JoinType.Right,
-                        null);
+                        null,
+                        true,
+                        false);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
                 Integer limit = null;
                 Integer offset = null;
@@ -561,7 +570,9 @@ public class QueryCompiler {
                         binds,
                         asSubquery,
                         false,
-                        innerPlan);
+                        innerPlan,
+                        true,
+                        false);
             }
             default:
                 throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -625,7 +636,12 @@ public class QueryCompiler {
     protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
         SelectStatement innerSelect = select.getInnerSelectStatement();
         if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null);
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, false, false);
+        }
+
+        if((innerSelect.getOffset() != null && (!innerSelect.getOffset().isIntegerOffset()) ||
+                select.getOffset() != null && !select.getOffset().isIntegerOffset())) {
+            throw new SQLException("RVC Offset not allowed with subqueries.");
         }
 
         QueryPlan innerPlan = compileSubquery(innerSelect, false);
@@ -640,7 +656,7 @@ public class QueryCompiler {
         context.setCurrentTable(tableRef);
         innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, context, null);
 
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan);
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, false, false);
     }
 
     protected QueryPlan compileSingleFlatQuery(
@@ -649,7 +665,10 @@ public class QueryCompiler {
             List<Object> binds,
             boolean asSubquery,
             boolean allowPageFilter,
-            QueryPlan innerPlan) throws SQLException {
+            QueryPlan innerPlan,
+            boolean inJoin,
+            boolean inUnion) throws SQLException {
+        boolean isApplicable = true;
         PTable projectedTable = null;
         if (this.projectTuples) {
             projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
@@ -667,7 +686,17 @@ public class QueryCompiler {
             viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
         }
         Integer limit = LimitCompiler.compile(context, select);
-        Integer offset = OffsetCompiler.compile(context, select);
+
+        CompiledOffset compiledOffset = null;
+        Integer offset = null;
+        try {
+            compiledOffset = OffsetCompiler.getOffsetCompiler().compile(context, select, inJoin, inUnion);
+            offset = compiledOffset.getIntegerOffset().orNull();
+        } catch(RowValueConstructorOffsetNotCoercibleException e){
+            //This current plan is not executable
+            compiledOffset = new CompiledOffset(Optional.<Integer>absent(),Optional.<byte[]>absent());
+            isApplicable = false;
+        }
 
         GroupBy groupBy = GroupByCompiler.compile(context, select);
         // Optimize the HAVING clause by finding any group by expressions that can be moved
@@ -680,7 +709,7 @@ public class QueryCompiler {
         	context.setResolver(FromCompiler.getResolver(context.getConnection(), tableRef, select.getUdfParseNodes()));
         }
         Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
-        Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
+        Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries, compiledOffset.getByteOffset());
         // Recompile GROUP BY now that we've figured out our ScanRanges so we know
         // definitively whether or not we'll traverse in row key order.
         groupBy = groupBy.compile(context, innerPlan, where);
@@ -696,7 +725,7 @@ public class QueryCompiler {
                 select,
                 groupBy,
                 limit,
-                offset,
+                compiledOffset,
                 projector,
                 innerPlan,
                 where);
@@ -730,7 +759,7 @@ public class QueryCompiler {
                             ? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy,
                                     parallelIteratorFactory, groupBy, having, dataPlan)
                             : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy,
-                                    parallelIteratorFactory, allowPageFilter, dataPlan));
+                                    parallelIteratorFactory, allowPageFilter, dataPlan, compiledOffset.getByteOffset()));
         }
         SelectStatement planSelect = asSubquery ? select : this.select;
         if (!subqueries.isEmpty()) {
@@ -755,6 +784,9 @@ public class QueryCompiler {
 
         }
 
+        if(plan instanceof BaseQueryPlan){
+            ((BaseQueryPlan) plan).setApplicable(isApplicable);
+        }
         return plan;
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 7732119..4764f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -86,6 +86,8 @@ public interface QueryPlan extends StatementPlan {
     public boolean isDegenerate();
     
     public boolean isRowKeyOrdered();
+    /** @return false when this plan cannot be executed, e.g. its RVC OFFSET is not coercible to its row key */
+    boolean isApplicable();
     
     /**
      * 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RVCOffsetCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RVCOffsetCompiler.java
new file mode 100644
index 0000000..5f043fe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RVCOffsetCompiler.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.phoenix.expression.AndExpression;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.parse.CastParseNode;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.EqualParseNode;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.RowValueConstructorParseNode;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowValueConstructorOffsetInternalErrorException;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotAllowedInQueryException;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
+import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Compiles a Row Value Constructor (RVC) OFFSET clause, i.e. an equality between an RVC of
+ * primary key columns and an RVC of constants, into an exclusive scan start key.
+ * See PHOENIX-4845.
+ */
+public class RVCOffsetCompiler {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(RVCOffsetCompiler.class);
+
+    private final static RVCOffsetCompiler INSTANCE = new RVCOffsetCompiler();
+
+    private RVCOffsetCompiler() {
+    }
+
+    public static RVCOffsetCompiler getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Validates an RVC OFFSET against the current table's row key and compiles it to a byte offset.
+     * The RVC must name, in row key order, every user-specified PK column (salt byte, tenant id
+     * and view index id columns are excluded) and must resolve to a point lookup.
+     *
+     * @param context statement context; its scan ranges are replaced while compiling the offset
+     * @param statement statement owning the OFFSET clause; must not be an aggregate
+     * @param inJoin whether the statement is part of a join (rejected)
+     * @param inUnion whether the statement is part of a union (rejected)
+     * @param offsetNode OFFSET node wrapping an equality of two row value constructors
+     * @return an offset with no integer part whose byte part is the key following the RVC's key
+     * @throws SQLException if the offset is not allowed in this query, does not match the row
+     *         key, or fails to compile unexpectedly
+     */
+    public CompiledOffset getRVCOffset(StatementContext context, FilterableStatement statement,
+            boolean inJoin, boolean inUnion, OffsetNode offsetNode) throws SQLException {
+        // The grammar guarantees an EqualParseNode whose LHS and RHS are both
+        // RowValueConstructorParseNodes: (columns) = (constants)
+        EqualParseNode equalParseNode = (EqualParseNode) offsetNode.getOffsetParseNode();
+        RowValueConstructorParseNode rvcColumnsParseNode =
+                (RowValueConstructorParseNode) equalParseNode.getLHS();
+        RowValueConstructorParseNode rvcConstantParseNode =
+                (RowValueConstructorParseNode) equalParseNode.getRHS();
+
+        // disallow use with aggregations
+        if (statement.isAggregate()) {
+            throw new RowValueConstructorOffsetNotAllowedInQueryException("RVC Offset not allowed in Aggregates");
+        }
+
+        // The select must not be part of a join/union
+        // Note cannot use the SelectStatement as for Left/Right joins we won't get passed in the join context
+        if (inJoin || inUnion) {
+            throw new RowValueConstructorOffsetNotAllowedInQueryException("RVC Offset not allowed in Joins or Unions");
+        }
+
+        // Exactly one table must be involved so that its primary key defines the offset
+        if (context.getResolver().getTables().size() != 1) {
+            throw new RowValueConstructorOffsetNotAllowedInQueryException("RVC Offset not allowed with zero or multiple tables");
+        }
+
+        PTable pTable = context.getCurrentTable().getTable();
+        List<PColumn> columns = pTable.getPKColumns();
+
+        int numUserColumns = columns.size(); // columns specified by the user
+        int userColumnIndex = 0; // index into the ordered list, columns, of where user specified start
+
+        // if we are salted we need to take a subset of the pk
+        Integer buckets = pTable.getBucketNum();
+        if (buckets != null && buckets > 0) { // We are salted
+            numUserColumns--;
+            userColumnIndex++;
+        }
+
+        if (pTable.isMultiTenant() && pTable.getTenantId() != null) {
+            // the tenantId is one of the pks and will be handled automatically
+            numUserColumns--;
+            userColumnIndex++;
+        }
+
+        boolean isIndex = PTableType.INDEX.equals(pTable.getType());
+        // If we are a view index we have to handle the idxId column
+        // Note that viewIndexId comes before tenantId (what about salt byte?)
+        if (isIndex && pTable.getViewIndexId() != null) {
+            numUserColumns--;
+            userColumnIndex++;
+        }
+
+        // Sanity check that they are providing all the user defined keys to this table
+        if (numUserColumns != rvcConstantParseNode.getChildren().size()) {
+            throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must exactly cover the tables PK.");
+        }
+
+        // Make sure the order is the same and all the user defined columns are mentioned in the column RVC
+        if (numUserColumns != rvcColumnsParseNode.getChildren().size()) {
+            throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the tables PKs.");
+        }
+
+        List<ColumnParseNode> rvcColumnParseNodeList =
+                buildListOfColumnParseNodes(rvcColumnsParseNode, isIndex);
+
+        // Make sure we have all column parse nodes for the left hand
+        if (rvcColumnParseNodeList.size() != numUserColumns) {
+            throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the tables PKs.");
+        }
+
+        // Compile the equality as a mini where clause now so we can compare it to the table's
+        // PK PColumns and produce a row offset
+        Set<HintNode.Hint> originalHints = statement.getHint().getHints();
+        WhereCompiler.WhereExpressionCompiler whereCompiler = new WhereCompiler.WhereExpressionCompiler(context);
+
+        Expression whereExpression;
+        try {
+            whereExpression = equalParseNode.accept(whereCompiler);
+        } catch (TypeMismatchException e) {
+            throw new RowValueConstructorOffsetNotCoercibleException(
+                    "RVC Offset could not be coerced to the tables PKs. " + e.getMessage());
+        } catch (Exception e) {
+            // Keep the cause in the log; the exception thrown below only has a generic message
+            LOGGER.error("Unexpected error while compiling RVC Offset.", e);
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset unexpected failure.");
+        }
+
+        if (whereExpression == null) {
+            LOGGER.error("Unexpected error while compiling RVC Offset, got null expression.");
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset unexpected failure.");
+        }
+
+        // Pushing the equality into the scan sets the context's scan ranges, which are read below
+        Expression expression;
+        try {
+            expression = WhereOptimizer.pushKeyExpressionsToScan(context, originalHints,
+                    whereExpression, null, Optional.<byte[]>absent());
+        } catch (Exception e) {
+            // Keep the cause in the log; the exception thrown below only has a generic message
+            LOGGER.error("Unexpected error while compiling RVC Offset.", e);
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset unexpected failure.");
+        }
+        if (expression == null) {
+            LOGGER.error("Unexpected error while compiling RVC Offset, got null expression.");
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset unexpected failure.");
+        }
+
+        // Now that columns etc have been resolved lets check to make sure they match the pk order
+        // Today the RowValueConstructor Equality gets Rewritten into AND of EQ Ops.
+        if (!(whereExpression instanceof AndExpression)) {
+            LOGGER.warn("Unexpected error while compiling RVC Offset, expected AndExpression got {}",
+                    whereExpression.getClass().getName());
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset must specify the tables PKs.");
+        }
+
+        List<RowKeyColumnExpression> rowKeyColumnExpressionList =
+                buildListOfRowKeyColumnExpressions(whereExpression.getChildren(), isIndex);
+        if (rowKeyColumnExpressionList.size() != numUserColumns) {
+            LOGGER.warn("Unexpected error while compiling RVC Offset, expected {} found {}",
+                    numUserColumns, rowKeyColumnExpressionList.size());
+            throw new RowValueConstructorOffsetInternalErrorException("RVC Offset must specify the table's PKs.");
+        }
+
+        // The i-th RVC column, resolved row key expression and user PK column must all match
+        for (int i = 0; i < numUserColumns; i++) {
+            PColumn column = columns.get(i + userColumnIndex);
+            ColumnParseNode columnParseNode = rvcColumnParseNodeList.get(i);
+
+            String columnParseNodeString = columnParseNode.getFullName();
+            if (isIndex) {
+                columnParseNodeString = IndexUtil.getDataColumnName(columnParseNodeString);
+            }
+
+            RowKeyColumnExpression rowKeyColumnExpression = rowKeyColumnExpressionList.get(i);
+            String expressionName = rowKeyColumnExpression.getName();
+
+            // Not sure why it is getting quoted
+            expressionName = expressionName.replace("\"", "");
+            if (isIndex) {
+                expressionName = IndexUtil.getDataColumnName(expressionName);
+            }
+
+            if (!StringUtils.equals(expressionName, columnParseNodeString)) {
+                throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the table's PKs.");
+            }
+
+            String columnString = column.getName().getString();
+            if (isIndex) {
+                columnString = IndexUtil.getDataColumnName(columnString);
+            }
+            if (!StringUtils.equals(expressionName, columnString)) {
+                throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the table's PKs.");
+            }
+        }
+
+        // check to see if this was a single key expression
+        ScanRanges scanRanges = context.getScanRanges();
+
+        if (!scanRanges.isPointLookup()) {
+            throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must be a point lookup.");
+        }
+
+        // Note the use of ByteUtil.nextKey() to generate exclusive offset
+        return new CompiledOffset(Optional.<Integer>absent(),
+                Optional.of(ByteUtil.nextKey(scanRanges.getScanRange().getLowerRange())));
+    }
+
+    /**
+     * Extracts the row key column that is the first operand of each comparison.
+     *
+     * @param expressions children of the AND that the RVC equality was rewritten into
+     * @param isIndex whether the table is an index, in which case a coercion wrapping the
+     *        row key column is unwrapped
+     * @return the row key column expressions, in the order of {@code expressions}
+     * @throws RowValueConstructorOffsetNotCoercibleException if a child is not a comparison
+     *         whose first operand is a (possibly coerced) row key column
+     */
+    @VisibleForTesting List<RowKeyColumnExpression> buildListOfRowKeyColumnExpressions(
+            List<Expression> expressions, boolean isIndex)
+            throws RowValueConstructorOffsetNotCoercibleException {
+        List<RowKeyColumnExpression> rowKeyColumnExpressionList =
+                new ArrayList<RowKeyColumnExpression>();
+        for (Expression child : expressions) {
+            if (!(child instanceof ComparisonExpression)) {
+                LOGGER.warn("Unexpected error while compiling RVC Offset");
+                throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the tables PKs.");
+            }
+
+            Expression possibleRowKeyColumnExpression = child.getChildren().get(0);
+
+            // Note that since we store indexes in variable length form there may be casts from fixed types to
+            // variable length
+            if (isIndex && possibleRowKeyColumnExpression instanceof CoerceExpression) {
+                // Cast today has 1 child
+                possibleRowKeyColumnExpression =
+                        ((CoerceExpression) possibleRowKeyColumnExpression).getChild();
+            }
+
+            if (!(possibleRowKeyColumnExpression instanceof RowKeyColumnExpression)) {
+                LOGGER.warn("Unexpected error while compiling RVC Offset");
+                throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the tables PKs.");
+            }
+            rowKeyColumnExpressionList.add((RowKeyColumnExpression) possibleRowKeyColumnExpression);
+        }
+        return rowKeyColumnExpressionList;
+    }
+
+    /**
+     * Extracts the column parse nodes of the left-hand RVC of the offset.
+     *
+     * @param rvcColumnsParseNode RVC that is expected to contain only columns
+     * @param isIndex whether the table is an index, in which case a cast around a column is unwrapped
+     * @return the column parse nodes, in RVC order
+     * @throws RowValueConstructorOffsetNotCoercibleException if a child is not a column
+     */
+    @VisibleForTesting List<ColumnParseNode> buildListOfColumnParseNodes(
+            RowValueConstructorParseNode rvcColumnsParseNode, boolean isIndex)
+            throws RowValueConstructorOffsetNotCoercibleException {
+        List<ColumnParseNode> nodes = new ArrayList<ColumnParseNode>();
+        for (ParseNode node : rvcColumnsParseNode.getChildren()) {
+            // Note that since we store indexes in variable length form there may be casts from fixed types to
+            // variable length
+            if (isIndex && node instanceof CastParseNode) {
+                // Cast today has 1 child
+                node = node.getChildren().get(0);
+            }
+
+            if (!(node instanceof ColumnParseNode)) {
+                throw new RowValueConstructorOffsetNotCoercibleException("RVC Offset must specify the tables PKs.");
+            }
+            nodes.add((ColumnParseNode) node);
+        }
+        return nodes;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index b41930b..3bcf271 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -73,9 +74,14 @@ public class ScanRanges {
     }
 
     public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, Integer nBuckets, boolean useSkipScan, int rowTimestampColIndex) {
+        return create(schema,ranges,slotSpan,nBuckets,useSkipScan,rowTimestampColIndex,Optional.<byte[]>absent()); // no scanMinOffset
+    }
+
+    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, Integer nBuckets, boolean useSkipScan, int rowTimestampColIndex, Optional<byte[]> scanMinOffset) {
         int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
         int nSlots = ranges.size();
-        if (nSlots == offset) {
+
+        if (nSlots == offset && !scanMinOffset.isPresent()) {
             return EVERYTHING;
         } else if ((nSlots == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
             return NOTHING;
@@ -108,6 +114,7 @@ public class ScanRanges {
                 slotSpan = new int[] {schema.getMaxFields()-1};
             }
         }
+
         List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
         for (int i = 0; i < ranges.size(); i++) {
             Field f = schema.getField(i);
@@ -135,6 +142,24 @@ public class ScanRanges {
             if (minKey.length <= offset) {
                 minKey = KeyRange.UNBOUND;
             }
+
+            //Handle the offset by pushing it into the scanRange
+            if(scanMinOffset.isPresent()){
+
+                byte[] minOffset = scanMinOffset.get();
+                //If we are salted we have to set the salt byte of the offset to 0.
+                //This should be safe for RVC Offset since we specify a full rowkey which
+                // is by definition unique so duplicating the salt bucket is fine
+                if(nBuckets != null && nBuckets > 0) {
+                    minOffset[0] = 0;  //We use 0 for salt bucket for scans
+                }
+
+                //If the offset is more selective than the existing ranges
+                if(Bytes.BYTES_COMPARATOR.compare(minOffset,minKey) > 0){
+                    minKey=minOffset;
+                }
+            }
+
             scanRange = KeyRange.getKeyRange(minKey, maxKey);
         }
 
@@ -160,7 +185,7 @@ public class ScanRanges {
         this.useSkipScanFilter = useSkipScanFilter;
         this.scanRange = scanRange;
         this.rowTimestampRange = rowTimestampRange;
-        
+
         if (isSalted && !isPointLookup) {
             ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
         }
@@ -425,7 +450,7 @@ public class ScanRanges {
     }
 
     public boolean isEverything() {
-        return this == EVERYTHING || ranges.get(0).get(0) == KeyRange.EVERYTHING_RANGE;
+        return this == EVERYTHING || (!ranges.isEmpty() && ranges.get(0).get(0) == KeyRange.EVERYTHING_RANGE);
     }
 
     public boolean isDegenerate() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index fc3e263..7d36f0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -294,4 +294,9 @@ public class TraceQueryPlan implements QueryPlan {
     public List<OrderBy> getOutputOrderBys() {
         return Collections.<OrderBy> emptyList();
     }
+
+    @Override
+    public boolean isApplicable() {
+        return true; // this plan never reports itself as inapplicable
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 2c7926e..fb294c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -45,6 +46,7 @@ import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
@@ -86,7 +88,7 @@ public class WhereCompiler {
     }
 
     public static Expression compile(StatementContext context, FilterableStatement statement) throws SQLException {
-        return compile(context, statement, null, null);
+        return compile(context, statement, null, null, Optional.<byte[]>absent());
     }
 
     public static Expression compile(StatementContext context, ParseNode whereNode) throws SQLException {
@@ -104,8 +106,8 @@ public class WhereCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
-    public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, Set<SubqueryParseNode> subqueryNodes) throws SQLException {
-        return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), subqueryNodes);
+    public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, Set<SubqueryParseNode> subqueryNodes, Optional<byte[]> minOffset) throws SQLException {
+        return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), subqueryNodes, minOffset);
     }
 
     /**
@@ -118,7 +120,7 @@ public class WhereCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */    
-    public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, Set<SubqueryParseNode> subqueryNodes) throws SQLException {
+    public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, Set<SubqueryParseNode> subqueryNodes, Optional<byte[]> minOffset) throws SQLException {
         ParseNode where = statement.getWhere();
         if (subqueryNodes != null) { // if the subqueryNodes passed in is null, we assume there will be no sub-queries in the WHERE clause.
             SubqueryParseNodeVisitor subqueryVisitor = new SubqueryParseNodeVisitor(context, subqueryNodes);
@@ -154,14 +156,18 @@ public class WhereCompiler {
         }
         
         if (context.getCurrentTable().getTable().getType() != PTableType.PROJECTED && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) {
-            expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
+            Set<HintNode.Hint> hints = null;
+            if(statement.getHint() != null){
+                hints = statement.getHint().getHints();
+            }
+            expression = WhereOptimizer.pushKeyExpressionsToScan(context, hints, expression, extractedNodes, minOffset);
         }
         setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily);
 
         return expression;
     }
     
-    private static class WhereExpressionCompiler extends ExpressionCompiler {
+    public static class WhereExpressionCompiler extends ExpressionCompiler {
         private boolean disambiguateWithFamily;
 
         WhereExpressionCompiler(StatementContext context) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 9ca2056..7e461d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.filter.CompareFilter;
+import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -99,18 +99,18 @@ public class WhereOptimizer {
     /**
      * Pushes row key expressions from the where clause into the start/stop key of the scan.
      * @param context the shared context during query compilation
-     * @param statement the statement being compiled
+     * @param hints the set, possibly empty, of hints in this statement
      * @param whereClause the where clause expression
      * @return the new where clause with the key expressions removed
      */
-    public static Expression pushKeyExpressionsToScan(StatementContext context, FilterableStatement statement, Expression whereClause)
+    public static Expression pushKeyExpressionsToScan(StatementContext context, Set<Hint> hints, Expression whereClause)
             throws SQLException{
-        return pushKeyExpressionsToScan(context, statement, whereClause, null);
+        return pushKeyExpressionsToScan(context, hints, whereClause, null, Optional.<byte[]>absent());
     }
 
     // For testing so that the extractedNodes can be verified
-    public static Expression pushKeyExpressionsToScan(StatementContext context, FilterableStatement statement,
-            Expression whereClause, Set<Expression> extractNodes) throws SQLException {
+    public static Expression pushKeyExpressionsToScan(StatementContext context, Set<Hint> hints,
+            Expression whereClause, Set<Expression> extractNodes, Optional<byte[]> minOffset) throws SQLException {
         PName tenantId = context.getConnection().getTenantId();
         byte[] tenantIdBytes = null;
         PTable table = context.getCurrentTable().getTable();
@@ -125,7 +125,7 @@ public class WhereOptimizer {
             tenantIdBytes = ScanUtil.getTenantIdBytes(schema, isSalted, tenantId, isSharedIndex);
     	}
 
-        if (whereClause == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null) {
+        if (whereClause == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null && !minOffset.isPresent()) {
             context.setScanRanges(ScanRanges.EVERYTHING);
             return whereClause;
         }
@@ -141,7 +141,7 @@ public class WhereOptimizer {
             // becomes consistent.
             keySlots = whereClause.accept(visitor);
     
-            if (keySlots == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null) {
+            if (keySlots == null && (tenantId == null || !table.isMultiTenant()) && table.getViewIndexId() == null && !minOffset.isPresent()) {
                 context.setScanRanges(ScanRanges.EVERYTHING);
                 return whereClause;
             }
@@ -192,8 +192,8 @@ public class WhereOptimizer {
             pkPos++;
         }
         
-        boolean forcedSkipScan = statement.getHint().hasHint(Hint.SKIP_SCAN);
-        boolean forcedRangeScan = statement.getHint().hasHint(Hint.RANGE_SCAN);
+        boolean forcedSkipScan = hints.contains(Hint.SKIP_SCAN);
+        boolean forcedRangeScan = hints.contains(Hint.RANGE_SCAN);
         boolean hasUnboundedRange = false;
         boolean hasMultiRanges = false;
         boolean hasRangeKey = false;
@@ -334,7 +334,7 @@ public class WhereOptimizer {
         // we can still use our skip scan. The ScanRanges.create() call will explode
         // out the keys.
         slotSpanArray = Arrays.copyOf(slotSpanArray, cnf.size());
-        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpanArray, nBuckets, useSkipScan, table.getRowTimestampColPos());
+        ScanRanges scanRanges = ScanRanges.create(schema, cnf, slotSpanArray, nBuckets, useSkipScan, table.getRowTimestampColPos(), minOffset);
         context.setScanRanges(scanRanges);
         if (whereClause == null) {
             return null;
@@ -468,7 +468,12 @@ public class WhereOptimizer {
             Expression firstRhs = count == 0 ? sampleValues.get(0).get(0) : new RowValueConstructorExpression(sampleValues.get(0).subList(0, count + 1), true);
             Expression secondRhs = count == 0 ? sampleValues.get(1).get(0) : new RowValueConstructorExpression(sampleValues.get(1).subList(0, count + 1), true);
             Expression testExpression = InListExpression.create(Lists.newArrayList(lhs, firstRhs, secondRhs), false, context.getTempPtr(), context.getCurrentTable().getTable().rowKeyOrderOptimizable());
-            remaining = pushKeyExpressionsToScan(context, statement, testExpression);
+            Set<Hint> hints = new HashSet<>();
+            if(statement.getHint() != null){
+                hints = statement.getHint().getHints();
+            }
+
+            remaining = pushKeyExpressionsToScan(context, hints, testExpression);
             if (context.getScanRanges().isPointLookup()) {
                 count++;
                 break; // found the best match
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2bd9ff3..7a5b13d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -151,7 +151,6 @@ public enum SQLExceptionCode {
     }),
     ORDER_BY_ARRAY_NOT_SUPPORTED(515, "42893", "ORDER BY of an array type is not allowed."),
     NON_EQUALITY_ARRAY_COMPARISON(516, "42894", "Array types may only be compared using = or !=."),
-
     /**
      *  Invalid Transaction State (errorcode 05, sqlstate 25)
      */
@@ -444,6 +443,10 @@ public enum SQLExceptionCode {
     UPGRADE_NOT_REQUIRED(2012, "INT14", ""),
     GET_TABLE_ERROR(2013, "INT15", "MetadataEndpointImpl doGetTable called for table not present " +
             "on region"),
+    ROW_VALUE_CONSTRUCTOR_OFFSET_NOT_COERCIBLE(2014, "INT16", "Row Value Constructor Offset Not Coercible to a Primary or Indexed RowKey."),
+    ROW_VALUE_CONSTRUCTOR_OFFSET_INTERNAL_ERROR(2015, "INT17", "Row Value Constructor Offset had an Unexpected Error."),
+    ROW_VALUE_CONSTRUCTOR_OFFSET_NOT_ALLOWED_IN_QUERY(2016, "INT18", "Row Value Constructor Offset Not Allowed In Query."),
+
     OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() {
         @Override
         public SQLException newException(SQLExceptionInfo info) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 89c5233..9bbd21c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -120,7 +121,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
     protected Long estimatedSize;
     protected Long estimateInfoTimestamp;
     private boolean getEstimatesCalled;
-    
+    protected boolean isApplicable = true;
 
     protected BaseQueryPlan(
             StatementContext context, FilterableStatement statement, TableRef table,
@@ -261,7 +262,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
         PTable table = tableRef.getTable();
         
         if (dynamicFilter != null) {
-            WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), null);
+            WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), null,
+                    Optional.<byte[]>absent());
         }
         
         if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
@@ -558,6 +560,14 @@ public abstract class BaseQueryPlan implements QueryPlan {
         return estimateInfoTimestamp;
     }
 
+    public boolean isApplicable(){
+        return isApplicable;
+    }
+
+    public void setApplicable(boolean isApplicable){
+        this.isApplicable = isApplicable;
+    }
+
     private void getEstimates() throws SQLException {
         getEstimatesCalled = true;
         // Initialize a dummy iterator to get the estimates based on stats.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 4724edd..ba65fbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -167,4 +167,7 @@ public abstract class DelegateQueryPlan implements QueryPlan {
     public List<OrderBy> getOutputOrderBys() {
         return delegate.getOutputOrderBys();
     }
+
+    @Override
+    public boolean isApplicable() { return delegate.isApplicable(); }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 1cb9dc8..98d9576 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Optional;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -235,10 +236,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
             ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
             context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection()));
             if (recompileWhereClause) {
-                postFilter = WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, null);
+                postFilter = WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, null, Optional.<byte[]>absent());
             }
             if (hasKeyRangeExpressions) {
-                WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, null);
+                WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, null, Optional.<byte[]>absent());
             }
         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d2019fc..81c14d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
@@ -98,15 +99,16 @@ public class ScanPlan extends BaseQueryPlan {
     private Long serialBytesEstimate;
     private Long serialEstimateInfoTs;
     private OrderBy actualOutputOrderBy;
+    private Optional<byte[]> rowOffset;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit,
             Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
-            QueryPlan dataPlan) throws SQLException {
-        this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan);
+            QueryPlan dataPlan, Optional<byte[]> rowOffset) throws SQLException {
+        this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan, rowOffset);
     }
     
     private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset,
-            OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws SQLException {
+            OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan, Optional<byte[]> rowOffset) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
                         buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan);
@@ -129,6 +131,7 @@ public class ScanPlan extends BaseQueryPlan {
             serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS;
         }
         this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, context);
+        this.rowOffset = rowOffset;
     }
 
     private static boolean isSerial(StatementContext context, FilterableStatement statement,
@@ -374,4 +377,8 @@ public class ScanPlan extends BaseQueryPlan {
     public List<OrderBy> getOutputOrderBys() {
        return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
     }
+
+    public Optional<byte[]> getRowOffset() {
+        return this.rowOffset;
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 8e20908..b1d9c69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -783,4 +783,9 @@ public class SortMergeJoinPlan implements QueryPlan {
     public List<OrderBy> getOutputOrderBys() {
         return this.actualOutputOrderBys;
     }
+
+    @Override
+    public boolean isApplicable() {
+        return true;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 07a3aa5..cefd7b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -312,4 +312,9 @@ public class UnionPlan implements QueryPlan {
         }
         return Collections.<OrderBy> emptyList();
     }
+
+    @Override
+    public boolean isApplicable() {
+        return true;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 3d61236..51b15fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -57,6 +57,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -80,6 +82,7 @@ import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
@@ -1564,6 +1567,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
+
+        if(this.plan instanceof ScanPlan) {
+            ScanPlan scanPlan = (ScanPlan) this.plan;
+            if(scanPlan.getRowOffset().isPresent()) {
+                buf.append("With RVC Offset " + "0x" + Hex.encodeHexString(scanPlan.getRowOffset().get()) + " ");
+            }
+        }
+
         explain(buf.toString(),planSteps);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 1595dea..d4543cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -777,6 +777,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 public List<OrderBy> getOutputOrderBys() {
                     return Collections.<OrderBy> emptyList();
                 }
+
+                @Override
+                public boolean isApplicable() {
+                    return true;
+                }
             };
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 2fc500f..50c0deb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.IndexUtil;
@@ -199,12 +200,12 @@ public class QueryOptimizer {
     private List<QueryPlan> getApplicablePlansForSingleFlatQuery(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
         SelectStatement select = (SelectStatement)dataPlan.getStatement();
         // Exit early if we have a point lookup as we can't get better than that
-        if (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan) {
+        if (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan && dataPlan.isApplicable()) {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
 
         List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
-        if (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {
+        if (dataPlan.isApplicable() && (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX))) {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
         
@@ -226,7 +227,7 @@ public class QueryOptimizer {
         plans.add(dataPlan);
         QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
         if (hintedPlan != null) {
-            if (stopAtBestPlan) {
+            if (stopAtBestPlan && hintedPlan.isApplicable()) {
                 return Collections.singletonList(hintedPlan);
             }
             plans.add(0, hintedPlan);
@@ -242,8 +243,21 @@ public class QueryOptimizer {
                 plans.add(plan);
             }
         }
-        
-        return hintedPlan == null ? orderPlansBestToWorst(select, plans, stopAtBestPlan) : plans;
+
+        //Only pull out applicable plans, late filtering since dataplan is used to construct the plans
+        List<QueryPlan> applicablePlans = Lists.newArrayListWithExpectedSize(plans.size());
+        for(QueryPlan plan : plans) {
+            if(plan.isApplicable()) {
+                applicablePlans.add(plan);
+            }
+        }
+        if(applicablePlans.isEmpty()) {
+            //Currently this is the only case for non-applicable plans
+            throw new RowValueConstructorOffsetNotCoercibleException("No table or index could be coerced to the PK as the offset. Or an uncovered index was attempted");
+        }
+
+        //OrderPlans
+        return hintedPlan == null ? orderPlansBestToWorst(select, applicablePlans, stopAtBestPlan) : applicablePlans;
     }
     
     private QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException {
@@ -395,6 +409,10 @@ public class QueryOptimizer {
                     }
                 }
             }
+            catch (RowValueConstructorOffsetNotCoercibleException e) {
+                // Could not coerce the user provided RVC Offset so we do not have a plan to add.
+                return null;
+            }
         }
         return null;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index 8a83116..f1dfdb6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.parse;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -213,6 +214,10 @@ public class HintNode {
     public boolean hasHint(Hint hint) {
         return hints.containsKey(hint);
     }
+
+    public Set<Hint> getHints(){
+        return hints.keySet();
+    }
     
     @Override
     public String toString() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
index 0387f5c..e84316d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OffsetNode.java
@@ -17,51 +17,44 @@
  */
 package org.apache.phoenix.parse;
 
+import java.sql.SQLException;
+import java.util.Objects;
 
 public class OffsetNode {
-    private final BindParseNode bindNode;
-    private final LiteralParseNode offsetNode;
-    
-    OffsetNode(BindParseNode bindNode) {
-        this.bindNode = bindNode;
-        offsetNode = null;
-    }
-    
-    OffsetNode(LiteralParseNode limitNode) {
-        this.offsetNode = limitNode;
-        this.bindNode = null;
+    private final ParseNode node;
+
+    OffsetNode(ParseNode node) throws SQLException { // accepts only bind, literal, or comparison parse nodes
+        if(!(node instanceof BindParseNode || node instanceof LiteralParseNode || node instanceof ComparisonParseNode)) { // instanceof is false for null, so null is rejected here too
+            throw new SQLException("Bad Expression Passed To Offset, node of type " + (node == null ? "null" : node.getClass().getName()));
+        }
+        this.node = node;
     }
     
     public ParseNode getOffsetParseNode() {
-        return bindNode == null ? offsetNode : bindNode;
+        return node;
+    }
+
+    /**
+     * Because an RVC offset normally contains multiple binds, a single bind node is treated as an integer offset.
+     * @return true for Literal or Bind parse nodes.
+     */
+    public boolean isIntegerOffset() {
+        return (node instanceof BindParseNode) || (node instanceof LiteralParseNode);
     }
     
     @Override
     public String toString() {
-        return bindNode == null ? offsetNode.toString() : bindNode.toString();
+        return node.toString();
     }
 
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((bindNode == null) ? 0 : bindNode.hashCode());
-        result = prime * result + ((offsetNode == null) ? 0 : offsetNode.hashCode());
-        return result;
+    @Override public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        OffsetNode that = (OffsetNode) o;
+        return Objects.equals(node, that.node);
     }
 
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) return true;
-        if (obj == null) return false;
-        if (getClass() != obj.getClass()) return false;
-        OffsetNode other = (OffsetNode)obj;
-        if (bindNode == null) {
-            if (other.bindNode != null) return false;
-        } else if (!bindNode.equals(other.bindNode)) return false;
-        if (offsetNode == null) {
-            if (other.offsetNode != null) return false;
-        } else if (!offsetNode.equals(other.offsetNode)) return false;
-        return true;
+    @Override public int hashCode() {
+        return Objects.hash(node);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 1080996..84e7998 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -590,7 +590,7 @@ public class ParseNodeFactory {
         return new CastParseNode(expression, dataType, maxLength, scale, arr);
     }
 
-    public ParseNode rowValueConstructor(List<ParseNode> l) {
+    public RowValueConstructorParseNode rowValueConstructor(List<ParseNode> l) {
         return new RowValueConstructorParseNode(l);
     }
 
@@ -909,14 +909,18 @@ public class ParseNodeFactory {
         return new LimitNode(l);
     }
 
-    public OffsetNode offset(BindParseNode b) {
+    public OffsetNode offset(BindParseNode b) throws SQLException {
         return new OffsetNode(b);
     }
 
-    public OffsetNode offset(LiteralParseNode l) {
+    public OffsetNode offset(LiteralParseNode l) throws SQLException {
         return new OffsetNode(l);
     }
 
+    public OffsetNode offset(ComparisonParseNode r) throws SQLException {
+        return new OffsetNode(r);
+    }
+
     public DropSchemaStatement dropSchema(String schemaName, boolean ifExists, boolean cascade) {
         return new DropSchemaStatement(schemaName, ifExists, cascade);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 473383d..01533ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -91,6 +91,17 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
             rewriter.reset();
             normWhere = where.accept(rewriter);
         }
+        OffsetNode offsetNode = statement.getOffset();
+        ParseNode offset = null;
+        ParseNode normOffset = null;
+        if(offsetNode != null) {
+            offset = statement.getOffset().getOffsetParseNode();
+            normOffset = offset;
+            if (offset != null && !statement.getOffset().isIntegerOffset()) {
+                rewriter.reset();
+                normOffset = offset.accept(rewriter);
+            }
+        }
         ParseNode having = statement.getHaving();
         ParseNode normHaving= having;
         if (having != null) {
@@ -171,12 +182,14 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
                 normHaving == having && 
                 selectNodes == normSelectNodes && 
                 groupByNodes == normGroupByNodes &&
-                orderByNodes == normOrderByNodes) {
+                orderByNodes == normOrderByNodes &&
+                normOffset == offset
+        ) {
             return statement;
         }
         return NODE_FACTORY.select(normFrom, statement.getHint(), statement.isDistinct(),
                 normSelectNodes, normWhere, normGroupByNodes, normHaving, normOrderByNodes,
-                statement.getLimit(), statement.getOffset(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(),
+                statement.getLimit(), normOffset == null ? null : new OffsetNode(normOffset), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(),
                 statement.getSelects(), statement.getUdfParseNodes());
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetInternalErrorException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetInternalErrorException.java
new file mode 100644
index 0000000..5e2622c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetInternalErrorException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * 
+ * Exception thrown when an unexpected internal error occurs while processing an RVC Offset
+ * 
+ */
+public class RowValueConstructorOffsetInternalErrorException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.ROW_VALUE_CONSTRUCTOR_OFFSET_INTERNAL_ERROR;
+    final private static String BASE_MESSAGE = new SQLExceptionInfo.Builder(code).build().toString();
+
+    public RowValueConstructorOffsetInternalErrorException(String additionalInfo) {
+        super(BASE_MESSAGE + " " + additionalInfo, code.getSQLState(), code.getErrorCode());
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotAllowedInQueryException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotAllowedInQueryException.java
new file mode 100644
index 0000000..1ab0285
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotAllowedInQueryException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * 
+ * Exception thrown when an RVC Offset is used in a query where it is not allowed
+ * 
+ */
+public class RowValueConstructorOffsetNotAllowedInQueryException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.ROW_VALUE_CONSTRUCTOR_OFFSET_NOT_ALLOWED_IN_QUERY;
+    final private static String BASE_MESSAGE = new SQLExceptionInfo.Builder(code).build().toString();
+
+    public RowValueConstructorOffsetNotAllowedInQueryException(String additionalInfo) {
+        super(BASE_MESSAGE + " " + additionalInfo, code.getSQLState(), code.getErrorCode());
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotCoercibleException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotCoercibleException.java
new file mode 100644
index 0000000..4183d0c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowValueConstructorOffsetNotCoercibleException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+/**
+ * 
+ * Exception thrown when an RVC Offset is not coercible to a PK or index of a table
+ * 
+ */
+public class RowValueConstructorOffsetNotCoercibleException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static SQLExceptionCode code = SQLExceptionCode.ROW_VALUE_CONSTRUCTOR_OFFSET_NOT_COERCIBLE;
+    final private static String BASE_MESSAGE = new SQLExceptionInfo.Builder(code).build().toString();
+
+    public RowValueConstructorOffsetNotCoercibleException(String additionalInfo) {
+        super(BASE_MESSAGE + " " + additionalInfo, code.getSQLState(), code.getErrorCode());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/RVCOffsetCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/RVCOffsetCompilerTest.java
new file mode 100644
index 0000000..70a48a1
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/RVCOffsetCompilerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.RowValueConstructorParseNode;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+public class RVCOffsetCompilerTest {
+
+    // Table name shared by every column parse node created in these tests.
+    private static TableName TABLE_NAME = TableName.create(null,"TABLE1");
+
+    RVCOffsetCompiler offsetCompiler;
+
+    @Before
+    public void init() {
+        offsetCompiler = RVCOffsetCompiler.getInstance();
+    }
+
+    /** Bare column nodes inside the RVC are expected back unchanged and in order. */
+    @Test
+    public void buildListOfColumnParseNodesTest() throws Exception {
+        ColumnParseNode firstColumn = new ColumnParseNode(TABLE_NAME,"col1");
+        ColumnParseNode secondColumn = new ColumnParseNode(TABLE_NAME,"col2");
+
+        List<ParseNode> rvcChildren = new ArrayList<>();
+        rvcChildren.add(firstColumn);
+        rvcChildren.add(secondColumn);
+        RowValueConstructorParseNode rvc = new RowValueConstructorParseNode(rvcChildren);
+
+        List<ColumnParseNode> columns =
+                offsetCompiler.buildListOfColumnParseNodes(rvc, true);
+
+        assertEquals(2,columns.size());
+        assertEquals(firstColumn,columns.get(0));
+        assertEquals(secondColumn,columns.get(1));
+    }
+
+    /** Column nodes wrapped by the factory's cast are expected back as the bare columns. */
+    @Test
+    public void buildListOfColumnParseNodesTestIndex() throws Exception {
+        ColumnParseNode firstColumn = new ColumnParseNode(TABLE_NAME,"col1");
+        ColumnParseNode secondColumn = new ColumnParseNode(TABLE_NAME,"col2");
+
+        // Each RVC child is a cast of a column to DECIMAL rather than the column itself.
+        ParseNodeFactory nodeFactory = new ParseNodeFactory();
+        List<ParseNode> rvcChildren = new ArrayList<>();
+        rvcChildren.add(nodeFactory.cast(firstColumn, PDecimal.INSTANCE, null, null,false));
+        rvcChildren.add(nodeFactory.cast(secondColumn, PDecimal.INSTANCE, null, null,false));
+
+        RowValueConstructorParseNode rvc = new RowValueConstructorParseNode(rvcChildren);
+
+        List<ColumnParseNode> columns =
+                offsetCompiler.buildListOfColumnParseNodes(rvc, true);
+
+        assertEquals(2,columns.size());
+        assertEquals(firstColumn,columns.get(0));
+        assertEquals(secondColumn,columns.get(1));
+    }
+
+    /**
+     * Second argument false (presumably the non-index case, going by the test names): the row
+     * key column expression that is the sole child of each comparison is expected back, in order.
+     */
+    @Test
+    public void buildListOfRowKeyColumnExpressionsTest() throws Exception {
+        RowKeyColumnExpression firstRowKey = new RowKeyColumnExpression();
+        RowKeyColumnExpression secondRowKey = new RowKeyColumnExpression();
+
+        // Mocked comparisons expose a row key column expression as their only child.
+        ComparisonExpression firstComparison = mock(ComparisonExpression.class);
+        Mockito.when(firstComparison.getChildren()).thenReturn(Lists.<Expression>newArrayList(firstRowKey));
+        ComparisonExpression secondComparison = mock(ComparisonExpression.class);
+        Mockito.when(secondComparison.getChildren()).thenReturn(Lists.<Expression>newArrayList(secondRowKey));
+
+        List<Expression> comparisons = new ArrayList<>();
+        comparisons.add(firstComparison);
+        comparisons.add(secondComparison);
+
+        List<RowKeyColumnExpression> rowKeys =
+                offsetCompiler.buildListOfRowKeyColumnExpressions(comparisons, false);
+
+        assertEquals(2,rowKeys.size());
+        assertEquals(firstRowKey,rowKeys.get(0));
+        assertEquals(secondRowKey,rowKeys.get(1));
+    }
+
+    /**
+     * Second argument true (presumably the index case, going by the test name): each comparison's
+     * child comes from CoerceExpression.create, and the row key column expressions are expected back.
+     */
+    @Test
+    public void buildListOfRowKeyColumnExpressionsIndexTest() throws Exception {
+        PColumn column = new PColumnImpl(PName.EMPTY_COLUMN_NAME, PName.EMPTY_NAME, PDecimal.INSTANCE, 10, 1,
+                true, 1, SortOrder.getDefault(), 0, null, false, null, false, false, null, HConstants.LATEST_TIMESTAMP);
+
+        RowKeyColumnExpression firstRowKey = new RowKeyColumnExpression(column,null);
+        RowKeyColumnExpression secondRowKey = new RowKeyColumnExpression(column, null);
+
+        // Run each row key column expression through CoerceExpression.create targeting DECIMAL.
+        Expression firstCoerce = CoerceExpression.create(firstRowKey,PDecimal.INSTANCE);
+        Expression secondCoerce = CoerceExpression.create(secondRowKey,PDecimal.INSTANCE);
+
+        ComparisonExpression firstComparison = mock(ComparisonExpression.class);
+        Mockito.when(firstComparison.getChildren()).thenReturn(Lists.newArrayList(firstCoerce));
+        ComparisonExpression secondComparison = mock(ComparisonExpression.class);
+        Mockito.when(secondComparison.getChildren()).thenReturn(Lists.newArrayList(secondCoerce));
+
+        List<Expression> comparisons = new ArrayList<>();
+        comparisons.add(firstComparison);
+        comparisons.add(secondComparison);
+
+        List<RowKeyColumnExpression> rowKeys =
+                offsetCompiler.buildListOfRowKeyColumnExpressions(comparisons, true);
+
+        assertEquals(2,rowKeys.size());
+        assertEquals(firstRowKey,rowKeys.get(0));
+        assertEquals(secondRowKey,rowKeys.get(1));
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 24653c6..5b3c6fa 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -860,4 +860,40 @@ public class QueryParserTest {
         parseQuery("create table \"a.b\".\"c.d\" (id varchar not null primary key)");
         parseQuery("create table \"a.b.c.d\" (id varchar not null primary key)");
     }
+
+    @Test
+    public void testIntegerInOffsetSelect() throws Exception {
+        // OFFSET followed by an integer literal; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T OFFSET 1");
+    }
+
+    @Test
+    public void testRVCInOffsetSelect() throws Exception {
+        // OFFSET given as a row value constructor comparison; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T OFFSET (A,B,C)=('a','b','c')");
+    }
+
+    @Test
+    public void testBindInOffsetSelect() throws Exception {
+        // OFFSET given as a bind parameter placeholder; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T OFFSET ?");
+    }
+
+    @Test
+    public void testLongQuery() throws Exception {
+        // Integer OFFSET directly after a WHERE ... IN (...) clause; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T WHERE a IN (1) OFFSET 1");
+    }
+
+    @Test
+    public void testLimitOffsetQuery() throws Exception {
+        // LIMIT followed by an integer OFFSET; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T LIMIT 10 OFFSET 1");
+    }
+
+    @Test
+    public void testLimitRVCOffsetQuery() throws Exception {
+        // LIMIT followed by a row value constructor OFFSET; passes if parseQuery does not throw.
+        parseQuery("SELECT * FROM T LIMIT 10 OFFSET (A,B,C)=('a','b','c')");
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index d9dac25..2fc01cd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -503,6 +503,9 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public List<OrderBy> getOutputOrderBys() {
                 return Collections.<OrderBy> emptyList();
             }
+
+            @Override
+            public boolean isApplicable() { return true; } // anonymous test implementation: always reports true
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 9fa6422..290ba8d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -991,6 +991,22 @@ public class TestUtil {
     }
     }
 
+    /** Prints every remaining row of {@code rs} to stdout, one comma-separated line per row. */
+    public static void printResultSet(ResultSet rs) throws SQLException {
+        // The column count does not change between rows, so read the metadata once.
+        int columnCount = rs.getMetaData().getColumnCount();
+        while (rs.next()) {
+            StringBuilder builder = new StringBuilder();
+            for (int i = 1; i <= columnCount; i++) {
+                if (i > 1) {
+                    builder.append(",");
+                }
+                builder.append(rs.getObject(i)); // append(Object) renders SQL NULL as "null"
+            }
+            System.out.println(builder);
+        }
+    }
+
     public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState indexState) throws InterruptedException, SQLException {
         waitForIndexState(conn, fullIndexName, indexState, 0L);
     }