You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2023/11/09 03:10:01 UTC

(phoenix) branch master updated: PHOENIX-7032 Partial Global Secondary Indexes (#1701)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2feb70fc2e PHOENIX-7032 Partial Global Secondary Indexes (#1701)
2feb70fc2e is described below

commit 2feb70fc2e41cdea2570641781d029e82c5cc832
Author: kadirozde <37...@users.noreply.github.com>
AuthorDate: Wed Nov 8 19:09:55 2023 -0800

    PHOENIX-7032 Partial Global Secondary Indexes (#1701)
---
 .../phoenix/end2end/index/PartialIndexIT.java      | 767 +++++++++++++++++++++
 .../index/UncoveredGlobalIndexRegionScannerIT.java |   9 +-
 phoenix-core/src/main/antlr3/PhoenixSQL.g          |   9 +-
 .../phoenix/compile/CreateIndexCompiler.java       | 186 ++++-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |  20 +-
 .../apache/phoenix/compile/StatementContext.java   |  25 +-
 .../org/apache/phoenix/compile/WhereCompiler.java  | 613 +++++++++++++++-
 .../coprocessor/GlobalIndexRegionScanner.java      |  69 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  15 +-
 .../phoenix/coprocessor/MetaDataProtocol.java      |   2 +-
 .../coprocessor/UncoveredIndexRegionScanner.java   |  15 +-
 .../UngroupedAggregateRegionObserver.java          |   2 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   5 +-
 .../execute/PhoenixTxIndexMutationGenerator.java   |   5 +-
 .../phoenix/expression/ComparisonExpression.java   | 138 +++-
 .../org/apache/phoenix/expression/Expression.java  |  10 +
 .../phoenix/expression/IsNullExpression.java       |  16 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  60 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  | 123 +++-
 .../phoenix/iterate/RegionScannerFactory.java      |  15 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   3 +
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  21 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    |  72 +-
 .../apache/phoenix/parse/CreateIndexStatement.java |  12 +-
 .../org/apache/phoenix/parse/ParseNodeFactory.java |   9 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  11 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   4 +-
 .../org/apache/phoenix/schema/DelegateTable.java   |  26 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  51 +-
 .../java/org/apache/phoenix/schema/PTable.java     |  32 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  95 ++-
 .../java/org/apache/phoenix/util/IndexUtil.java    |  31 +
 phoenix-core/src/main/protobuf/PTable.proto        |   1 +
 .../src/main/protobuf/ServerCachingService.proto   |   2 +
 .../apache/phoenix/compile/WhereCompilerTest.java  |  88 +++
 .../phoenix/index/VerifySingleIndexRowTest.java    |   2 +-
 36 files changed, 2363 insertions(+), 201 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java
new file mode 100644
index 0000000000..0f16113239
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexIT.java
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class PartialIndexIT extends BaseTest {
+    private final boolean local;
+    private final boolean uncovered;
+    private final boolean salted;
+
+    public PartialIndexIT(boolean local, boolean uncovered, boolean salted) {
+        this.salted = salted;        // when true, tests append SALT_BUCKETS=4 to the table DDL
+        this.uncovered = uncovered;  // when true, tests add the UNCOVERED keyword to the index DDL
+        this.local = local;          // when true, tests add the LOCAL keyword to the index DDL
+    }
+    @BeforeClass // class-level setup: test driver configured with a 0 ms index row age threshold
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(1);
+        driverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, "0");
+        setUpTestDriver(new ReadOnlyProps(driverProps.entrySet().iterator()));
+    }
+
+    @After
+    public void unsetFailForTesting() throws Exception {
+        // Runs after each test and fails it when isAnyStoreRefCountLeaked() reports a leak.
+        assertFalse("refCount leaked", isAnyStoreRefCountLeaked());
+    }
+    @Parameterized.Parameters(name = "local={0}, uncovered={1}, salted={2}")
+    public static synchronized Collection<Boolean[]> data() {
+        // Each row is {local, uncovered, salted}. Partial local indexes are not supported
+        // currently, so local is false in every row.
+        return Arrays.asList(
+                new Boolean[] {false, false, true},
+                new Boolean[] {false, false, false},
+                new Boolean[] {false, true, false},
+                new Boolean[] {false, true, true}
+        );
+    }
+
+    public static void assertPlan(PhoenixResultSet rs, String schemaName, String tableName) {
+        PTable table = rs.getContext().getCurrentTable().getTable(); // context's current table
+        assertEquals("planned schema name", schemaName, table.getSchemaName().getString());
+        assertEquals("planned table name", tableName, table.getTableName().getString());
+    }
+
+    /** Verify-only IndexTool run, then a second one with "-fi"; both must report zero bad rows. */
+    private static void verifyIndex(String dataTableName, String indexTableName) throws Exception {
+        IndexTool indexTool = IndexToolIT.runIndexTool(false, "", dataTableName,
+                indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY);
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+        assertEquals(0, indexTool.getJob().getCounters().
+                findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+        // Keep the handle of the "-fi" run so the counters checked below come from that job.
+        indexTool = IndexToolIT.runIndexTool(false, "", dataTableName,
+                indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY, "-fi");
+        CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+        assertEquals(0,
+                mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+        assertEquals(0,
+                mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+        assertEquals(0,
+                mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+    }
+
+    /** Index WHERE clauses with unknown columns, subqueries, or on LOCAL indexes are rejected. */
+    @Test
+    public void testUnsupportedDDLs() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "create table " + dataTableName + " (id varchar not null primary key, "
+                            + "A integer, B integer, C double, D varchar)"
+                            + (salted ? " SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+            try {
+                conn.createStatement().execute(
+                        "CREATE " + (uncovered ? "UNCOVERED " : " ") + (local ? "LOCAL " : " ")
+                                + "INDEX " + indexTableName + " on " + dataTableName + " (A) "
+                                + (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE E > 50 ASYNC");
+                Assert.fail("index WHERE clause on a missing column must be rejected");
+            } catch (ColumnNotFoundException e) {
+                // Expected: the data table declares no column named E
+            }
+            try {
+                conn.createStatement().execute(
+                        "CREATE " + (uncovered ? "UNCOVERED " : " ") + (local ? "LOCAL " : " ")
+                                + "INDEX " + indexTableName + " on " + dataTableName + " (A) "
+                                + (uncovered ? "" : "INCLUDE (B, C, D)")
+                                + " WHERE A  < ANY (SELECT B FROM " + dataTableName + ")");
+                Assert.fail("index WHERE clause containing a subquery must be rejected");
+            } catch (SQLException e) {
+                // Index where clause cannot include a subquery
+                assertEquals("23101", e.getSQLState());
+            }
+            try {
+                conn.createStatement().execute(
+                        "CREATE LOCAL INDEX " + indexTableName + " on " + dataTableName + " (A)"
+                                + " WHERE A  > 0");
+                Assert.fail("partial LOCAL index DDL must be rejected");
+            } catch (PhoenixParserException e) {
+                // Expected: partial local indexes are not supported yet
+            }
+        }
+    }
+    /** A partial index on a multi-tenant table with many column types must pass the parser. */
+    @Test
+    public void testDDLWithAllDataTypes() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String fullTableName = String.format("%s.%s", "S", dataTableName);
+            conn.createStatement().execute(
+                    "create table " + fullTableName
+                            + " (id varchar not null, kp varchar not null, "
+                            + "A INTEGER, B UNSIGNED_INT, C BIGINT, D UNSIGNED_LONG, E TINYINT, "
+                            + "F UNSIGNED_TINYINT, G SMALLINT, H UNSIGNED_SMALLINT, I FLOAT, "
+                            + "J UNSIGNED_FLOAT, K DOUBLE, L UNSIGNED_DOUBLE, M DECIMAL, "
+                            + "N BOOLEAN, O TIME, P DATE, Q TIMESTAMP, R UNSIGNED_TIME, "
+                            + "S UNSIGNED_DATE, T UNSIGNED_TIMESTAMP, U CHAR(10), V BINARY(1024), "
+                            + "W VARBINARY, Y INTEGER ARRAY, Z VARCHAR ARRAY[10], AA DATE ARRAY, "
+                            + "AB TIMESTAMP ARRAY, AC UNSIGNED_TIME ARRAY, AD UNSIGNED_DATE ARRAY, "
+                            + "AE UNSIGNED_TIMESTAMP ARRAY CONSTRAINT pk PRIMARY KEY (id,kp)) "
+                            + "MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
+            String indexTableName = generateUniqueName();
+            try {
+                conn.createStatement().execute(
+                        "CREATE " + (uncovered ? "UNCOVERED " : " ") + (local ? "LOCAL " : " ")
+                                + "INDEX IF NOT EXISTS " + indexTableName + " on " + fullTableName
+                                + " (kp,A) WHERE (kp  > '5')");
+            } catch (PhoenixParserException e) {
+                // Fail with the parser error attached as the cause rather than only printing it.
+                throw new AssertionError("partial index DDL with a WHERE clause must parse", e);
+            }
+        }
+    }
+    /** Partial index on A > 50: IndexTool build, then plain and atomic upserts maintain it. */
+    @Test
+    public void testAtomicUpsert() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar)" +
+                    (salted ? " SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id1', 25, 2, 3.14, 'a')");
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id2', 100, 'b')");
+            conn.commit();
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX "
+                    + indexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE A > 50 ASYNC");
+
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql = "SELECT  D from " + dataTableName + " WHERE A > 60";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT  D from " + dataTableName + " WHERE A = 50";
+            rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs,  "", dataTableName);
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id3', 50, 2, 9.5, 'c')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id4', 75, 2, 9.5, 'd')");
+            conn.commit();
+
+            // Verify that index table includes only the rows with A > 50
+            selectSql = "SELECT * from " + indexTableName;
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(75, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(100, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Overwrite an existing row that satisfies the index WHERE clause using an atomic
+            // upsert such that the new version of the row does not satisfy the index where clause
+            // anymore. This should result in deleting the index row.
+            String dml = "UPSERT INTO " + dataTableName + " values ('id2', 300, 2, 9.5, 'd') " +
+                    "ON DUPLICATE KEY UPDATE A = 0";
+            conn.createStatement().execute(dml);
+            conn.commit();
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(75, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Retrieve the updated row from the data table and verify that the index table is
+            // not used
+            selectSql = "SELECT  ID from " + dataTableName + " WHERE A = 0";
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertPlan((PhoenixResultSet) rs,  "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals("id2", rs.getString(1));
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertEquals("A > 50", indexTable.getIndexWhere());
+            }
+        }
+    }
+
+    /** Partial index whose WHERE clause compares two columns of the same row (A > B). */
+    @Test
+    public void testComparisonOfColumns() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar) COLUMN_ENCODED_BYTES=0" +
+                    (salted ? ", SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id1', 25, 2, 3.14, 'a')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, B, D) values ('id2', 100, 200, 'b')");
+            conn.commit();
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX "
+                    + indexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE A > B ASYNC");
+            conn.commit();
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql = "SELECT D from " + dataTableName + " WHERE A > B and D is not NULL";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT  D from " + dataTableName + " WHERE A > 100";
+            rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs,  "", dataTableName);
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id3', 50, 300, 9.5, 'c')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id4', 75, 2, 9.5, 'd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id4', 76, 2, 9.5, 'd')");
+            conn.commit();
+
+            // Verify that index table includes only the rows with A >  B
+            selectSql = "SELECT * from " + indexTableName;
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(25, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(76, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Overwrite an existing row that satisfies the index WHERE clause such that
+            // the new version of the row does not satisfy the index where clause anymore. This
+            // should result in deleting the index row.
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (ID, B) values ('id1', 100)");
+            conn.commit();
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(76, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertEquals("A > B", indexTable.getIndexWhere());
+            }
+        }
+    }
+
+    /** Partial index filtered by IS NOT NULL predicates on both B and C. */
+    @Test
+    public void testIsNull() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar)" +
+                    (salted ? " SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id1', 70, 2, 3.14, 'a')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id2', 100, 'b')");
+            conn.commit();
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX "
+                    + indexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE B IS NOT NULL AND " +
+                    "C IS NOT NULL ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql = "SELECT A, D from " + dataTableName +
+                    " WHERE A > 60 AND B IS NOT NULL AND C IS NOT NULL";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id3', 20, 2, 3.14, 'a')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id4', 90, 2, 3.14, 'a')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id5', 150, 'b')");
+            conn.commit();
+
+            // Verify that the query returns only the rows where both B and C are not null
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(90, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("SELECT Count(*) from " + dataTableName);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs,  "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+
+            // Overwrite an existing row that satisfies the index WHERE clause such that
+            // the new version of the row does not satisfy the index where clause anymore. This
+            // should result in deleting the index row.
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (ID, B) values ('id4', null)");
+            conn.commit();
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertEquals("(B IS NOT NULL  AND C IS NOT NULL )", indexTable.getIndexWhere());
+            }
+        }
+    }
+
+    /** Partial index filtered by a LIKE pattern on D ('%cde_'). */
+    @Test
+    public void testLike() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar)" +
+                    (salted ? " SALT_BUCKETS=4" : ""));
+            String indexTableName = generateUniqueName();
+            // Add rows to the data table before creating a partial index to test that the index
+            // will be built correctly by IndexTool
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id1', 70, 2, 3.14, 'abcdef')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id2', 100, 'bcdez')");
+            conn.commit();
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX "
+                    + indexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE D like '%cde_' ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql = "SELECT D from " + dataTableName +
+                    " WHERE B is not NULL AND D like '%cde_'";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals("abcdef", rs.getString(1));
+            assertFalse(rs.next());
+
+            // Add more rows to test the index write path
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id3', 20, 2, 3.14, 'abcdegg')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id4', 10, 2, 3.14, 'aabecdeh')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id5', 150, 'bbbb')");
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("aabecdeh", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("abcdef", rs.getString(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT Count(*) from " + dataTableName;
+            rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs,  "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+
+            // Overwrite an existing row that satisfies the index WHERE clause such that
+            // the new version of the row does not satisfy the index where clause anymore. This
+            // should result in deleting the index row.
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, D) values ('id4',  'zzz')");
+            conn.commit();
+            selectSql = "SELECT D from " + dataTableName +
+                    " WHERE B is not NULL AND D like '%cde_'";
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abcdef", rs.getString(1));
+            assertFalse(rs.next());
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+
+            try (Connection newConn = DriverManager.getConnection(getUrl())) {
+                PTable indexTable = PhoenixRuntime.getTableNoCache(newConn, indexTableName);
+                assertEquals("D LIKE '%cde_'", indexTable.getIndexWhere());
+            }
+        }
+    }
+
+    @Test
+    public void testPhoenixRowTimestamp() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "create table " + dataTableName + " (id varchar not null primary key, "
+                            + "A integer, B integer)" + (salted ?
+                            " SALT_BUCKETS=4" :
+                            ""));
+            String indexTableName = generateUniqueName();
+
+            // Partial index on PHOENIX_ROW_TIMESTAMP(): rows are added to the data table before the
+            // index is created to test that the index will be built correctly by IndexTool
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id1', 70, 2)");
+            conn.commit();
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id5', 0, 2)");
+            conn.commit();
+            Thread.sleep(10); // presumably so that both rows are strictly older than "before"
+            Timestamp before = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            String timeZoneID = Calendar.getInstance().getTimeZone().getID();
+            conn.createStatement().execute(
+                    "CREATE " + (uncovered ? "UNCOVERED " : " ") + (local ? "LOCAL " : " ") +
+                            "INDEX " + indexTableName + " on " + dataTableName + " (A) " + (
+                            uncovered ?
+                                    "" :
+                                    "INCLUDE (B)") + " WHERE PHOENIX_ROW_TIMESTAMP() < " +
+                            "TO_DATE('" + before + "', 'yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')  ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName);
+
+            String selectSql = "SELECT A from " + dataTableName +
+                    " WHERE PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + before +
+                    "', 'yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used and returns both rows written before "before"
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(70, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Add rows newer than "before" to test that the index write path does not index them
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id2', 20, 3)");
+            conn.commit();
+            conn.createStatement().execute(
+                    "upsert into " + dataTableName + " values ('id3', 10, 4)");
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery("SELECT Count(*) from " + dataTableName);
+            // Verify that the index table is not used
+            assertPlan((PhoenixResultSet) rs, "", dataTableName);
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+
+            rs = conn.createStatement().executeQuery("SELECT Count(*) from " + indexTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1)); // only id1 and id5 were written before "before"
+
+            // Overwrite an existing row that satisfies the index WHERE clause such that
+            // the new version of the row does not satisfy the index where clause anymore. This
+            // should result in deleting the index row.
+            conn.createStatement()
+                    .execute("upsert into " + dataTableName + " values ('id1',  70, 2)");
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that the index table is used and that only the row with A = 0 is left in it
+            assertPlan((PhoenixResultSet) rs,  "", indexTableName);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Test index verification and repair by IndexTool
+            verifyIndex(dataTableName, indexTableName);
+        }
+    }
+
+    @Test
+    public void testViewIndexes() throws Exception { // partial global and tenant view indexes
+        String baseTableName =  generateUniqueName();
+        String globalViewName = generateUniqueName();
+        String globalViewIndexName =  generateUniqueName();
+        String tenantViewName =  generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + baseTableName +
+                    " (TENANT_ID CHAR(15) NOT NULL, KP CHAR(3) NOT NULL, PK2 DATE NOT NULL, "+
+                    "PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 CHAR(15) " +
+                    "CONSTRAINT PK PRIMARY KEY(TENANT_ID, KP, PK2, PK3)) MULTI_TENANT=true" +
+                    (salted ? ", SALT_BUCKETS=4" : ""));
+            conn.createStatement().execute("CREATE VIEW " + globalViewName +
+                    " AS SELECT * FROM " + baseTableName + " WHERE  KP = '001'");
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") + "INDEX " + globalViewIndexName + " on " +
+                    globalViewName + " (PK3 DESC, KV3) " +
+                    (uncovered ? "" : "INCLUDE (KV1)") + " WHERE KV3 IS NOT NULL ASYNC");
+
+            String tenantId = "tenantId";
+            Properties tenantProps = new Properties();
+            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            // Create a tenant specific view and a partial view index (PK3 > 4) on that view
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), tenantProps)) {
+                tenantConn.createStatement().execute("CREATE VIEW " + tenantViewName + " AS SELECT * FROM " + globalViewName);
+                String tenantViewIndexName =  generateUniqueName();
+                tenantConn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                        (local ? "LOCAL " : " ") + "INDEX " + tenantViewIndexName + " on " +
+                        tenantViewName + " (PK3) " +
+                        (uncovered ? "" : "INCLUDE (KV1)") + " WHERE PK3 > 4");
+                PreparedStatement
+                        stmt = tenantConn.prepareStatement("UPSERT INTO  " + tenantViewName + " (PK2, PK3, KV1, KV3) VALUES (?, ?, ?, ?)");
+                stmt.setDate(1, new Date(100));
+                stmt.setInt(2, 1);
+                stmt.setString(3, "KV1");
+                stmt.setString(4, "KV3");
+                stmt.executeUpdate();
+                stmt.setDate(1, new Date(100));
+                stmt.setInt(2, 2);
+                stmt.setString(3, "KV4");
+                stmt.setString(4, "KV5");
+                stmt.executeUpdate();
+                stmt.setDate(1, new Date(100));
+                stmt.setInt(2, 3);
+                stmt.setString(3, "KV6");
+                stmt.setString(4, "KV7");
+                stmt.executeUpdate();
+                stmt.setDate(1, new Date(100));
+                stmt.setInt(2, 4);
+                stmt.setString(3, "KV8");
+                stmt.setString(4, "KV9");
+                stmt.executeUpdate();
+                stmt.setDate(1, new Date(100));
+                stmt.setInt(2, 5);
+                stmt.setString(3, "KV10");
+                stmt.setString(4, "KV11");
+                stmt.executeUpdate();
+                tenantConn.commit();
+
+                // Verify that query uses the tenant view index
+                ResultSet rs = tenantConn.createStatement().executeQuery("SELECT KV1 FROM  " +
+                        tenantViewName + " WHERE PK3 = 5");
+                assertPlan((PhoenixResultSet) rs,  "", tenantViewIndexName);
+                assertTrue(rs.next());
+                assertEquals("KV10", rs.getString(1));
+                assertFalse(rs.next());
+
+                // Verify that query does not use the tenant view index when the partial index
+                // where clause does not contain the query where clause
+                rs = tenantConn.createStatement().executeQuery("SELECT KV1 FROM  " +
+                        tenantViewName + " WHERE PK3 = 4");
+                assertPlan((PhoenixResultSet) rs,  "", tenantViewName);
+                assertTrue(rs.next());
+                assertEquals("KV8", rs.getString(1));
+                assertFalse(rs.next());
+
+                // Only PK3 = 5 satisfies PK3 > 4, so the tenant view index has only one row
+                rs = tenantConn.createStatement().executeQuery("SELECT Count(*) FROM  " +
+                        tenantViewIndexName);
+                assertPlan((PhoenixResultSet) rs,  "", tenantViewIndexName);
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt(1));
+            }
+
+            // Run the IndexTool MR job to build the global view index, which was created ASYNC
+            IndexToolIT.runIndexTool(false, "", globalViewName,
+                    globalViewIndexName);
+            try (Connection tenantConn = DriverManager.getConnection(getUrl(), tenantProps)) {
+                // Verify that the query uses the global view index
+                ResultSet rs = tenantConn.createStatement().executeQuery("SELECT KV1 FROM  " +
+                        tenantViewName + " WHERE PK3 = 1 AND KV3 = 'KV3'");
+                assertPlan((PhoenixResultSet) rs,  "", tenantViewName +
+                        "#" + globalViewIndexName);
+                assertTrue(rs.next());
+                assertEquals("KV1", rs.getString(1));
+                assertFalse(rs.next());
+            }
+
+            // Verify that the query uses the global view index
+            ResultSet rs = conn.createStatement().executeQuery("SELECT KV1 FROM  " +
+                    globalViewName + " WHERE PK3 = 1 AND KV3 = 'KV3'");
+            assertPlan((PhoenixResultSet) rs,  "", globalViewIndexName);
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertFalse(rs.next());
+
+            // All five upserted rows have a non-null KV3, so the global view index has five rows
+            rs = conn.createStatement().executeQuery("SELECT Count(*) FROM  " +
+                    globalViewIndexName);
+            assertPlan((PhoenixResultSet) rs,  "", globalViewIndexName);
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+        }
+    }
+
+    @Test
+    public void testPartialIndexPreferredOverFullIndex() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar not null primary key, " +
+                    "A integer, B integer, C double, D varchar)" +
+                    (salted ? " SALT_BUCKETS=4" : ""));
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " values ('id1', 10, 2, 3.14, 'a')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + dataTableName +
+                    " (id, A, D) values ('id2', 100, 'b')");
+            conn.commit();
+            String fullIndexTableName = generateUniqueName(); // index without a WHERE clause
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") +"INDEX "
+                    + fullIndexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, fullIndexTableName);
+            String partialIndexTableName = generateUniqueName(); // same index limited to A > 50
+            conn.createStatement().execute("CREATE " + (uncovered ? "UNCOVERED " : " ") +
+                    (local ? "LOCAL " : " ") +"INDEX "
+                    + partialIndexTableName + " on " + dataTableName + " (A) " +
+                    (uncovered ? "" : "INCLUDE (B, C, D)") + " WHERE A > 50 ASYNC");
+            IndexToolIT.runIndexTool(false, null, dataTableName, partialIndexTableName);
+            String selectSql = "SELECT  D from " + dataTableName + " WHERE A > 60";
+            // Verify that the partial index table is used as A > 60 is contained in A > 50
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertPlan((PhoenixResultSet) rs,  "", partialIndexTableName);
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT  D from " + dataTableName + " WHERE A < 50";
+            // Verify that the full index table is used as A < 50 is not contained in A > 50
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertPlan((PhoenixResultSet) rs,  "", fullIndexTableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
index aef6009ae5..e29284a951 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/UncoveredGlobalIndexRegionScannerIT.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.fail;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.phoenix.exception.PhoenixParserException;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.query.KeyRange;
@@ -115,7 +115,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
                     conn.createStatement().execute("CREATE UNCOVERED INDEX " + indexTableName
                             + " on " + dataTableName + " (val1) INCLUDE (val2)");
                     Assert.fail();
-                } catch (SQLException e) {
+                } catch (PhoenixParserException e) {
                     // Expected
                 }
                 // The LOCAL keyword should not be allowed with UNCOVERED
@@ -123,7 +123,7 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
                     conn.createStatement().execute("CREATE UNCOVERED LOCAL INDEX " + indexTableName
                             + " on " + dataTableName);
                     Assert.fail();
-                } catch (SQLException e) {
+                } catch (PhoenixParserException e) {
                     // Expected
                 }
             } else {
@@ -143,9 +143,6 @@ public class UncoveredGlobalIndexRegionScannerIT extends BaseTest {
             if (uncovered) {
                 conn.createStatement().execute("CREATE UNCOVERED INDEX IDX_" + dataTableName
                         + " on " + dataTableName + " (PHOENIX_ROW_TIMESTAMP())");
-                conn.createStatement()
-                        .execute("CREATE UNCOVERED LOCAL INDEX IDX_LOCAL_" + dataTableName
-                                + " on " + dataTableName + " (PHOENIX_ROW_TIMESTAMP())");
             } else {
                 conn.createStatement().execute("CREATE INDEX IDX_" + dataTableName
                         + " on " + dataTableName + " (PHOENIX_ROW_TIMESTAMP())");
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 54e5a564f4..5cb73379bb 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -533,6 +533,7 @@ create_index_node returns [CreateIndexStatement ret]
     :   CREATE u=UNCOVERED? l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
         (LPAREN ik=ik_constraint RPAREN)
         (in=INCLUDE (LPAREN icrefs=column_names RPAREN))?
+        (WHERE where=expression)?
         (async=ASYNC)?
         (p=fam_properties)?
         (SPLIT ON v=value_expression_list)?
@@ -540,10 +541,16 @@ create_index_node returns [CreateIndexStatement ret]
             if (u !=null && in != null) {
                 throw new RuntimeException("UNCOVERED indexes cannot have the INCLUDE clause");
             }
+            if (l !=null && u != null) {
+                throw new RuntimeException("UNCOVERED cannot be used with LOCAL");
+            }
+            if (l !=null && where != null) {
+                throw new RuntimeException("Partial local indexes are not supported");
+            }
             ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null,
                     l==null ? (u==null ? IndexType.getDefault() : IndexType.UNCOVERED_GLOBAL) :
                     IndexType.LOCAL, async != null, getBindCount(), new HashMap<String,
-                    UDFParseNode>(udfParseNodes));
+                    UDFParseNode>(udfParseNodes), where);
         }
     ;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 1837b522a2..4bd3a4bafe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -17,15 +17,15 @@
  */
 package org.apache.phoenix.compile;
 
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -33,8 +33,42 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
+import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PDateArray;
+import org.apache.phoenix.schema.types.PNumericType;
+import org.apache.phoenix.schema.types.PTime;
+import org.apache.phoenix.schema.types.PTimeArray;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PTimestampArray;
+import org.apache.phoenix.schema.types.PUnsignedDate;
+import org.apache.phoenix.schema.types.PUnsignedDateArray;
+import org.apache.phoenix.schema.types.PUnsignedTime;
+import org.apache.phoenix.schema.types.PUnsignedTimeArray;
+import org.apache.phoenix.schema.types.PUnsignedTimestamp;
+import org.apache.phoenix.schema.types.PUnsignedTimestampArray;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 public class CreateIndexCompiler {
     private final PhoenixStatement statement;
@@ -45,11 +79,155 @@ public class CreateIndexCompiler {
         this.operation = operation;
     }
 
+    /**
+     * Parse node visitor that records whether the visited index WHERE clause has a subquery
+     */
+    private static class IndexWhereParseNodeVisitor extends StatelessTraverseAllParseNodeVisitor {
+        private boolean hasSubquery; // becomes true on the first subquery and is never reset
+
+        @Override
+        public Void visit(SubqueryParseNode subquery) throws SQLException {
+            this.hasSubquery = true;
+            return null;
+        }
+    }
+
+    /** Returns a SQL literal usable as a sample value for a column of the given type. */
+    private String getValue(PDataType type) {
+        final boolean dateOrTime = type instanceof PDate || type instanceof PUnsignedDate
+                || type instanceof PTime || type instanceof PUnsignedTime
+                || type instanceof PTimestamp || type instanceof PUnsignedTimestamp;
+        final boolean dateOrTimeArray = type instanceof PDateArray || type instanceof PTimeArray
+                || type instanceof PTimestampArray || type instanceof PUnsignedDateArray
+                || type instanceof PUnsignedTimeArray || type instanceof PUnsignedTimestampArray;
+        if (type instanceof PNumericType) {
+            return "0";
+        } else if (type instanceof PVarchar || type instanceof PChar) {
+            return "'a'";
+        } else if (dateOrTime) {
+            Timestamp now = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            return "TO_DATE('" + now + "','yyyy-MM-dd HH:mm:ss.SSS', 'PST')";
+        } else if (type instanceof PBoolean) {
+            return "TRUE";
+        } else if (dateOrTimeArray) {
+            return "ARRAY[" + getValue(PDate.INSTANCE) + "]";
+        }
+        return type instanceof PArrayDataType ? "ARRAY" + type.getSampleValue().toString() : "0123";
+    }
+
+    /**
+     * Verifies that the index WHERE clause does not include a subquery and that it can
+     * be evaluated on a single data table row.
+     * <p>
+     * For the second check, one row of sample values is upserted without being committed,
+     * read back from the uncommitted data of the connection, and the compiled WHERE
+     * expression is evaluated on the cells of that row.
+     *
+     * @param indexWhere the parsed index WHERE clause; nothing is verified when it is null
+     * @param context the statement context used to compile the WHERE clause
+     * @param dataTableName the name of the table or view the index is created on
+     * @throws SQLException with INVALID_INDEX_WHERE_WITH_SUBQUERY if the clause has a subquery,
+     *         or with CANNOT_EVALUATE_INDEX_WHERE if evaluation on the sample row fails
+     */
+    private void verifyIndexWhere(ParseNode indexWhere, StatementContext context,
+            TableName dataTableName) throws SQLException {
+        if (indexWhere == null) {
+            return;
+        }
+        // Verify that index WHERE clause does not include a subquery
+        PhoenixConnection connection = context.getConnection();
+        IndexWhereParseNodeVisitor indexWhereParseNodeVisitor = new IndexWhereParseNodeVisitor();
+        indexWhere.accept(indexWhereParseNodeVisitor);
+        if (indexWhereParseNodeVisitor.hasSubquery) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_WHERE_WITH_SUBQUERY).
+                    build().buildException();
+        }
+
+        // Verify that index WHERE clause can be evaluated on a single data table row
+
+        // Convert the index WHERE ParseNode to an Expression
+        ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
+        Expression indexWhereExpression = indexWhere.accept(expressionCompiler);
+        PTable dataTable = PhoenixRuntime.getTable(connection, dataTableName.toString());
+        // Build the statement before touching auto commit so that a failure here cannot
+        // leave the connection with a modified auto commit setting
+        String upsert = buildSampleUpsert(dataTable, dataTableName);
+
+        // The row must stay uncommitted: it is only needed to obtain its cells.
+        // NOTE(review): it remains in the uncommitted state afterwards - confirm it is discarded
+        boolean autoCommit = connection.getAutoCommit();
+        connection.setAutoCommit(false);
+        try (PreparedStatement ps = connection.prepareStatement(upsert)) {
+            ps.execute();
+            Iterator<Pair<byte[], List<Cell>>> dataTableNameAndMutationIterator =
+                    PhoenixRuntime.getUncommittedDataIterator(connection);
+            // Find the mutation of the physical data table among the uncommitted mutations
+            Pair<byte[], List<Cell>> dataTableNameAndMutation = null;
+            byte[] physicalName = dataTable.getPhysicalName().getBytes();
+            while (dataTableNameAndMutation == null
+                    && dataTableNameAndMutationIterator.hasNext()) {
+                Pair<byte[], List<Cell>> candidate = dataTableNameAndMutationIterator.next();
+                if (java.util.Arrays.equals(candidate.getFirst(), physicalName)) {
+                    dataTableNameAndMutation = candidate;
+                }
+            }
+            if (dataTableNameAndMutation == null) {
+                throw new RuntimeException(
+                        "Unexpected result from " + "PhoenixRuntime#getUncommittedDataIterator for "
+                                + dataTableName);
+            }
+
+            // Evaluate the WHERE expression on the data table row
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            List<Cell> cols = dataTableNameAndMutation.getSecond();
+            Collections.sort(cols, CellComparator.getInstance());
+            MultiKeyValueTuple tuple = new MultiKeyValueTuple(cols);
+            if (!indexWhereExpression.evaluate(tuple, ptr)) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_EVALUATE_INDEX_WHERE).
+                        build().buildException();
+            }
+        } finally {
+            connection.setAutoCommit(autoCommit);
+        }
+    }
+
+    /**
+     * Builds an UPSERT statement that writes one full row of sample values to the data table.
+     * One leading column is skipped when the table has a bucket number and one more when it
+     * has a tenant id. View constants are skipped; the Upsert compiler generates their values.
+     *
+     * @param dataTable the table or view the row is generated for
+     * @param dataTableName the name used for the table in the UPSERT statement
+     * @return the text of the UPSERT statement
+     */
+    private String buildSampleUpsert(PTable dataTable, TableName dataTableName) {
+        int startingColumnIndex = dataTable.getBucketNum() != null ? 1 : 0;
+        startingColumnIndex += dataTable.getTenantId() != null ? 1 : 0;
+        List<PColumn> columns = dataTable.getColumns();
+        StringBuilder names = new StringBuilder();
+        StringBuilder values = new StringBuilder();
+        for (int i = startingColumnIndex; i < columns.size(); i++) {
+            PColumn column = columns.get(i);
+            if (column.getViewConstant() != null) {
+                continue;
+            }
+            // Separate on demand so that skipping the last column leaves no dangling comma
+            if (names.length() > 0) {
+                names.append(",");
+                values.append(",");
+            }
+            names.append(column.getName().getString());
+            values.append(getValue(column.getDataType()));
+        }
+        return "UPSERT INTO " + dataTableName + " (" + names + ") Values(" + values + ")";
+    }
     public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
         Scan scan = new Scan();
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
+        verifyIndexWhere(create.getWhere(), context, create.getTable().getName());
         ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
         List<ParseNode> splitNodes = create.getSplitNodes();
         if (create.getIndexType() == IndexType.LOCAL) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 934e8f8770..71ad48f554 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.util.ScanUtil;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Set;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
@@ -89,6 +90,16 @@ public class ServerBuildIndexCompiler {
         this.tableName = tableName;
     }
 
+    private static void addColumnsToScan(Set<ColumnReference> columns, Scan scan, PTable index) {
+        for (ColumnReference ref : columns) { // add each referenced column to the scan projection
+            if (PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS
+                    != index.getImmutableStorageScheme()) {
+                scan.addColumn(ref.getFamily(), ref.getQualifier()); // just this qualifier
+            } else {
+                scan.addFamily(ref.getFamily()); // single cell array scheme: the whole family
+            }
+        }
+    }
     public MutationPlan compile(PTable index) throws SQLException {
         try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
             String query = "SELECT /*+ NO_INDEX */ count(*) FROM " + tableName;
@@ -104,12 +115,9 @@ public class ServerBuildIndexCompiler {
             IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
             // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
             // However, in this case, we need to project all of the data columns that contribute to the index.
-            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                    scan.addFamily(columnRef.getFamily());
-                } else {
-                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
-                }
+            addColumnsToScan(indexMaintainer.getAllColumns(), scan, index);
+            if (indexMaintainer.getIndexWhereColumns() != null) {
+                addColumnsToScan(indexMaintainer.getIndexWhereColumns(), scan, index);
             }
             IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
             scan.setAttribute(PhoenixIndexCodec.INDEX_NAME_FOR_IDX_MAINTAINER,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index bd27773afc..f795254a36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -88,7 +88,30 @@ public class StatementContext {
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
     }
-    
+    public StatementContext(StatementContext context) { // field-by-field copy, nothing is cloned
+        this.statement = context.statement;
+        this.connection = context.connection;
+        this.resolver = context.resolver;
+        this.currentTable = context.currentTable;
+        this.dataColumns = context.dataColumns;
+        this.scan = context.scan;
+        this.scanRanges = context.scanRanges;
+        this.binds = context.binds;
+        this.expressions = context.expressions;
+        this.aggregates = context.aggregates;
+        this.sequences = context.sequences;
+        this.whereConditionColumns = context.whereConditionColumns;
+        this.subqueryResults = context.subqueryResults;
+        this.retryingPersistentCache = context.retryingPersistentCache;
+        this.numberFormat = context.numberFormat;
+        this.tempPtr = context.tempPtr;
+        this.currentTime = context.currentTime;
+        this.readMetricsQueue = context.readMetricsQueue;
+        this.overAllQueryMetrics = context.overAllQueryMetrics;
+        this.queryLogger = context.queryLogger;
+        this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
+        this.isUncoveredIndex = context.isUncoveredIndex;
+    }
     /**
      *  Constructor that lets you override whether or not to collect request level metrics.
      */
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index d164b04c8c..d19142e795 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -17,6 +17,12 @@
  */
 package org.apache.phoenix.compile;
 
+import static org.apache.hadoop.hbase.CompareOperator.EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.GREATER;
+import static org.apache.hadoop.hbase.CompareOperator.GREATER_OR_EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.LESS;
+import static org.apache.hadoop.hbase.CompareOperator.LESS_OR_EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.NOT_EQUAL;
 import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
 
 import java.io.ByteArrayOutputStream;
@@ -24,32 +30,47 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.phoenix.expression.AddExpression;
 import org.apache.phoenix.expression.AndExpression;
+import org.apache.phoenix.expression.ArrayConstructorExpression;
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.expression.CaseExpression;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.expression.InListExpression;
+import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LikeExpression;
 import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
-import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter;
-import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
-import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
-import org.apache.phoenix.filter.RowKeyComparisonFilter;
-import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
-import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
+import org.apache.phoenix.expression.ModulusExpression;
+import org.apache.phoenix.expression.MultiplyExpression;
+import org.apache.phoenix.expression.NotExpression;
+import org.apache.phoenix.expression.OrExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
+import org.apache.phoenix.expression.SingleCellConstructorExpression;
+import org.apache.phoenix.expression.StringConcatExpression;
+import org.apache.phoenix.expression.SubtractExpression;
+import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
+import org.apache.phoenix.expression.function.ArrayElemRefExpression;
+import org.apache.phoenix.expression.function.ScalarFunction;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
@@ -58,20 +79,36 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.SubqueryParseNode;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.TypeMismatchException;
+import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
+import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.RowKeyComparisonFilter;
+import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.util.ByteUtil;
@@ -177,7 +214,7 @@ public class WhereCompiler {
     public static class WhereExpressionCompiler extends ExpressionCompiler {
         private boolean disambiguateWithFamily;
 
-        WhereExpressionCompiler(StatementContext context) {
+        public WhereExpressionCompiler(StatementContext context) {
             super(context, true);
         }
 
@@ -364,7 +401,541 @@ public class WhereCompiler {
             ScanUtil.andFilterAtBeginning(scan, scanRanges.getSkipScanFilter());
         }
     }
-    
+
+
+    /**
+     * Compiles a WHERE parse node into an expression and rewrites that expression with
+     * {@link DNFExpressionRewriter}.
+     *
+     * @param where the WHERE clause parse node; may be null
+     * @param statementContext the context a new context is constructed from; the new context
+     *                         gets a resolver for its current table
+     * @return the rewritten expression, or null when {@code where} is null
+     * @throws SQLException if the resolver lookup or the compilation of the clause fails
+     */
+    public static Expression transformDNF(ParseNode where, StatementContext statementContext)
+            throws SQLException {
+        if (where != null) {
+            StatementContext dnfContext = new StatementContext(statementContext);
+            dnfContext.setResolver(FromCompiler.getResolver(dnfContext.getCurrentTable()));
+            WhereExpressionCompiler whereCompiler = new WhereExpressionCompiler(dnfContext);
+            Expression compiled = where.accept(whereCompiler);
+            return compiled.accept(new DNFExpressionRewriter());
+        }
+        return null;
+    }
+
+    /**
+     * Rewrites an expression in DNF (Disjunctive Normal Form). To do that
+     * (1) it transforms operators like RVC, IN, and BETWEEN to their AND/OR equivalents,
+     * (2) eliminates double negations and applies De Morgan's rules, i.e.,
+     *      NOT (A AND B) = NOT A OR NOT B and  NOT (A OR B) = NOT A AND NOT B, and
+     * (3) distributes AND over OR, i.e.,
+     *      (A OR B) AND (C OR D) = (A AND C) OR (A AND D) OR (B AND C) OR (B AND D).
+     */
+    public static class DNFExpressionRewriter extends TraverseAllExpressionVisitor<Expression> {
+        /**
+         * Merges the children of AND operands into their parent AND (one level deep).
+         * For example, the operands A > 10 and (B = 10 AND C > 0) produce the AndExpression
+         * (A > 10 AND B = 10 AND C > 0) whose three children are A > 10, B = 10, and C > 0.
+         * When no operand is an AndExpression, the given list is used as the child list as is.
+         *
+         * @param l the operands of the AND expression
+         * @return an AndExpression over the merged operands
+         */
+        private static AndExpression flattenAnd(List<Expression> l) {
+            boolean hasNestedAnd = false;
+            for (Expression operand : l) {
+                if (operand instanceof AndExpression) {
+                    hasNestedAnd = true;
+                    break;
+                }
+            }
+            if (!hasNestedAnd) {
+                return new AndExpression(l);
+            }
+            List<Expression> merged = new ArrayList<>();
+            for (Expression operand : l) {
+                if (operand instanceof AndExpression) {
+                    // Pull the nested operands up into the parent
+                    merged.addAll(operand.getChildren());
+                } else {
+                    merged.add(operand);
+                }
+            }
+            return new AndExpression(merged);
+        }
+
+        /**
+         * Merges the children of OR operands into their parent OR (one level deep).
+         * For example, the operands A > 10 and (B = 10 OR C > 0) produce the OrExpression
+         * (A > 10 OR B = 10 OR C > 0) whose three children are A > 10, B = 10, and C > 0.
+         * When no operand is an OrExpression, the given list is used as the child list as is.
+         *
+         * @param l the operands of the OR expression
+         * @return an OrExpression over the merged operands
+         */
+        private static OrExpression flattenOr(List<Expression> l) {
+            boolean hasNestedOr = false;
+            for (Expression operand : l) {
+                if (operand instanceof OrExpression) {
+                    hasNestedOr = true;
+                    break;
+                }
+            }
+            if (!hasNestedOr) {
+                return new OrExpression(l);
+            }
+            List<Expression> merged = new ArrayList<>();
+            for (Expression operand : l) {
+                if (operand instanceof OrExpression) {
+                    // Pull the nested operands up into the parent
+                    merged.addAll(operand.getChildren());
+                } else {
+                    merged.add(operand);
+                }
+            }
+            return new OrExpression(merged);
+        }
+
+        /**
+         * Flattens nested AND expressions and, if one of the resulting operands is an OR
+         * expression, distributes the AND over the first such OR. The distribution is applied
+         * recursively, so remaining OR operands are handled while visiting the generated ANDs.
+         *
+         * @param node the AND expression being left
+         * @param l the rewritten children of {@code node}
+         * @return the flattened AND, or an OR of ANDs when an OR operand was found
+         */
+        @Override public Expression visitLeave(AndExpression node, List<Expression> l) {
+            AndExpression flattened = flattenAnd(l);
+            List<Expression> conjuncts = flattened.getChildren();
+
+            // Locate the first OR operand, if any
+            int orIndex = -1;
+            for (int idx = 0; idx < conjuncts.size(); idx++) {
+                if (conjuncts.get(idx) instanceof OrExpression) {
+                    orIndex = idx;
+                    break;
+                }
+            }
+            if (orIndex < 0) {
+                return flattened;
+            }
+
+            // All operands except the OR operand that is being distributed over
+            List<Expression> others = new ArrayList<>(conjuncts.size() - 1);
+            for (int idx = 0; idx < conjuncts.size(); idx++) {
+                if (idx != orIndex) {
+                    others.add(conjuncts.get(idx));
+                }
+            }
+
+            // (X AND (P OR Q)) becomes ((X AND P) OR (X AND Q))
+            Expression orOperand = conjuncts.get(orIndex);
+            List<Expression> disjuncts = new ArrayList<>(orOperand.getChildren().size());
+            for (Expression alternative : orOperand.getChildren()) {
+                List<Expression> product = new ArrayList<>(others.size() + 1);
+                product.addAll(others);
+                product.add(alternative);
+                disjuncts.add(visitLeave(new AndExpression(product), product));
+            }
+            return visitLeave(new OrExpression(disjuncts), disjuncts);
+        }
+        /**
+         * Rebuilds the OR expression from its rewritten children, merging the children of
+         * nested OR operands into it via {@code flattenOr}.
+         */
+        @Override public Expression visitLeave(OrExpression node, List<Expression> l) {
+            return flattenOr(l);
+        }
+
+        /**
+         * Returns the scalar function node unchanged; the rewritten children in {@code l} are
+         * not used.
+         */
+        @Override public Expression visitLeave(ScalarFunction node, List<Expression> l) {
+            return node;
+        }
+
+        /**
+         * Builds the comparison {@code lhs op rhs}.
+         *
+         * @param op the comparison operator
+         * @param lhs the left hand side operand
+         * @param rhs the right hand side operand
+         * @return a new ComparisonExpression with the two operands as its children
+         */
+        private static ComparisonExpression createComparisonExpression(CompareOperator op,
+                Expression lhs, Expression rhs) {
+            List<Expression> operands = new ArrayList<>(2);
+            Collections.addAll(operands, lhs, rhs);
+            return new ComparisonExpression(operands, op);
+        }
+
+        /**
+         * Rebuilds a comparison from its rewritten children. When both sides are row value
+         * constructors (RVCs), the comparison is expanded into column-wise comparisons that are
+         * combined with AND/OR; otherwise the comparison is recreated with the same operator.
+         *
+         * @param node the comparison being left
+         * @param l the rewritten children of {@code node} (left and right hand side)
+         * @return the recreated comparison or the AND/OR expansion of an RVC comparison
+         */
+        @Override public Expression visitLeave(ComparisonExpression node, List<Expression> l) {
+            if (l == null || l.isEmpty()) {
+                return node;
+            }
+            Expression lhs = l.get(0);
+            Expression rhs = l.get(1);
+            if (!(lhs instanceof RowValueConstructorExpression)
+                    || !(rhs instanceof RowValueConstructorExpression)) {
+                return new ComparisonExpression(l, node.getFilterOp());
+            }
+
+            // Rewrite RVC in DNF (Disjunctive Normal Form)
+            // For example
+            // (A, B, C) == (a, b, c) is equal to
+            //     (A == a and B == b and C == c)
+            // (A, B, C) != (a, b, c) is equal to
+            //     (A != a or B != b or C != c)
+            // (A, B, C) op (a, b, c) where op is <, <=, >, or >= is equal to
+            //     (A == a and B == b and C op c) or (A == a and B op' b) or (A op' a)
+            // where op' is the strict form of op (< for <=, > for >=). The strict form is
+            // required for the shorter prefixes: if only a prefix decides the comparison then
+            // the deciding column must differ, otherwise the remaining columns still matter.
+
+            CompareOperator op = node.getFilterOp();
+            List<Expression> lhsChildren = lhs.getChildren();
+            List<Expression> rhsChildren = rhs.getChildren();
+            int childCount = lhsChildren.size();
+            if (op == EQUAL || op == NOT_EQUAL) {
+                List<Expression> terms = new ArrayList<>(childCount);
+                for (int i = 0; i < childCount; i++) {
+                    terms.add(createComparisonExpression(op, lhsChildren.get(i),
+                            rhsChildren.get(i)));
+                }
+                if (op == EQUAL) {
+                    // Two rows are equal only if every column pair is equal
+                    return new AndExpression(terms);
+                }
+                // Two rows differ as soon as one column pair differs
+                return new OrExpression(terms);
+            }
+
+            CompareOperator strictOp = op;
+            if (op == LESS_OR_EQUAL) {
+                strictOp = LESS;
+            } else if (op == GREATER_OR_EQUAL) {
+                strictOp = GREATER;
+            }
+            List<Expression> orList = new ArrayList<>(childCount);
+            for (int i = 0; i < childCount; i++) {
+                // Index of the column that decides the comparison for this disjunct; all the
+                // columns before it must be equal
+                int last = childCount - i - 1;
+                List<Expression> andList = new ArrayList<>(last + 1);
+                for (int j = 0; j < last; j++) {
+                    andList.add(createComparisonExpression(EQUAL, lhsChildren.get(j),
+                            rhsChildren.get(j)));
+                }
+                // Only the disjunct covering all the columns (i == 0) may use the non-strict op
+                andList.add(createComparisonExpression(i == 0 ? op : strictOp,
+                        lhsChildren.get(last), rhsChildren.get(last)));
+                orList.add(new AndExpression(andList));
+            }
+            return new OrExpression(orList);
+        }
+
+        // LIKE, aggregate function, and CASE expressions are returned unchanged; the rewritten
+        // children in l are not used.
+        @Override public Expression visitLeave(LikeExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(SingleAggregateFunction node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(CaseExpression node, List<Expression> l) {
+            return node;
+        }
+
+        /**
+         * Negates a comparison by replacing its operator with the complementary one, e.g.,
+         * NOT (A < 5) becomes A >= 5. The operands are kept as they are.
+         *
+         * @param node the comparison to negate
+         * @return a new comparison over the same operands with the complementary operator
+         * @throws IllegalArgumentException if the operator is not one of the six comparison
+         *                                  operators handled here
+         */
+        private static Expression negate(ComparisonExpression node) {
+            CompareOperator op = node.getFilterOp();
+            Expression left = node.getChildren().get(0);
+            Expression right = node.getChildren().get(1);
+            CompareOperator complement;
+            switch (op) {
+            case LESS:
+                complement = GREATER_OR_EQUAL;
+                break;
+            case LESS_OR_EQUAL:
+                complement = GREATER;
+                break;
+            case EQUAL:
+                complement = NOT_EQUAL;
+                break;
+            case NOT_EQUAL:
+                complement = EQUAL;
+                break;
+            case GREATER_OR_EQUAL:
+                complement = LESS;
+                break;
+            case GREATER:
+                complement = LESS_OR_EQUAL;
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected CompareOp of " + op);
+            }
+            return createComparisonExpression(complement, left, right);
+        }
+        /**
+         * Negates every expression of the given list. Comparisons get the complementary
+         * operator, AND/OR expressions are negated with De Morgan's rules, a NOT is removed,
+         * and IS [NOT] NULL is flipped. Any other expression (e.g., a column or a LIKE
+         * expression) is wrapped in a NOT, which is what visiting a NOT over such an
+         * expression does too.
+         *
+         * @param children the expressions to negate
+         * @return a new list holding the negation of each child, in the same order
+         */
+        private static List<Expression> negateChildren(List<Expression> children) {
+            List<Expression> list = new ArrayList<>(children.size());
+            for (Expression child : children) {
+                if (child instanceof ComparisonExpression) {
+                    list.add(negate((ComparisonExpression) child));
+                } else if (child instanceof OrExpression) {
+                    list.add(negate((OrExpression) child));
+                } else if (child instanceof AndExpression) {
+                    list.add(negate((AndExpression) child));
+                } else if (child instanceof NotExpression) {
+                    // Double negation
+                    list.add(child.getChildren().get(0));
+                } else if (child instanceof IsNullExpression) {
+                    list.add(new IsNullExpression(ImmutableList.of(child.getChildren().get(0)),
+                            !((IsNullExpression) child).isNegate()));
+                } else {
+                    // Columns and all other terms cannot be simplified any further
+                    list.add(new NotExpression(child));
+                }
+            }
+            return list;
+        }
+        /**
+         * Negates an OR expression: NOT (A OR B) becomes an AND over the negated children.
+         */
+        private static Expression negate(OrExpression node) {
+            return new AndExpression(negateChildren(node.getChildren()));
+        }
+
+        /**
+         * Negates an AND expression: NOT (A AND B) becomes an OR over the negated children.
+         */
+        private static Expression negate(AndExpression node) {
+            return new OrExpression(negateChildren(node.getChildren()));
+        }
+        @Override public Expression visitLeave(NotExpression node, List<Expression> l) {
+            Expression child = l.get(0);
+            if (child instanceof OrExpression) {
+                return negate((OrExpression) child);
+            } else if (child instanceof AndExpression) {
+                return negate((AndExpression) child);
+            } else if (child instanceof ComparisonExpression) {
+                return negate((ComparisonExpression) child);
+            } else if (child instanceof NotExpression) {
+                return child.getChildren().get(0);
+            } else if (child instanceof IsNullExpression) {
+                return new IsNullExpression(ImmutableList.of(l.get(0).getChildren().get(0)),
+                        !((IsNullExpression) child).isNegate());
+            } else {
+                return new NotExpression(child);
+            }
+        }
+
+        /**
+         * Expands an IN list into comparisons between the first rewritten child and each key
+         * expression of the list. Without negation the comparisons use = and are combined
+         * with OR; with negation they use != and are combined with AND.
+         *
+         * @param node the IN list expression
+         * @param negate whether to build the expansion of NOT IN
+         * @param l the rewritten children of {@code node}; the first one is the left hand side
+         * @return the OR (or AND when negated) of the generated comparisons
+         */
+        private Expression transformInList(InListExpression node, boolean negate,
+                List<Expression> l) {
+            CompareOperator op = negate ? NOT_EQUAL : EQUAL;
+            List<Expression> comparisons = new ArrayList<>(node.getKeyExpressions().size());
+            for (Expression key : node.getKeyExpressions()) {
+                comparisons.add(createComparisonExpression(op, l.get(0), key));
+            }
+            Expression expanded = negate
+                    ? new AndExpression(comparisons)
+                    : new OrExpression(comparisons);
+            return expanded;
+        }
+
+        @Override public Expression visitLeave(InListExpression node, List<Expression> l) {
+            Expression inList = transformInList(node, false, l);
+            Expression firstElement = inList.getChildren().get(0);
+            // Check if inList includes RVC expressions. If so, rewrite them
+            if (firstElement instanceof ComparisonExpression
+                    && firstElement.getChildren().get(0) instanceof RowValueConstructorExpression) {
+                List<Expression> list = new ArrayList<>(node.getKeyExpressions().size());
+                for (Expression e : inList.getChildren()) {
+                    list.add(visitLeave((ComparisonExpression) e, e.getChildren()));
+                }
+                if (inList instanceof OrExpression) {
+                    return visitLeave(new OrExpression(list), list);
+                } else {
+                    return visitLeave(new AndExpression(list), list);
+                }
+            } else {
+                return inList;
+            }
+        }
+
+        // The visitors below return the visited node unchanged. For the visitLeave variants
+        // this means the rewritten children passed in l are not used.
+        @Override public Expression visitLeave(IsNullExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(SubtractExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(MultiplyExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(AddExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(DivideExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(CoerceExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override
+        public Expression visitLeave(ArrayConstructorExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override
+        public Expression visitLeave(SingleCellConstructorExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visit(CorrelateVariableFieldAccessExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(LiteralExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(RowKeyColumnExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(KeyValueColumnExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(SingleCellColumnExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(ProjectedColumnExpression node) {
+            return node;
+        }
+
+        @Override public Expression visit(SequenceValueExpression node) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(StringConcatExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override
+        public Expression visitLeave(RowValueConstructorExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(ModulusExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override
+        public Expression visitLeave(ArrayAnyComparisonExpression node, List<Expression> l) {
+            return node;
+        }
+
+        @Override public Expression visitLeave(ArrayElemRefExpression node, List<Expression> l) {
+            return node;
+        }
+    }
+
+    public static LiteralExpression getLiteralExpression(Expression node) {
+        while (!node.getChildren().isEmpty()) {
+            node = node.getChildren().get(0);
+        }
+        if (node instanceof LiteralExpression) {
+            return (LiteralExpression) node;
+        }
+        throw new IllegalArgumentException("Unexpected instance type for " + node);
+    }
+
+
+    public static BaseTerminalExpression getBaseTerminalExpression(Expression node) {
+        while (!node.getChildren().isEmpty()) {
+            node = node.getChildren().get(0);
+        }
+        if (node instanceof BaseTerminalExpression) {
+            return (BaseTerminalExpression) node;
+        }
+        throw new IllegalArgumentException("Unexpected instance type for " + node);
+    }
+
+    /**
+     * Determines if nodeA is contained by nodeB.
+     *
+     * Both nodes are treated as lists of conjuncts: the children of an AndExpression, or the
+     * node itself when it is a simple term. nodeB contains nodeA if every conjunct of nodeB
+     * contains at least one conjunct of nodeA.
+     *
+     * Example 1: nodeA is contained by nodeB where
+     *      nodeA = (A > 5) and (A < 10) and (B > 0) and C = 5, and
+     *      nodeB = (A > 0)
+     *
+     * Example 2: nodeA is not contained by nodeB since C < 0 does not contain any of A's
+     * conjuncts where
+     *      nodeA = (A > 5) and (A < 10) and (B > 0) and C = 5, and
+     *      nodeB = (A > 0) and (C < 0)
+     *
+     * @param nodeA is a simple term or AndExpression constructed from simple terms
+     * @param nodeB is a simple term or AndExpression constructed from simple terms
+     * @return true if nodeA is contained by nodeB.
+     */
+    private static boolean contained(Expression nodeA, Expression nodeB) {
+        List<Expression> conjunctsOfA = nodeA instanceof AndExpression
+                ? nodeA.getChildren()
+                : Collections.<Expression>singletonList(nodeA);
+        List<Expression> conjunctsOfB = nodeB instanceof AndExpression
+                ? nodeB.getChildren()
+                : Collections.<Expression>singletonList(nodeB);
+        for (Expression conjunctB : conjunctsOfB) {
+            boolean coversSomeConjunctOfA = false;
+            for (Expression conjunctA : conjunctsOfA) {
+                if (conjunctB.contains(conjunctA)) {
+                    coversSomeConjunctOfA = true;
+                    break;
+                }
+            }
+            if (!coversSomeConjunctOfA) {
+                return false;
+            }
+        }
+        return true;
+    }
+    /**
+     * Determines if node is contained by at least one element of l.
+     *
+     * @param node is a simple term or AndExpression constructed from simple terms
+     * @param l is a list of nodes where a node is a simple term or AndExpression constructed
+     *          from simple terms
+     * @return true if an element of the list contains node
+     */
+    private static boolean contained(Expression node, List<Expression> l) {
+        boolean found = false;
+        Iterator<Expression> candidates = l.iterator();
+        while (!found && candidates.hasNext()) {
+            found = contained(node, candidates.next());
+        }
+        return found;
+    }
+
+    /**
+     * Determines if nodeA contains the disjunct nodeB.
+     *
+     * @param nodeA is an OR expression, an AND expression, or a simple term (e.g., C < 5)
+     * @param nodeB is a disjunct, that is, either an AND expression or a simple term
+     * @return true if nodeB is contained by nodeA or, when nodeA is an OR expression, by one
+     *         of the disjuncts of nodeA
+     * @throws SQLException declared for callers; not thrown by the code of this method itself
+     */
+    private static boolean containsDisjunct(Expression nodeA, Expression nodeB)
+            throws SQLException {
+        if (nodeA instanceof OrExpression) {
+            // Look for a disjunct of nodeA that contains nodeB
+            return contained(nodeB, nodeA.getChildren());
+        }
+        // Both nodeA and nodeB are either an AND expression or a simple term
+        return contained(nodeB, nodeA);
+    }
+    /**
+     * Determines if nodeA contains/implies nodeB. Both nodeA and nodeB are DNF (Disjunctive
+     * Normal Form) expressions. nodeA contains nodeB if every disjunct of nodeB is contained
+     * by a nodeA disjunct. A disjunct x contains another disjunct y if every conjunct of x
+     * contains at least one conjunct of y.
+     *
+     * Example:
+     * nodeA: (A > 0 AND B > 0) OR C < 5
+     * nodeB: (A = 5 AND B > 1) OR (A = 3 AND C = 1)
+     *
+     * Disjuncts of nodeA: (A > 0 AND B > 0) and C < 5
+     * Disjuncts of nodeB: (A = 5 AND B > 1) and (A = 3 AND C = 1)
+     *
+     * Conjuncts of (A > 0 AND B > 0): A > 0 and B > 0
+     * Conjuncts of C < 5 : C < 5
+     *
+     * nodeA contains nodeB because every disjunct of nodeB is contained by a nodeA disjunct.
+     * The first disjunct (A = 5 AND B > 1) is contained by the disjunct (A > 0 AND B > 0).
+     * The second disjunct (A = 3 AND C = 1) is contained by C < 5.
+     *
+     * A null nodeA contains every nodeB; a non-null nodeA does not contain a null nodeB.
+     *
+     * @param nodeA is an expression in DNF, or null
+     * @param nodeB is an expression in DNF, or null
+     * @return true if nodeA contains/implies nodeB
+     * @throws SQLException
+     */
+    public static boolean contains(Expression nodeA, Expression nodeB) throws SQLException {
+        if (nodeA == null) {
+            return true;
+        }
+        if (nodeB == null) {
+            return false;
+        }
+        if (!(nodeB instanceof OrExpression)) {
+            // nodeB is a single disjunct: either an AND expression or a simple term
+            return containsDisjunct(nodeA, nodeB);
+        }
+        // Every disjunct of nodeB has to be contained by a nodeA disjunct
+        for (Expression disjunct : nodeB.getChildren()) {
+            if (!containsDisjunct(nodeA, disjunct)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private static class SubqueryParseNodeVisitor extends StatelessTraverseAllParseNodeVisitor {
         private final StatementContext context;
         private final Set<SubqueryParseNode> subqueryNodes;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index b615d31b13..0f07fe65d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1173,9 +1173,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
         return mutationList;
     }
 
-    private static Put prepareIndexPutForRebuid(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
-                                                ValueGetter mergedRowVG, long ts)
-            throws IOException {
+    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer,
+            ImmutableBytesPtr rowKeyPtr, ValueGetter mergedRowVG, long ts) throws IOException {
         Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
                 mergedRowVG, rowKeyPtr, ts, null, null, false);
         if (indexPut == null) {
@@ -1247,11 +1246,13 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
     }
 
     /**
-     * Generate the index update for a data row from the mutation that are obtained by merging the previous data row
-     * state with the pending row mutation for index rebuild. This method is called only for global indexes.
-     * pendingMutations is a sorted list of data table mutations that are used to replay index table mutations.
-     * This list is sorted in ascending order by the tuple of row key, timestamp and mutation type where delete comes
-     * after put.
+     * Generates the index updates for a data row from the mutations that are obtained by merging
+     * the previous data row state with the pending row mutations for index rebuild. This method is
+     * called only for global indexes including covered full, covered partial, uncovered full, and
+     * uncovered partial indexes.
+     * pendingMutations is a sorted list of data table mutations that are used to replay index
+     * table mutations. This list is sorted in ascending order by the tuple of row key, timestamp
+     * and mutation type where delete comes after put.
      */
     public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
                                                                  Put dataPut, Delete dataDel) throws IOException {
@@ -1300,14 +1301,28 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
 
                 if (mutation.getFamilyCellMap().size() != 0) {
                     // Add this put on top of the current data row state to get the next data row state
-                    Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
+                    Put nextDataRow = (currentDataRowState == null) ? new Put((Put) mutation) :
+                            applyNew((Put) mutation, currentDataRowState);
+                    if (!indexMaintainer.shouldPrepareIndexMutations(nextDataRow)) {
+                        currentDataRowState = nextDataRow;
+                        if (indexRowKeyForCurrentDataRow != null) {
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(
+                                    indexRowKeyForCurrentDataRow,
+                                    IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                            indexMutations.add(del);
+                        }
+                        indexRowKeyForCurrentDataRow = null;
+                        continue;
+                    }
                     ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRow);
-                    Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                    Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr,
+                            nextDataRowVG, ts);
                     indexMutations.add(indexPut);
                     // Delete the current index row if the new index key is different than the current one
-                    if (currentDataRowState != null) {
+                    if (indexRowKeyForCurrentDataRow != null) {
                         if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
-                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(
+                                    indexRowKeyForCurrentDataRow,
                                     IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                             indexMutations.add(del);
                         }
@@ -1330,24 +1345,42 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
                 applyDeleteOnPut(deleteToApply, currentDataRowState);
                 Put nextDataRowState = currentDataRowState;
                 if (nextDataRowState.getFamilyCellMap().size() == 0) {
-                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
-                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
-                    indexMutations.add(del);
+                    if (indexRowKeyForCurrentDataRow != null) {
+                        Mutation
+                                del =
+                                indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                                        IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                    }
                     currentDataRowState = null;
                     indexRowKeyForCurrentDataRow = null;
-                } else {
+                } else if (indexRowKeyForCurrentDataRow != null) {
+                    if (!indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)) {
+                        currentDataRowState = nextDataRowState;
+                        Mutation del = indexMaintainer.buildRowDeleteMutation(
+                                indexRowKeyForCurrentDataRow,
+                                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+                        indexMutations.add(del);
+                        indexRowKeyForCurrentDataRow = null;
+                        continue;
+                    }
                     ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState);
-                    Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+                    Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr,
+                            nextDataRowVG, ts);
                     indexMutations.add(indexPut);
                     // Delete the current index row if the new index key is different than the current one
                     if (indexRowKeyForCurrentDataRow != null) {
                         if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
-                            Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+                            Mutation del = indexMaintainer.buildRowDeleteMutation(
+                                    indexRowKeyForCurrentDataRow,
                                     IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                             indexMutations.add(del);
                         }
                     }
                     indexRowKeyForCurrentDataRow = indexPut.getRow();
+                } else {
+                    currentDataRowState = nextDataRowState;
+                    indexRowKeyForCurrentDataRow = null;
                 }
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 422eac328e..4605554409 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -41,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYT
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_WHERE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES;
@@ -365,6 +366,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
     private static final Cell STREAMING_TOPIC_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
         TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES);
+    private static final Cell INDEX_WHERE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+            TABLE_FAMILY_BYTES, INDEX_WHERE_BYTES);
 
     private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
@@ -404,7 +407,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             CHANGE_DETECTION_ENABLED_KV,
             SCHEMA_VERSION_KV,
             EXTERNAL_SCHEMA_ID_KV,
-            STREAMING_TOPIC_NAME_KV
+            STREAMING_TOPIC_NAME_KV,
+            INDEX_WHERE_KV
     );
 
     static {
@@ -452,6 +456,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
     private static final int STREAMING_TOPIC_NAME_INDEX =
         TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
+    private static final int INDEX_WHERE_INDEX =
+            TABLE_KV_COLUMNS.indexOf(INDEX_WHERE_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -1423,6 +1429,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setStreamingTopicName(streamingTopicName != null ? streamingTopicName :
             oldTable != null ? oldTable.getStreamingTopicName() : null);
 
+        Cell indexWhereKv = tableKeyValues[INDEX_WHERE_INDEX];
+        String indexWhere = indexWhereKv != null
+                ? (String) PVarchar.INSTANCE.toObject(indexWhereKv.getValueArray(),
+                        indexWhereKv.getValueOffset(), indexWhereKv.getValueLength())
+                : null;
+        builder.setIndexWhere(indexWhere != null ? indexWhere
+                : oldTable != null ? oldTable.getIndexWhere() : null);
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY :
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index af6c1cf312..a88a991a04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -99,7 +99,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 37;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 46c61bb1b1..438c014ca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -272,19 +272,21 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
         return scanIndexTableRows(result, startTime, null, 0);
     }
 
-    private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey, long ts) throws IOException {
+    private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey,
+            long indexTimestamp)
+            throws IOException {
         Put put = new Put(dataRow.getRow());
         for (Cell cell : dataRow.rawCells()) {
             put.add(cell);
         }
         if (indexMaintainer.checkIndexRow(indexRowKey, put)) {
-            if (IndexUtil.getMaxTimestamp(put) != ts) {
+            if (IndexUtil.getMaxTimestamp(put) != indexTimestamp) {
                 Mutation[] mutations;
                 Put indexPut = new Put(indexRowKey);
-                indexPut.addColumn(emptyCF, emptyCQ, ts, QueryConstants.VERIFIED_BYTES);
-                if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+                indexPut.addColumn(emptyCF, emptyCQ, indexTimestamp, QueryConstants.VERIFIED_BYTES);
+                if ((EnvironmentEdgeManager.currentTimeMillis() - indexTimestamp) > ageThreshold) {
                     Delete indexDelete = indexMaintainer.buildRowDeleteMutation(indexRowKey,
-                            IndexMaintainer.DeleteType.SINGLE_VERSION, ts);
+                            IndexMaintainer.DeleteType.SINGLE_VERSION, indexTimestamp);
                     mutations = new Mutation[]{indexPut, indexDelete};
                 } else {
                     mutations = new Mutation[]{indexPut};
@@ -293,7 +295,8 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
             }
             return true;
         }
-        indexMaintainer.deleteRowIfAgedEnough(indexRowKey, ts, ageThreshold, false, region);
+        indexMaintainer.deleteRowIfAgedEnough(indexRowKey, IndexUtil.getMaxTimestamp(put),
+                ageThreshold, false, region);
         return false;
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index fd6b8af346..de7ff1c591 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -443,8 +443,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (((localIndexScan || uncoveredGlobalIndexScan) && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
+            viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
                     getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 67dd2635a1..3f4d5168af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -106,7 +106,10 @@ public enum SQLExceptionCode {
         }
     }),
     CANNOT_INDEX_COLUMN_ON_TYPE(302, "23100", "The column cannot be index due to its type."),
-
+    INVALID_INDEX_WHERE_WITH_SUBQUERY(303, "23101",
+            " Index where clause cannot include a subquery."),
+    CANNOT_EVALUATE_INDEX_WHERE(304, "23102",
+            "Invalid index where clause. It cannot be evaluated on a data table row."),
     /**
      * Invalid Cursor State (errorcode 04, sqlstate 24)
      */
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index d40bf44d97..9a85b1c061 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -482,8 +482,9 @@ public class PhoenixTxIndexMutationGenerator {
         }
     }
     
-    public static PhoenixTxIndexMutationGenerator newGenerator(final PhoenixConnection connection, PTable table, List<PTable> indexes,
-            Map<String, byte[]> attributes) {
+    public static PhoenixTxIndexMutationGenerator newGenerator(final PhoenixConnection connection,
+            PTable table, List<PTable> indexes, Map<String, byte[]> attributes)
+            throws SQLException {
         final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
         for (PTable index : indexes) {
             IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
index dc7a3cd815..6879cc9356 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
@@ -22,17 +22,15 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.expression.function.ArrayElemRefExpression;
-import org.apache.phoenix.expression.function.InvertFunction;
 import org.apache.phoenix.expression.rewrite.RowValueConstructorExpressionRewriter;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.schema.SortOrder;
@@ -53,6 +51,12 @@ import org.apache.phoenix.util.StringUtil;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hbase.CompareOperator.EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.GREATER;
+import static org.apache.hadoop.hbase.CompareOperator.GREATER_OR_EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.LESS;
+import static org.apache.hadoop.hbase.CompareOperator.LESS_OR_EQUAL;
+import static org.apache.hadoop.hbase.CompareOperator.NOT_EQUAL;
 
 /**
  * 
@@ -352,7 +356,133 @@ public class ComparisonExpression extends BaseCompoundExpression {
         ptr.set(ByteUtil.compare(op, comparisonResult) ? PDataType.TRUE_BYTES : PDataType.FALSE_BYTES);
         return true;
     }
-    
+
+    @Override
+    public boolean contains(Expression other) {
+        if (!(other instanceof ComparisonExpression || other instanceof IsNullExpression)) {
+            return false; // only comparisons and IsNullExpression are considered
+        }
+        if (other instanceof IsNullExpression) { // NOTE(review): operands are not compared here
+            return !((IsNullExpression) other).isNegate();
+        }
+        // The lhs base terminal expressions of this (A) and other (B) must be equal.
+        BaseTerminalExpression lhsA =
+                WhereCompiler.getBaseTerminalExpression(this.getChildren().get(0));
+        BaseTerminalExpression lhsB =
+                WhereCompiler.getBaseTerminalExpression(other.getChildren().get(0));
+        if (!lhsA.equals(lhsB)) {
+            return false;
+        }
+        CompareOperator opA = this.getFilterOp();
+        CompareOperator opB = ((ComparisonExpression) other).getFilterOp();
+        BaseTerminalExpression rhs = WhereCompiler.getBaseTerminalExpression(
+                this.getChildren().get(1));
+        if (rhs instanceof ColumnExpression) { // rhs of this is a column, not a literal
+            BaseTerminalExpression rhsB = WhereCompiler.getBaseTerminalExpression(
+                    other.getChildren().get(1));
+            if (!rhs.equals(rhsB)) {
+                return false; // both comparisons must use the same rhs expression
+            }
+            switch (opA) { // with equal lhs and rhs, only the operator pair decides
+                case LESS_OR_EQUAL:
+                    if (opB == LESS || opB == LESS_OR_EQUAL || opB == EQUAL) {
+                        return true;
+                    }
+                    return false;
+                case LESS:
+                case EQUAL:
+                case NOT_EQUAL:
+                case GREATER:
+                    if (opA == opB) { // these operators contain only the identical operator
+                        return true;
+                    }
+                    return false;
+                case GREATER_OR_EQUAL:
+                    if (opB == GREATER || opB == GREATER_OR_EQUAL || opB == EQUAL) {
+                        return true;
+                    }
+                    return false;
+                default:
+                    throw new IllegalArgumentException("Unexpected CompareOp " + opA);
+            }
+        }
+        LiteralExpression rhsA = WhereCompiler.getLiteralExpression(this.getChildren().get(1));
+        LiteralExpression rhsB = WhereCompiler.getLiteralExpression(other.getChildren().get(1));
+        Object valA = rhsA.getValue();
+        Object valB = rhsB.getValue();
+        // Otherwise decide from the operator pair and the ordering of the rhs values valA, valB.
+        PDataType typeA = rhsA.getDataType();
+        PDataType typeB = rhsB.getDataType();
+        switch (opA){
+        case LESS: // this is: lhs < valA
+            if (opB == GREATER_OR_EQUAL || opB == GREATER || opB == NOT_EQUAL) {
+                return false;
+            }
+            if (opB == LESS) {
+                if (typeA.compareTo(valA, valB, typeB) >= 0) { // true when valA >= valB
+                    return true;
+                }
+                return false;
+            }
+            if (opB == LESS_OR_EQUAL || opB == EQUAL) {
+                if (typeA.compareTo(valA, valB, typeB) > 0) { // true only when valA > valB
+                    return true;
+                }
+                return false;
+            }
+            return false;
+        case LESS_OR_EQUAL: // this is: lhs <= valA
+            if (opB == GREATER_OR_EQUAL || opB == GREATER || opB ==NOT_EQUAL) {
+                return false;
+            }
+            if (opB == LESS_OR_EQUAL || opB == LESS || opB == EQUAL) {
+                if (typeA.compareTo(valA, valB, typeB) >= 0) { // true when valA >= valB
+                    return true;
+                }
+                return false;
+            }
+            return false;
+        case EQUAL:
+        case NOT_EQUAL:
+            if (opA != opB) { // the operator must be identical
+                return false;
+            }
+            if (typeA.compareTo(valA, valB, typeB) == 0) { // and the values must compare equal
+                return true;
+            }
+            return false;
+        case GREATER_OR_EQUAL: // this is: lhs >= valA
+            if (opB == LESS_OR_EQUAL || opB == LESS || opB ==NOT_EQUAL) {
+                return false;
+            }
+            if (opB == GREATER_OR_EQUAL || opB == GREATER || opB == EQUAL) {
+                if (typeA.compareTo(valA, valB, typeB) <= 0) { // true when valA <= valB
+                    return true;
+                }
+                return false;
+            }
+            return false;
+        case GREATER: // this is: lhs > valA
+            if (opB == LESS_OR_EQUAL || opB == LESS || opB ==NOT_EQUAL) {
+                return false;
+            }
+            if (opB == GREATER) {
+                if (typeA.compareTo(valA, valB, typeB) <= 0) { // true when valA <= valB
+                    return true;
+                }
+                return false;
+            }
+            if (opB == GREATER_OR_EQUAL || opB == EQUAL) {
+                if (typeA.compareTo(valA, valB, typeB) < 0) { // true only when valA < valB
+                    return true;
+                }
+                return false;
+            }
+            return false;
+        default:
+            throw new IllegalArgumentException("Unexpected CompareOp of " + opA);
+        }
+    }
     @Override
     public void readFields(DataInput input) throws IOException {
         op = CompareOperator.values()[WritableUtils.readVInt(input)];
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/Expression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/Expression.java
index d490ced462..b744a6e504 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/Expression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/Expression.java
@@ -94,4 +94,14 @@ public interface Expression extends PDatum, Writable {
      * @return
      */
     boolean isCloneExpression();
+
+    /**
+     * Determines if this contains other (other implies this), e.g., A > 0 contains A >= 5.
+     * @param other is an expression with the lhs (left-hand side) column having the same type as
+     *              the lhs column of this expression
+     * @return true if this contains other; the default implementation is this.equals(other).
+     */
+    default boolean contains(Expression other) {
+        return this.equals(other);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/IsNullExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/IsNullExpression.java
index c2f43cbcbb..09f9a6e36d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/IsNullExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/IsNullExpression.java
@@ -85,7 +85,21 @@ public class IsNullExpression extends BaseSingleExpression {
     public boolean isNegate() {
         return isNegate;
     }
-    
+
+    @Override
+    public boolean contains(Expression other) {
+        if (!(other instanceof ComparisonExpression || other instanceof IsNullExpression)) {
+            return false; // other expression types are never contained
+        }
+        if (!this.getChildren().get(0).equals(other.getChildren().get(0))) { // operands differ
+            return false;
+        }
+        if (other instanceof ComparisonExpression) {
+            return isNegate; // a comparison is contained only when this is negated
+        }
+        return isNegate == ((IsNullExpression) other).isNegate; // same negation flag required
+    }
+
     @Override
     public void readFields(DataInput input) throws IOException {
         super.readFields(input);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e8d9cb5280..593fdbe0d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -79,7 +79,6 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
-import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.Expression;
@@ -811,15 +810,25 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
     }
 
-    private boolean isPartialUncoveredIndexUpdate(PhoenixIndexMetaData indexMetaData,
-            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    /**
+     * Determines if any of the data table mutations in the given batch does not include all
+     * the indexed columns or the where clause columns for partial uncovered indexes.
+     */
+    private boolean isPartialUncoveredIndexMutation(PhoenixIndexMetaData indexMetaData,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp) {
         int indexedColumnCount = 0;
         for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
             indexedColumnCount += indexMaintainer.getIndexedColumns().size();
+            if (indexMaintainer.getIndexWhereColumns() != null) {
+                indexedColumnCount += indexMaintainer.getIndexWhereColumns().size();
+            }
         }
-        Set<ColumnReference> indexedColumns = new HashSet<ColumnReference>(indexedColumnCount);
+        Set<ColumnReference> columns = new HashSet<ColumnReference>(indexedColumnCount);
         for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
-            indexedColumns.addAll(indexMaintainer.getIndexedColumns());
+            columns.addAll(indexMaintainer.getIndexedColumns());
+            if (indexMaintainer.getIndexWhereColumns() != null) {
+                columns.addAll(indexMaintainer.getIndexWhereColumns());
+            }
         }
         for (int i = 0; i < miniBatchOp.size(); i++) {
             if (miniBatchOp.getOperationStatus(i) == IGNORE) {
@@ -829,8 +838,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             if (!this.builder.isEnabled(m)) {
                 continue;
             }
-            for (ColumnReference indexedColumn : indexedColumns) {
-                if (m.get(indexedColumn.getFamily(), indexedColumn.getQualifier()).isEmpty()) {
+            for (ColumnReference column : columns) {
+                if (m.get(column.getFamily(), column.getQualifier()).isEmpty()) {
                     // The returned list is empty, which means the indexed column is not
                     // included. This mutation would result in partial index update (and thus
                     // index column values should be retrieved from the existing data table row)
@@ -917,7 +926,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
                 IndexMaintainer indexMaintainer = pair.getFirst();
                 HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
-                if (nextDataRowState != null) {
+                if (nextDataRowState != null
+                        && indexMaintainer.shouldPrepareIndexMutations(nextDataRowState)) {
                     ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState);
                     Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
                             nextDataRowVG, rowKeyPtr, ts, null, null, false);
@@ -948,7 +958,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                                     new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
                         }
                     }
-                } else if (currentDataRowState != null) {
+                } else if (currentDataRowState != null
+                        && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
                     ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState);
                     byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
                             null, null, ts);
@@ -1162,7 +1173,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
             if (context.hasGlobalIndex || context.hasTransform || context.hasAtomic ||
                     context.hasDelete ||  (context.hasUncoveredIndex &&
-                    isPartialUncoveredIndexUpdate(indexMetaData, miniBatchOp))) {
+                    isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp))) {
                 getCurrentRowStates(c, context);
             }
             onDupCheckTime += (EnvironmentEdgeManager.currentTimeMillis() - start);
@@ -1509,7 +1520,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       // stores the intermediate values as we apply conditional update expressions
       List<Cell> flattenedCells;
       // read the column values requested in the get from the current data row
-      List<Cell> cells = readColumnsFromRow(currentDataRowState, colsReadInExpr);
+      List<Cell> cells = IndexUtil.readColumnsFromRow(currentDataRowState, colsReadInExpr);
 
       if (currentDataRowState == null) { // row doesn't exist
           if (skipFirstOp) {
@@ -1601,34 +1612,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       return mutations;
   }
 
-    private List<Cell> readColumnsFromRow(Put currentDataRow, Set<ColumnReference> cols) {
-        if (currentDataRow == null) {
-            return Collections.EMPTY_LIST;
-        }
-
-        List<Cell> columnValues = Lists.newArrayList();
 
-        // just return any cell FirstKeyOnlyFilter
-        if (cols.isEmpty()) {
-            for (List<Cell> cells : currentDataRow.getFamilyCellMap().values()) {
-                if (cells == null || cells.isEmpty()) {
-                    continue;
-                }
-                columnValues.add(cells.get(0));
-                break;
-            }
-            return columnValues;
-        }
-
-        IndexUtil.SimpleValueGetter valueGetter = new IndexUtil.SimpleValueGetter(currentDataRow);
-        for (ColumnReference colRef : cols) {
-            Cell cell = valueGetter.getLatestCell(colRef, HConstants.LATEST_TIMESTAMP);
-            if (cell != null) {
-                columnValues.add(cell);
-            }
-        }
-        return columnValues;
-    }
 
     private static List<Cell> flattenCells(Mutation m) {
         List<Cell> flattenedCells = new ArrayList<>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 0ad9e76aee..c943cb442e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -41,6 +41,7 @@ import java.util.Set;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -101,7 +102,9 @@ import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
+import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 
@@ -141,7 +144,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private static final int EXPRESSION_NOT_PRESENT = -1;
     private static final int ESTIMATED_EXPRESSION_SIZE = 8;
     
-    public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) {
+    public static IndexMaintainer create(PTable dataTable, PTable index,
+            PhoenixConnection connection) throws SQLException {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
         }
@@ -198,13 +202,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param dataTable data table
      * @param ptr bytes pointer to hold returned serialized value
      */
-    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) {
+    public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
+            PhoenixConnection connection) throws SQLException {
         List<PTable> indexes = dataTable.getIndexes();
         serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection);
     }
 
     public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr,
-            List<PTable> indexes, PhoenixConnection connection) {
+            List<PTable> indexes, PhoenixConnection connection) throws SQLException {
         Iterator<PTable> indexesItr = Collections.emptyListIterator();
         boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
         if (onlyLocalIndexes) {
@@ -225,7 +230,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param indexes indexes to serialize
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
-            List<PTable> indexes, PhoenixConnection connection) {
+            List<PTable> indexes, PhoenixConnection connection) throws SQLException {
         if (indexes.isEmpty() && dataTable.getTransformingNewTable() == null) {
             ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             return;
@@ -279,7 +284,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * @param keyValueIndexes indexes to serialize
      */
     public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr,
-            List<PTable> keyValueIndexes, PhoenixConnection connection) {
+            List<PTable> keyValueIndexes, PhoenixConnection connection) throws SQLException {
         int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
         int nIndexes = nMutableIndexes + keyValueIndexes.size();
         int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
@@ -430,13 +435,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private String logicalIndexName;
 
     private boolean isUncovered;
+    private Expression indexWhere;
+    private Set<ColumnReference> indexWhereColumns;
 
     protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
     
-    private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) {
+    private IndexMaintainer(final PTable dataTable, final PTable index,
+            PhoenixConnection connection) throws SQLException {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
@@ -662,6 +670,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
         this.logicalIndexName = index.getName().getString();
+        if (index.getIndexWhere() != null) {
+            this.indexWhere = index.getIndexWhereExpression(connection);
+            this.indexWhereColumns = index.getIndexWhereColumns(connection);
+        }
+
         initCachedState();
     }
 
@@ -923,6 +936,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
     public boolean checkIndexRow(final byte[] indexRowKey,
                                         final Put dataRow) {
+        if (!shouldPrepareIndexMutations(dataRow)) {
+            return false;
+        }
         byte[] builtIndexRowKey = getIndexRowKey(dataRow);
         if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
                 indexRowKey, 0, indexRowKey.length) != 0) {
@@ -931,6 +947,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return true;
     }
 
+    /**
+     * Determines if the index row for a given data row should be prepared. For full
+     * indexes, index rows should always be prepared. For partial indexes, the index row should
+     * be prepared only if the index where clause is satisfied on the given data row.
+     *
+     * @param dataRowState data row represented as a put mutation, that is, a list of put cells
+     * @return always true for full indexes; for partial indexes, true only if the index where
+     * expression evaluates to true on the given data row
+     */
+    public boolean shouldPrepareIndexMutations(Put dataRowState) {
+        if (getIndexWhere() == null) {
+            // It is a full index and the index row should be prepared.
+            return true;
+        }
+        List<Cell> cols = IndexUtil.readColumnsFromRow(dataRowState, getIndexWhereColumns());
+        // Cells should be sorted as they are searched using a binary search during expression
+        // evaluation
+        Collections.sort(cols, CellComparator.getInstance());
+        MultiKeyValueTuple tuple = new MultiKeyValueTuple(cols);
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+        if (!getIndexWhere().evaluate(tuple, ptr)) {
+            return false;
+        }
+        // Null-safe compare: toObject presumably returns null for an empty ptr (SQL NULL)
+        Object value = PBoolean.INSTANCE.toObject(ptr);
+        return Boolean.TRUE.equals(value);
+    }
     public void deleteRowIfAgedEnough(byte[] indexRowKey, long ts, long ageThreshold,
                                       boolean singleVersion, Region region) throws IOException {
         if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
@@ -1456,16 +1500,26 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return allColumns;
     }
 
-    public Set<ColumnReference> getAllColumnsForDataTable() {
-        Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
-        result.addAll(indexedColumns);
-        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+
+    private void addColumnRefForScan(Set<ColumnReference> from, Set<ColumnReference> to) {
+        for (ColumnReference colRef : from) {
             if (getDataImmutableStorageScheme()==ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
-                result.add(colRef);
+                to.add(colRef);
             } else {
-                result.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+                to.add(new ColumnReference(colRef.getFamily(),
+                        QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
             }
         }
+    }
+    public Set<ColumnReference> getAllColumnsForDataTable() {
+        Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(
+                indexedExpressions.size() + coveredColumnsMap.size()
+                        + (indexWhereColumns == null ? 0 : indexWhereColumns.size()));
+        addColumnRefForScan(indexedColumns, result);
+        addColumnRefForScan(coveredColumnsMap.keySet(), result);
+        if (indexWhereColumns != null) {
+            addColumnRefForScan(indexWhereColumns, result);
+        }
         return result;
     }
 
@@ -1698,6 +1752,27 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         } else {
             maintainer.isUncovered = false;
         }
+        if (proto.hasIndexWhere()) {
+            try (ByteArrayInputStream stream =
+                    new ByteArrayInputStream(proto.getIndexWhere().toByteArray())) {
+                DataInput input = new DataInputStream(stream);
+                int expressionOrdinal = WritableUtils.readVInt(input);
+                Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                expression.readFields(input);
+                maintainer.indexWhere = expression;
+                List<ServerCachingProtos.ColumnReference> indexWhereColumnsList =
+                        proto.getIndexWhereColumnsList();
+                maintainer.indexWhereColumns = new HashSet<>(indexWhereColumnsList.size());
+                for (ServerCachingProtos.ColumnReference colRefFromProto : indexWhereColumnsList) {
+                    maintainer.indexWhereColumns.add(new ColumnReference(
+                            colRefFromProto.getFamily().toByteArray(),
+                            colRefFromProto.getQualifier().toByteArray()));
+                }
+            }
+        } else {
+            maintainer.indexWhere = null;
+            maintainer.indexWhereColumns = null;
+        }
         maintainer.initCachedState();
         return maintainer;
     }
@@ -1825,6 +1900,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         builder.setDataEncodingScheme(maintainer.dataEncodingScheme.getSerializedMetadataValue());
         builder.setDataImmutableStorageScheme(maintainer.dataImmutableStorageScheme.getSerializedMetadataValue());
         builder.setIsUncovered(maintainer.isUncovered);
+        if (maintainer.indexWhere != null) {
+            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+                DataOutput output = new DataOutputStream(stream);
+                WritableUtils.writeVInt(output,
+                        ExpressionType.valueOf(maintainer.indexWhere).ordinal());
+                maintainer.indexWhere.write(output);
+                builder.setIndexWhere(ByteStringer.wrap(stream.toByteArray()));
+                for (ColumnReference colRef : maintainer.indexWhereColumns) {
+                    ServerCachingProtos.ColumnReference.Builder cRefBuilder =
+                            ServerCachingProtos.ColumnReference.newBuilder();
+                    cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+                    cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+                    builder.addIndexWhereColumns(cRefBuilder.build());
+                }
+            }
+        }
         return builder.build();
     }
 
@@ -1863,7 +1954,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         size += estimatedExpressionSize;
         return size;
     }
-    
+    public Expression getIndexWhere() {
+        return indexWhere;
+    }
+
+    public Set<ColumnReference> getIndexWhereColumns() {
+        return indexWhereColumns;
+    }
     private int estimateIndexRowKeyByteSize(int indexColByteSize) {
         int estimatedIndexRowKeyBytes = indexColByteSize + dataRowKeySchema.getEstimatedValueLength() + (nIndexSaltBuckets == 0 || isLocalIndex || this.isDataTableSalted ? 0 : SaltingUtil.NUM_SALTING_BYTES);
         return estimatedIndexRowKeyBytes;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 59fb21f522..7877e71ca3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -176,15 +176,12 @@ public abstract class RegionScannerFactory {
                                 dataColumns[i].getQualifier());
                       }
                     }
-                  } else  if (indexMaintainer.isUncovered()) {
-                    // Indexed columns should also be added to the data columns to join for uncovered global indexes.
-                    // This is required to verify the index row against the data table row.
-                    for (ColumnReference cr: indexMaintainer.getIndexedColumns()) {
-                      if (storageScheme == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                        dataTableScan.addFamily(cr.getFamily());
-                      } else {
-                        dataTableScan.addColumn(cr.getFamily(), cr.getQualifier());
-                      }
+                  } else if (indexMaintainer.isUncovered()) {
+                    // Indexed columns and the columns in index where clause should also be added
+                    // to the data columns to scan for uncovered global indexes. This is required
+                    // to verify the index row against the data table row.
+                    for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
+                      dataTableScan.addColumn(column.getFamily(), column.getQualifier());
                     }
                   }
                   if (ScanUtil.isLocalIndex(scan)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 4283a59e00..9bfaf070d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -417,6 +417,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String STREAMING_TOPIC_NAME = "STREAMING_TOPIC_NAME";
     public static final byte[] STREAMING_TOPIC_NAME_BYTES = Bytes.toBytes(STREAMING_TOPIC_NAME);
 
+    public static final String INDEX_WHERE = "INDEX_WHERE";
+    public static final byte[] INDEX_WHERE_BYTES = Bytes.toBytes(INDEX_WHERE);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 1996577fcb..145d08c1ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1375,9 +1375,13 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
 
     private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
 
-        public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async , bindCount, udfParseNodes);
+        public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable,
+                IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns,
+                List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props,
+                boolean ifNotExists, IndexType indexType, boolean async, int bindCount, Map<String,
+                UDFParseNode> udfParseNodes, ParseNode where) {
+            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists,
+                    indexType, async , bindCount, udfParseNodes, where);
         }
 
         @SuppressWarnings("unchecked")
@@ -1887,9 +1891,14 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
         }
         
         @Override
-        public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes);
+        public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable,
+                IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns,
+                List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props,
+                boolean ifNotExists, IndexType indexType, boolean async, int bindCount, Map<String,
+                UDFParseNode> udfParseNodes, ParseNode where) {
+            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint,
+                    includeColumns, splits, props, ifNotExists, indexType, async, bindCount,
+                    udfParseNodes, where);
         }
         
         @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 0c8787170a..504ae47e91 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -37,6 +37,8 @@ import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -57,6 +59,7 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -65,7 +68,6 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
 import org.apache.phoenix.schema.TableRef;
@@ -75,6 +77,7 @@ import org.apache.phoenix.util.ParseNodeUtil;
 import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 
 public class QueryOptimizer {
@@ -197,18 +200,29 @@ public class QueryOptimizer {
         return Collections.singletonList(compiler.compile());
     }
 
+    private static boolean isPartialIndexUsable(SelectStatement select, QueryPlan dataPlan,
+            PTable index) throws SQLException {
+        StatementContext context = new StatementContext(dataPlan.getContext());
+        context.setResolver(FromCompiler.getResolver(dataPlan.getTableRef()));
+        return WhereCompiler.contains(
+                index.getIndexWhereExpression(dataPlan.getContext().getConnection()),
+                WhereCompiler.transformDNF(select.getWhere(), context));
+    }
+
     private List<QueryPlan> getApplicablePlansForSingleFlatQuery(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, boolean stopAtBestPlan) throws SQLException {
         SelectStatement select = (SelectStatement)dataPlan.getStatement();
         // Exit early if we have a point lookup as we can't get better than that
-        if (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan && dataPlan.isApplicable()) {
+        if (dataPlan.getContext().getScanRanges().isPointLookup()
+                && stopAtBestPlan && dataPlan.isApplicable()) {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
-
         List<PTable>indexes = Lists.newArrayList(dataPlan.getTableRef().getTable().getIndexes());
-        if (dataPlan.isApplicable() && (indexes.isEmpty() || dataPlan.isDegenerate() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX))) {
+        if (dataPlan.isApplicable() && (indexes.isEmpty()
+                || dataPlan.isDegenerate()
+                || dataPlan.getTableRef().hasDynamicCols()
+                || select.getHint().hasHint(Hint.NO_INDEX))) {
             return Collections.<QueryPlan> singletonList(dataPlan);
         }
-        
         // The targetColumns is set for UPSERT SELECT to ensure that the proper type conversion takes place.
         // For a SELECT, it is empty. In this case, we want to set the targetColumns to match the projection
         // from the dataPlan to ensure that the metadata for when an index is used matches the metadata for
@@ -227,7 +241,9 @@ public class QueryOptimizer {
         plans.add(dataPlan);
         QueryPlan hintedPlan = getHintedQueryPlan(statement, translatedIndexSelect, indexes, targetColumns, parallelIteratorFactory, plans);
         if (hintedPlan != null) {
-            if (stopAtBestPlan && hintedPlan.isApplicable()) {
+            PTable index = hintedPlan.getTableRef().getTable();
+            if (stopAtBestPlan && hintedPlan.isApplicable() && (index.getIndexWhere() == null
+                    || isPartialIndexUsable(select, dataPlan, index))) {
                 return Collections.singletonList(hintedPlan);
             }
             plans.add(0, hintedPlan);
@@ -235,7 +251,9 @@ public class QueryOptimizer {
         
         for (PTable index : indexes) {
             QueryPlan plan = addPlan(statement, translatedIndexSelect, index, targetColumns, parallelIteratorFactory, dataPlan, false);
-            if (plan != null) {
+            if (plan != null &&
+                    (index.getIndexWhere() == null
+                            || isPartialIndexUsable(select, dataPlan, index))) {
                 // Query can't possibly return anything so just return this plan.
                 if (plan.isDegenerate()) {
                     return Collections.singletonList(plan);
@@ -357,15 +375,36 @@ public class QueryOptimizer {
 
                 QueryPlan plan = compiler.compile();
                 if (indexTable.getIndexType() == IndexType.UNCOVERED_GLOBAL) {
-                    // Indexed columns should also be added to the data columns to join for uncovered global indexes.
-                    // This is required to verify index rows against data table rows
+                    // Indexed columns should also be added to the data columns to join for
+                    // uncovered global indexes. This is required to verify index rows against
+                    // data table rows
                     plan.getContext().setUncoveredIndex(true);
                     PhoenixConnection connection = statement.getConnection();
-                    PTable dataTable = connection.getTable(new PTableKey(connection.getTenantId(),
-                            SchemaUtil.getTableName(indexTable.getParentSchemaName().getString(),
-                                    indexTable.getParentTableName().getString())));
+                    IndexMaintainer maintainer;
+                    PTable dataTable;
+                    if (indexTable.getViewIndexId() != null
+                            && indexTable.getName().getString().contains(
+                                    QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                        // MetaDataClient modifies the index table name for view indexes if the
+                        // parent view of an index has a child view. We need to recreate a PTable
+                        // object with the correct table name to get the index maintainer
+                        int lastIndexOf = indexTable.getName().getString().lastIndexOf(
+                                QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+                        String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
+                        PTable newIndexTable = PhoenixRuntime.getTable(connection, indexName);
+                        dataTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(
+                                newIndexTable.getParentSchemaName().getString(),
+                                indexTable.getParentTableName().getString()));
+                        maintainer = newIndexTable.getIndexMaintainer(dataTable,
+                                statement.getConnection());
+                    } else {
+                        dataTable = PhoenixRuntime.getTable(connection,
+                                SchemaUtil.getTableName(indexTable.getParentSchemaName().getString(),
+                                        indexTable.getParentTableName().getString()));
+                        maintainer = indexTable.getIndexMaintainer(dataTable, connection);
+                    }
                     Set<org.apache.hadoop.hbase.util.Pair<String, String>> indexedColumns =
-                            indexTable.getIndexMaintainer(dataTable, statement.getConnection()).getIndexedColumnInfo();
+                            maintainer.getIndexedColumnInfo();
                     for (org.apache.hadoop.hbase.util.Pair<String, String> pair : indexedColumns) {
                         // The first member of the pair is the column family. For the data table PK columns, the column
                         // family is set to null. The data PK columns should not be added to the set of data columns
@@ -580,6 +619,13 @@ public class QueryOptimizer {
                     }
                 }
 
+                // Partial secondary index is preferred
+                if (table1.getIndexWhere() != null && table2.getIndexWhere() == null) {
+                    return -1;
+                }
+                if (table1.getIndexWhere() == null && table2.getIndexWhere() != null) {
+                    return 1;
+                }
                 // Use the plan that has fewer "dataColumns" (columns that need to be merged in)
                 c = plan1.getContext().getDataColumns().size() - plan2.getContext().getDataColumns().size();
                 if (c != 0) return c;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
index 172b95ec13..de15ac88ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
@@ -38,10 +38,13 @@ public class CreateIndexStatement extends SingleTableStatement {
     private final IndexType indexType;
     private final boolean async;
     private final Map<String, UDFParseNode> udfParseNodes;
+    private final ParseNode where;
 
     public CreateIndexStatement(NamedNode indexTableName, NamedTableNode dataTable, 
-            IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-            ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
+            IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns,
+            List<ParseNode> splits, ListMultimap<String, Pair<String, Object>> props,
+            boolean ifNotExists, IndexType indexType, boolean async, int bindCount,
+            Map<String, UDFParseNode> udfParseNodes, ParseNode where) {
         super(dataTable, bindCount);
         this.indexTableName =TableName.create(dataTable.getName().getSchemaName(),indexTableName.getName());
         this.indexKeyConstraint = indexKeyConstraint == null ? IndexKeyConstraint.EMPTY : indexKeyConstraint;
@@ -52,6 +55,7 @@ public class CreateIndexStatement extends SingleTableStatement {
         this.indexType = indexType;
         this.async = async;
         this.udfParseNodes = udfParseNodes;
+        this.where = where;
     }
 
     public CreateIndexStatement(CreateIndexStatement createStmt, ListMultimap<String, Pair<String, Object>> finalProps) {
@@ -65,6 +69,7 @@ public class CreateIndexStatement extends SingleTableStatement {
         this.indexType = createStmt.getIndexType();
         this.async = createStmt.isAsync();
         this.udfParseNodes = createStmt.getUdfParseNodes();
+        this.where = createStmt.where;
     }
 
     public IndexKeyConstraint getIndexConstraint() {
@@ -103,4 +108,7 @@ public class CreateIndexStatement extends SingleTableStatement {
     public Map<String, UDFParseNode> getUdfParseNodes() {
         return udfParseNodes;
     }
+    public ParseNode getWhere() {
+        return where;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 12f806020f..c8f392bdbc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -365,8 +365,13 @@ public class ParseNodeFactory {
         return new CreateSchemaStatement(schemaName, ifNotExists);
     }
 
-    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
-        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes);
+    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable,
+            IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns,
+            List<ParseNode> splits, ListMultimap<String, Pair<String, Object>> props,
+            boolean ifNotExists, IndexType indexType, boolean async, int bindCount,
+            Map<String, UDFParseNode> udfParseNodes, ParseNode where) {
+        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits,
+                props, ifNotExists, indexType, async, bindCount, udfParseNodes, where);
     }
 
     public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index ad4d2a48e2..d73c1123ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4149,19 +4149,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) {
             metaConnection = addColumnsIfNotExists(metaConnection,
-                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 4,
                     PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
                             + " " + PVarchar.INSTANCE.getSqlTypeName());
 
             metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 3,
                     PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 2,
                 PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 1,
                 PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + PVarchar.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                    MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                    PhoenixDatabaseMetaData.INDEX_WHERE + " " + PVarchar.INSTANCE.getSqlTypeName());
             UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
         }
         return metaConnection;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 52326c189d..ac1ae6a255 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -81,6 +81,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_WHERE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
@@ -158,9 +159,9 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_FUNCTION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_JOB_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_LAST_STATE_TS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_RETRY_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_START_TS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_STATUS;
@@ -342,6 +343,7 @@ public interface QueryConstants {
             SCHEMA_VERSION + " VARCHAR, \n" +
             EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
             STREAMING_TOPIC_NAME + " VARCHAR, \n" +
+            INDEX_WHERE + " VARCHAR, \n" +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 10ab0c7245..1a5d83e728 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -17,10 +17,14 @@
  */
 package org.apache.phoenix.schema;
 
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -196,12 +200,14 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+    public boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection)
+            throws SQLException {
         return delegate.getIndexMaintainers(ptr, connection);
     }
 
     @Override
-    public IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection) {
+    public IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection)
+            throws SQLException {
         return delegate.getIndexMaintainer(dataTable, connection);
     }
 
@@ -402,6 +408,26 @@ public class DelegateTable implements PTable {
     @Override
     public String getStreamingTopicName() { return delegate.getStreamingTopicName(); }
 
+    /** Forwards to the wrapped table's {@code getIndexWhere()}. */
+    @Override
+    public String getIndexWhere() {
+        return delegate.getIndexWhere();
+    }
+
+    /** Forwards to the wrapped table's {@code getIndexWhereExpression(connection)}. */
+    @Override
+    public Expression getIndexWhereExpression(PhoenixConnection connection)
+            throws SQLException {
+        return delegate.getIndexWhereExpression(connection);
+    }
+
+    /** Forwards to the wrapped table's {@code getIndexWhereColumns(connection)}. */
+    @Override
+    public Set<ColumnReference> getIndexWhereColumns(PhoenixConnection connection)
+            throws SQLException {
+        return delegate.getIndexWhereColumns(connection);
+    }
+
     @Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4fbee3cc24..b4ce6c2351 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,17 +19,6 @@ package org.apache.phoenix.schema;
 
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
-import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
-import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
-import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
-import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
-import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
-import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
-import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
-import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
@@ -44,7 +33,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
@@ -61,10 +49,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_WHERE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
@@ -89,10 +77,12 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_VERSION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYNC_INDEX_CREATED_DATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -107,6 +97,15 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
+import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
+import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
+import static org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
+import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
+import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
+import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
+import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
 import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
@@ -121,7 +120,6 @@ import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
-import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -290,7 +288,6 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
 
-
 public class MetaDataClient {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -347,9 +344,10 @@ public class MetaDataClient {
                     CHANGE_DETECTION_ENABLED + "," +
                     PHYSICAL_TABLE_NAME + "," +
                     SCHEMA_VERSION + "," +
-                    STREAMING_TOPIC_NAME +
+                    STREAMING_TOPIC_NAME + "," +
+                    INDEX_WHERE +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
-                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -1665,8 +1663,13 @@ public class MetaDataClient {
             PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);
 
             tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getPhysicalName().getString());
-            CreateTableStatement tableStatement = FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, null, statement.getBindCount(), null);
-            table = createTableInternal(tableStatement, splits, dataTable, null, null, getViewIndexDataType() ,null, null, allocateIndexId, statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
+            CreateTableStatement tableStatement = FACTORY.createTable(indexTableName,
+                    statement.getProps(), columnDefs, pk, statement.getSplitNodes(),
+                    PTableType.INDEX, statement.ifNotExists(), null,
+                    statement.getWhere(), statement.getBindCount(), null);
+            table = createTableInternal(tableStatement, splits, dataTable, null, null,
+                    getViewIndexDataType() ,null, null, allocateIndexId,
+                    statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
         }
         finally {
             deleteMutexCells(physicalSchemaName, physicalTableName, acquiredColumnMutexSet);
@@ -2818,6 +2821,8 @@ public class MetaDataClient {
                         .setColumns(columns.values())
                         .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED)
                         .setPhoenixTTLHighWaterMark(MIN_PHOENIX_TTL_HWM)
+                        .setIndexWhere(statement.getWhereClause() == null ? null
+                                : statement.getWhereClause().toString())
                         .build();
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
@@ -3071,7 +3076,11 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setString(35, streamingTopicName);
             }
-
+            if (tableType == INDEX && statement.getWhereClause() != null) {
+                tableUpsert.setString(36, statement.getWhereClause().toString());
+            } else {
+                tableUpsert.setNull(36, Types.VARCHAR);
+            }
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -3219,6 +3228,8 @@ public class MetaDataClient {
                         .setExternalSchemaId(result.getTable() != null ?
                         result.getTable().getExternalSchemaId() : null)
                         .setStreamingTopicName(streamingTopicName)
+                        .setIndexWhere(statement.getWhereClause() == null ? null
+                                : statement.getWhereClause().toString())
                         .build();
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 09bc2023a2..6fd89ad4bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -22,17 +22,21 @@ import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL
 import static org.apache.phoenix.util.EncodedColumnsUtil.isReservedColumnQualifier;
 
 import java.io.DataOutputStream;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -867,8 +871,10 @@ public interface PTable extends PMetaDataEntity {
 
     boolean isImmutableRows();
 
-    boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection);
-    IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection);
+    boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection)
+            throws SQLException;
+    IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection)
+            throws SQLException;
     TransformMaintainer getTransformMaintainer(PTable oldTable, PhoenixConnection connection);
     PName getDefaultFamilyName();
 
@@ -969,6 +975,28 @@ public interface PTable extends PMetaDataEntity {
      */
     String getStreamingTopicName();
 
+    /**
+     * Returns the WHERE clause of a partial index in its SQL text form.
+     * @return the optional where clause in string used for partial indexes
+     */
+    String getIndexWhere();
+
+
+    /**
+     * Returns the WHERE clause of a partial index as an expression in DNF.
+     * @param connection the connection made available to the implementation for this lookup
+     * @return the optional where clause in DNF expression used for partial indexes
+     * @throws SQLException if the expression cannot be produced
+     */
+    Expression getIndexWhereExpression(PhoenixConnection connection) throws SQLException;
+
+    /**
+     * Returns the columns that the WHERE clause of a partial index refers to.
+     * @param connection the connection made available to the implementation for this lookup
+     * @return the set of column references for the columns included in the index where clause
+     * @throws SQLException if the column set cannot be produced
+     */
+    Set<ColumnReference> getIndexWhereColumns(PhoenixConnection connection) throws SQLException;
     /**
      * Class to help track encoded column qualifier counters per column family.
      */
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ba888d7358..8b30976cf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.schema;
 
+import static org.apache.phoenix.compile.WhereCompiler.transformDNF;
 import static org.apache.phoenix.coprocessor.ScanRegionObserver.DYNAMIC_COLUMN_METADATA_STORED_FOR_MUTATION;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly;
@@ -64,10 +65,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
 
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -81,6 +82,8 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
@@ -88,11 +91,13 @@ import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.SingleCellConstructorExpression;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.protobuf.ProtobufUtil;
@@ -105,15 +110,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.SizedUtil;
-import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
@@ -124,6 +121,15 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSortedMa
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 /**
  *
@@ -211,6 +217,9 @@ public class PTableImpl implements PTable {
     private String schemaVersion;
     private String externalSchemaId;
     private String streamingTopicName;
+    private String indexWhere;
+    private Expression indexWhereExpression;
+    private Set<ColumnReference> indexWhereColumns;
 
     public static class Builder {
         private PTableKey key;
@@ -276,6 +285,7 @@ public class PTableImpl implements PTable {
         private String schemaVersion;
         private String externalSchemaId;
         private String streamingTopicName;
+        private String indexWhere;
 
         // Used to denote which properties a view has explicitly modified
         private BitSet viewModifiedPropSet = new BitSet(3);
@@ -696,6 +706,13 @@ public class PTableImpl implements PTable {
             return this;
          }
 
+        public Builder setIndexWhere(String indexWhere) {
+            if (indexWhere != null) { // a null argument keeps any value already set
+                this.indexWhere = indexWhere;
+            }
+            return this; // hands back this builder so calls can be chained
+        }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -986,6 +1003,7 @@ public class PTableImpl implements PTable {
         this.schemaVersion = builder.schemaVersion;
         this.externalSchemaId = builder.externalSchemaId;
         this.streamingTopicName = builder.streamingTopicName;
+        this.indexWhere = builder.indexWhere;
     }
 
     // When cloning table, ignore the salt column as it will be added back in the constructor
@@ -1065,7 +1083,8 @@ public class PTableImpl implements PTable {
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
                 .setSchemaVersion(table.getSchemaVersion())
                 .setExternalSchemaId(table.getExternalSchemaId())
-                .setStreamingTopicName(table.getStreamingTopicName());
+                .setStreamingTopicName(table.getStreamingTopicName())
+                .setIndexWhere(table.getIndexWhere());
     }
 
     @Override
@@ -1731,7 +1750,8 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection connection) {
+    public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable,
+            PhoenixConnection connection) throws SQLException {
         if (indexMaintainer == null) {
             indexMaintainer = IndexMaintainer.create(dataTable, this, connection);
         }
@@ -1739,7 +1759,8 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection connection) {
+    public synchronized boolean getIndexMaintainers(ImmutableBytesWritable ptr,
+            PhoenixConnection connection) throws SQLException {
         if (indexMaintainersPtr == null || indexMaintainersPtr.getLength()==0) {
             indexMaintainersPtr = new ImmutableBytesWritable();
             if (indexes.isEmpty() && transformingNewTable == null) {
@@ -2004,6 +2025,11 @@ public class PTableImpl implements PTable {
             streamingTopicName =
                 (String) PVarchar.INSTANCE.toObject(table.getStreamingTopicName().toByteArray());
         }
+        String indexWhere = null;
+        if (table.hasIndexWhere()) {
+            indexWhere =
+                    (String) PVarchar.INSTANCE.toObject(table.getIndexWhere().toByteArray());
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -2061,6 +2087,7 @@ public class PTableImpl implements PTable {
                     .setSchemaVersion(schemaVersion)
                     .setExternalSchemaId(externalSchemaId)
                     .setStreamingTopicName(streamingTopicName)
+                    .setIndexWhere(indexWhere)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -2200,6 +2227,10 @@ public class PTableImpl implements PTable {
         if (table.getStreamingTopicName() != null) {
             builder.setStreamingTopicName(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getStreamingTopicName())));
         }
+        if (table.getIndexWhere() != null) {
+            builder.setIndexWhere(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(
+                    table.getIndexWhere())));
+        }
         return builder.build();
     }
 
@@ -2342,6 +2373,81 @@ public class PTableImpl implements PTable {
         return streamingTopicName;
     }
 
+    /**
+     * Returns the WHERE clause of a partial index as SQL text.
+     *
+     * @return the stored clause, or null if none was set for this table
+     */
+    @Override
+    public String getIndexWhere() {
+        return indexWhere;
+    }
+
+    /**
+     * Compiles {@code indexWhere} as the filter of a select on the parent table and caches
+     * the resulting expression together with the columns the clause refers to. Both fields
+     * are assigned only after the column set is complete, so a partially filled set is
+     * never visible through the getters.
+     *
+     * @param connection connection used to compile the query
+     * @throws SQLException if compiling the query fails
+     */
+    private void buildIndexWhereExpression(PhoenixConnection connection) throws SQLException {
+        // NOTE(review): the parent table name is spliced in unquoted; confirm that this
+        // still resolves when the parent was created with a case-sensitive name.
+        String query = "select * from "
+                + SchemaUtil.getTableName(parentSchemaName, parentTableName).getString()
+                + " where " + indexWhere;
+        PhoenixPreparedStatement pstmt = new PhoenixPreparedStatement(connection, query);
+        QueryPlan plan = pstmt.compileQuery();
+        StatementContext context = plan.getContext();
+        ParseNode where = plan.getStatement().getWhere();
+        context.setResolver(FromCompiler.getResolver(plan.getTableRef()));
+        Expression expression = transformDNF(where, context);
+        Set<ColumnReference> columns =
+                Sets.newHashSetWithExpectedSize(context.getWhereConditionColumns().size());
+        for (Pair<byte[], byte[]> column : context.getWhereConditionColumns()) {
+            columns.add(new ColumnReference(column.getFirst(), column.getSecond()));
+        }
+        indexWhereExpression = expression;
+        indexWhereColumns = columns;
+    }
+
+    /**
+     * Returns the compiled WHERE clause of a partial index, compiling it on first use.
+     * Synchronized, like {@code getIndexMaintainer}, because the result is cached lazily.
+     *
+     * @param connection connection used to compile the clause if it is not cached yet
+     * @return the compiled expression, or null if this table has no index WHERE clause
+     * @throws SQLException if the clause cannot be compiled
+     */
+    @Override
+    public synchronized Expression getIndexWhereExpression(PhoenixConnection connection)
+            throws SQLException {
+        if (indexWhereExpression == null && indexWhere != null) {
+            buildIndexWhereExpression(connection);
+        }
+        return indexWhereExpression;
+    }
+
+    /**
+     * Returns the columns referenced by the WHERE clause of a partial index, compiling the
+     * clause on first use. Synchronized, like {@code getIndexMaintainer}, because the result
+     * is cached lazily.
+     *
+     * @param connection connection used to compile the clause if it is not cached yet
+     * @return the referenced columns, or null if this table has no index WHERE clause
+     * @throws SQLException if the clause cannot be compiled
+     */
+    @Override
+    public synchronized Set<ColumnReference> getIndexWhereColumns(PhoenixConnection connection)
+            throws SQLException {
+        if (indexWhereColumns == null && indexWhere != null) {
+            buildIndexWhereExpression(connection);
+        }
+        return indexWhereColumns;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 0b25f97dba..5a06b6287d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -40,6 +40,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -893,6 +894,36 @@ public class IndexUtil {
         }
         return ts;
     }
+
+    /**
+     * Collects the cells of the given columns from a {@link Put}.
+     *
+     * @param row the mutation to read from; may be {@code null}
+     * @param cols the columns to look up; when empty, a single cell of the row is returned
+     *             instead
+     * @return an empty list if {@code row} is {@code null}; otherwise a list holding the cell
+     *         found for each requested column (columns without a cell are skipped)
+     */
+    public static List<Cell> readColumnsFromRow(Put row, Set<ColumnReference> cols) {
+        if (row == null) {
+            // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
+            return Collections.emptyList();
+        }
+
+        List<Cell> columns = Lists.newArrayList();
+
+        if (cols.isEmpty()) {
+            // No specific column requested: just return any one cell (the first cell of the
+            // first non-empty family), as a FirstKeyOnlyFilter would.
+            for (List<Cell> cells : row.getFamilyCellMap().values()) {
+                if (cells == null || cells.isEmpty()) {
+                    continue;
+                }
+                columns.add(cells.get(0));
+                break;
+            }
+            return columns;
+        }
+
+        SimpleValueGetter valueGetter = new SimpleValueGetter(row);
+        for (ColumnReference colRef : cols) {
+            Cell cell = valueGetter.getLatestCell(colRef, HConstants.LATEST_TIMESTAMP);
+            if (cell != null) {
+                columns.add(cell);
+            }
+        }
+        return columns;
+    }
+
     public static class SimpleValueGetter implements ValueGetter {
         final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
         final Put put;
diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto
index 6f6a663a99..88617e3636 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -117,6 +117,7 @@ message PTable {
   optional bytes externalSchemaId=50;
   optional PTable transformingNewTable=51;
   optional bytes streamingTopicName=52;
+  optional bytes indexWhere=53;
 }
 
 message EncodedCQCounter {
diff --git a/phoenix-core/src/main/protobuf/ServerCachingService.proto b/phoenix-core/src/main/protobuf/ServerCachingService.proto
index 28108768e6..24717fdb14 100644
--- a/phoenix-core/src/main/protobuf/ServerCachingService.proto
+++ b/phoenix-core/src/main/protobuf/ServerCachingService.proto
@@ -69,6 +69,8 @@ message IndexMaintainer {
   optional int32 dataEncodingScheme = 26;
   optional int32 dataImmutableStorageScheme = 27;
   optional bool isUncovered = 28;
+  optional bytes indexWhere = 29;
+  repeated ColumnReference indexWhereColumns = 30;
 }
 
 message TransformMaintainer {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 76cc9b0545..6e4d17121c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.compile;
 
+import static org.apache.phoenix.compile.WhereCompiler.transformDNF;
 import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS;
 import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -63,6 +64,7 @@ import org.apache.phoenix.filter.RowKeyComparisonFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
@@ -83,6 +85,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -1026,4 +1029,89 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
         assertEquals(FETCH_SIZE, pstmt.getFetchSize());
         assertEquals(FETCH_SIZE, scan.getCaching());
     }
+    /**
+     * Compiles {@code query} and returns its WHERE clause as transformed by
+     * {@code WhereCompiler.transformDNF}.
+     *
+     * @param pconn connection used to prepare the statement
+     * @param query the SELECT statement whose WHERE clause is transformed
+     * @return the expression produced by {@code transformDNF} for the query's WHERE clause
+     * @throws SQLException propagated from compiling or transforming the query
+     */
+    private Expression getDNF(PhoenixConnection pconn, String query) throws SQLException {
+        QueryPlan compiledPlan = newPreparedStatement(pconn, query).compileQuery();
+        ParseNode whereNode = compiledPlan.getStatement().getWhere();
+        return transformDNF(whereNode, compiledPlan.getContext());
+    }
+    /**
+     * Verifies {@code WhereCompiler.contains} on pairs of WHERE clauses: for every pair the
+     * first ("containing") clause must contain the second ("contained") clause, and the
+     * reverse must not hold.
+     */
+    @Test
+    public void testWhereInclusion() throws SQLException {
+        // Each entry is {containing query, contained query}. Keeping the pairs in one table
+        // avoids a separate element count that has to be kept in sync by hand.
+        String[][] queryPairs = {
+            {"select * from myTable where ID = 'i1' or (ID = 'i2' and A > 1)",
+                "select * from myTableDesc where ID = 'i1' or (ID = 'i2' and " +
+                "A > 2 + 2)"},
+            {"select * from myTable where ID > 'i3' and A > 1",
+                "select * from myTableDesc where (ID > 'i7' or ID = 'i4') and " +
+                "A > 2 * 10"},
+            {"select * from myTable where ID IN ('i3', 'i7', 'i1') and A < 10",
+                "select * from myTableDesc where ID IN ('i1', 'i7') and A < 10 / 2"},
+            {"select * from myTableDesc where (ID, B) > ('i3', 'a') and A >= 10",
+                "select * from myTable where ID = 'i3' and B = 'c' and A = 10"},
+            {"select * from myTable where ID >= 'i3' and A between 5 and 15",
+                "select * from myTableDesc where ID = 'i3' and A between 5 and 10"},
+            {"select * from myTable where (A between 5 and 15) and " +
+                "(D < 10.67 or C <= CURRENT_DATE())",
+                "select * from myTable where (A = 5 and D between 1.5 and 9.99) or " +
+                "(A = 6 and C <= CURRENT_DATE() - 1000)"},
+            {"select * from myTable where A is not null",
+                "select * from myTable where A > 0"},
+            {"select * from myTable where NOT (B is null)",
+                "select * from myTable where (B > 'abc')"},
+            {"select * from myTable where A >= E and D <= A",
+                "select * from myTable where (A > E and D = A)"},
+            {"select * from myTable where A > E",
+                "select * from myTable where (A > E  and B is not null)"},
+            {"select * from myTable where B like '%abc'",
+                "select * from myTable where (B like '%abc' and ID > 'i1')"},
+            {"select * from myTable where " +
+                "PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME()",
+                "select * from myTable where " +
+                "(PHOENIX_ROW_TIMESTAMP() < CURRENT_TIME() - 1)"},
+            {"select * from myTable where (A, E) IN ((2,3), (7,8), (10,11))",
+                "select * from myTable where (A, E) IN ((2,3), (7,8))"},
+            {"select * from myTable where ID > 'i3' and ID < 'i5'",
+                "select * from myTable where (ID = 'i4') "},
+            {"select * from myTable where " +
+                "CURRENT_DATE() - PHOENIX_ROW_TIMESTAMP() < 10",
+                "select * from myTable where " +
+                " CURRENT_DATE() - PHOENIX_ROW_TIMESTAMP() < 5 "},
+        };
+
+        // try-with-resources closes the connection even when an assertion fails.
+        try (PhoenixConnection pconn = DriverManager.getConnection(getUrl(),
+                PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class)) {
+            String ddl = "create table myTable(ID varchar primary key, A integer, B varchar, " +
+                    "C date, D double, E integer)";
+            pconn.createStatement().execute(ddl);
+            ddl = "create table myTableDesc(ID varchar primary key DESC, A integer, B varchar, " +
+                    "C date, D double, E integer)";
+            pconn.createStatement().execute(ddl);
+
+            for (String[] pair : queryPairs) {
+                String containing = pair[0];
+                String contained = pair[1];
+                // The messages identify the failing pair, which a bare assert would not.
+                Assert.assertTrue("Expected [" + containing + "] to contain [" + contained + "]",
+                        WhereCompiler.contains(getDNF(pconn, containing),
+                                getDNF(pconn, contained)));
+                Assert.assertFalse(
+                        "Expected [" + contained + "] not to contain [" + containing + "]",
+                        WhereCompiler.contains(getDNF(pconn, contained),
+                                getDNF(pconn, containing)));
+            }
+        }
+    }
+
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 9ee3d18c07..e42055ae93 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -271,7 +271,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
         }
     }
 
-    private void initializeRebuildScannerAttributes() {
+    private void initializeRebuildScannerAttributes() throws SQLException {
         when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
         when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
         when(rebuildScanner.setMaxLookBackInMills(Matchers.anyLong())).thenCallRealMethod();