Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/16 18:49:13 UTC

[6/6] phoenix git commit: Sync 4.x-HBase-1.2 to master (Pedro Boado)

Sync 4.x-HBase-1.2 to master (Pedro Boado)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2d70f55a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2d70f55a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2d70f55a

Branch: refs/heads/4.x-HBase-1.2
Commit: 2d70f55ae6b1a3850e2d2a5a01eb40d5928c65de
Parents: 6d19972
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Dec 16 10:48:52 2017 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Dec 16 10:48:52 2017 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    |   2 +-
 .../phoenix/end2end/BasePermissionsIT.java      | 754 +++++++++++++++++++
 .../phoenix/end2end/ChangePermissionsIT.java    | 270 +++++++
 .../phoenix/end2end/CostBasedDecisionIT.java    | 466 ++++++++++++
 .../apache/phoenix/end2end/CreateSchemaIT.java  |  64 +-
 .../apache/phoenix/end2end/MutationStateIT.java | 161 ++++
 .../org/apache/phoenix/end2end/QueryMoreIT.java |  42 --
 .../org/apache/phoenix/end2end/SortOrderIT.java |  11 +-
 .../apache/phoenix/end2end/SystemCatalogIT.java |   1 -
 .../end2end/SystemTablePermissionsIT.java       | 226 +-----
 .../phoenix/end2end/TableDDLPermissionsIT.java  | 233 ++++++
 .../phoenix/end2end/index/IndexMetadataIT.java  |  55 ++
 .../phoenix/end2end/join/HashJoinMoreIT.java    |   5 +
 .../apache/phoenix/execute/PartialCommitIT.java |   5 +-
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  37 +-
 .../org/apache/hadoop/hbase/ipc/RpcUtil.java    |  32 +
 .../phoenix/compile/BaseMutationPlan.java       |   5 +
 .../phoenix/compile/DelegateMutationPlan.java   |   5 +
 .../apache/phoenix/compile/DeleteCompiler.java  | 558 ++++++++------
 .../apache/phoenix/compile/JoinCompiler.java    |  19 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   6 +
 .../apache/phoenix/compile/MutationPlan.java    |   5 +-
 .../apache/phoenix/compile/QueryCompiler.java   |   6 +-
 .../org/apache/phoenix/compile/QueryPlan.java   |   5 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   6 +
 .../apache/phoenix/compile/UpsertCompiler.java  | 684 ++++++++++-------
 .../apache/phoenix/compile/WhereOptimizer.java  |   5 -
 .../BaseMetaDataEndpointObserver.java           | 111 +++
 .../coprocessor/MetaDataEndpointImpl.java       | 339 +++++++--
 .../coprocessor/MetaDataEndpointObserver.java   |  68 ++
 .../coprocessor/MetaDataRegionObserver.java     |  17 +-
 .../coprocessor/PhoenixAccessController.java    | 611 +++++++++++++++
 .../PhoenixMetaDataCoprocessorHost.java         | 236 ++++++
 .../phoenix/exception/SQLExceptionCode.java     |   1 +
 .../apache/phoenix/execute/AggregatePlan.java   |  30 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  21 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  28 +
 .../apache/phoenix/execute/ClientScanPlan.java  |  25 +
 .../apache/phoenix/execute/CorrelatePlan.java   |  25 +
 .../phoenix/execute/DelegateQueryPlan.java      |   6 +
 .../apache/phoenix/execute/HashJoinPlan.java    |  29 +
 .../execute/LiteralResultIterationPlan.java     |   6 +
 .../apache/phoenix/execute/MutationState.java   | 159 ++--
 .../org/apache/phoenix/execute/ScanPlan.java    |  25 +
 .../phoenix/execute/SortMergeJoinPlan.java      |  18 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  10 +
 .../RowValueConstructorExpression.java          |   4 +-
 .../index/PhoenixIndexFailurePolicy.java        | 109 +--
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  65 +-
 .../phoenix/mapreduce/PhoenixOutputFormat.java  |  13 +-
 .../phoenix/mapreduce/PhoenixRecordWriter.java  |   8 +-
 .../phoenix/mapreduce/util/ConnectionUtil.java  |  23 +-
 .../java/org/apache/phoenix/optimize/Cost.java  | 123 +++
 .../apache/phoenix/optimize/QueryOptimizer.java |  30 +-
 .../phoenix/parse/AddColumnStatement.java       |   2 +-
 .../phoenix/parse/AlterIndexStatement.java      |  14 +
 .../phoenix/parse/ChangePermsStatement.java     | 102 +++
 .../phoenix/parse/CreateSchemaStatement.java    |   2 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |  13 +-
 .../phoenix/query/ConnectionQueryServices.java  |   2 +
 .../query/ConnectionQueryServicesImpl.java      |  64 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   7 +
 .../query/DelegateConnectionQueryServices.java  |   8 +-
 .../org/apache/phoenix/query/QueryServices.java |   5 +
 .../phoenix/query/QueryServicesOptions.java     |  12 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 710 +++++++++++++----
 .../schema/TablesNotInSyncException.java        |  22 +
 .../phoenix/schema/stats/StatisticsWriter.java  |  42 +-
 .../java/org/apache/phoenix/util/CostUtil.java  |  90 +++
 .../java/org/apache/phoenix/util/IndexUtil.java |   4 +-
 .../org/apache/phoenix/util/KeyValueUtil.java   |  52 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |  18 +
 .../org/apache/phoenix/util/PropertiesUtil.java |   9 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  42 +-
 .../apache/phoenix/parse/QueryParserTest.java   |  59 +-
 .../query/ParallelIteratorsSplitTest.java       |   6 +
 .../java/org/apache/phoenix/pig/BasePigIT.java  |   4 +
 .../apache/phoenix/pig/PhoenixHBaseStorage.java |  12 +-
 78 files changed, 5859 insertions(+), 1250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 5265b09..17f08c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -1080,7 +1080,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
 			} catch (SQLException e) {
 				assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED.getErrorCode(), e.getErrorCode());
 			}
-			// altering a table to be transactional  should fail if transactions are disabled
+			// altering a table to be transactional should fail if transactions are disabled
 			conn.createStatement().execute("CREATE TABLE " + dataTableFullName + "(k INTEGER PRIMARY KEY, v VARCHAR)");
 			try {
 				conn.createStatement().execute("ALTER TABLE " + dataTableFullName + " SET TRANSACTIONAL=true");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
new file mode 100644
index 0000000..9d7ef1b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class BasePermissionsIT extends BaseTest {
+
+    private static final Log LOG = LogFactory.getLog(BasePermissionsIT.class);
+
+    static String SUPERUSER;
+
+    static HBaseTestingUtility testUtil;
+    static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
+            "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION"));
+
+    static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS = new HashSet<>(Arrays.asList(
+            "SYSTEM.\"CATALOG\"", "SYSTEM.\"SEQUENCE\"", "SYSTEM.\"STATS\"", "SYSTEM.\"FUNCTION\""));
+
+    static final String SYSTEM_SEQUENCE_IDENTIFIER =
+            QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE+ "\"";
+
+    static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
+            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION"));
+
+    // Create multiple users so that we can use Hadoop UGI to run tasks as various users
+    // Permissions can be granted or revoked by superusers and admins only
+    // DON'T USE the Hadoop UserGroupInformation class to create test users, since HBase misses some of its functionality
+    // Instead, use the org.apache.hadoop.hbase.security.User class for testing purposes.
+
+    // Super users have full access
+    User superUser1 = null;
+    User superUser2 = null;
+
+    // Regular users are granted and revoked permissions as needed
+    User regularUser1 = null;
+    User regularUser2 = null;
+    User regularUser3 = null;
+    User regularUser4 = null;
+
+    // Group user is equivalent to a regular user, but belongs to a group
+    // Permissions granted to the group should affect this user
+    static final String GROUP_SYSTEM_ACCESS = "group_system_access";
+    User groupUser = null;
+
+    // Unprivileged user doesn't have any access and is denied every action
+    User unprivilegedUser = null;
+
+    static final int NUM_RECORDS = 5;
+
+    boolean isNamespaceMapped;
+
+    public BasePermissionsIT(final boolean isNamespaceMapped) throws Exception {
+        this.isNamespaceMapped = isNamespaceMapped;
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        SUPERUSER = System.getProperty("user.name");
+    }
+
+    void startNewMiniCluster() throws Exception {
+        startNewMiniCluster(new Configuration());
+    }
+    
+    void startNewMiniCluster(Configuration overrideConf) throws Exception{
+        if (null != testUtil) {
+            testUtil.shutdownMiniCluster();
+            testUtil = null;
+        }
+
+        testUtil = new HBaseTestingUtility();
+
+        Configuration config = testUtil.getConfiguration();
+        enablePhoenixHBaseAuthorization(config);
+        configureNamespacesOnServer(config);
+        configureRandomHMasterPort(config);
+        if (overrideConf != null) {
+            config.addResource(overrideConf);
+        }
+
+        testUtil.startMiniCluster(1);
+        initializeUsers(testUtil.getConfiguration());
+    }
+
+    private void initializeUsers(Configuration configuration) {
+
+        superUser1 = User.createUserForTesting(configuration, SUPERUSER, new String[0]);
+        superUser2 = User.createUserForTesting(configuration, "superUser2", new String[0]);
+
+        regularUser1 = User.createUserForTesting(configuration, "regularUser1", new String[0]);
+        regularUser2 = User.createUserForTesting(configuration, "regularUser2", new String[0]);
+        regularUser3 = User.createUserForTesting(configuration, "regularUser3", new String[0]);
+        regularUser4 = User.createUserForTesting(configuration, "regularUser4", new String[0]);
+
+        groupUser = User.createUserForTesting(testUtil.getConfiguration(), "groupUser", new String[] {GROUP_SYSTEM_ACCESS});
+
+        unprivilegedUser = User.createUserForTesting(configuration, "unprivilegedUser", new String[0]);
+    }
+
+    private void configureRandomHMasterPort(Configuration config) {
+        // Avoid multiple clusters trying to bind the master's info port (16010)
+        config.setInt(HConstants.MASTER_INFO_PORT, -1);
+    }
+
+    void enablePhoenixHBaseAuthorization(Configuration config) {
+        config.set("hbase.superuser", SUPERUSER + "," + "superUser2");
+        config.set("hbase.security.authorization", Boolean.TRUE.toString());
+        config.set("hbase.security.exec.permission.checks", Boolean.TRUE.toString());
+        config.set("hbase.coprocessor.master.classes",
+                "org.apache.hadoop.hbase.security.access.AccessController");
+        config.set("hbase.coprocessor.region.classes",
+                "org.apache.hadoop.hbase.security.access.AccessController");
+        config.set("hbase.coprocessor.regionserver.classes",
+                "org.apache.hadoop.hbase.security.access.AccessController");
+
+        config.set(QueryServices.PHOENIX_ACLS_ENABLED,"true");
+
+        config.set("hbase.regionserver.wal.codec", "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+    }
+
+    void configureNamespacesOnServer(Configuration conf) {
+        conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+    }
+
+    @Parameterized.Parameters(name = "isNamespaceMapped={0}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean> data() {
+        return Arrays.asList(false, true);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        if (testUtil != null) {
+            testUtil.shutdownMiniCluster();
+            testUtil = null;
+        }
+    }
+
+    public static HBaseTestingUtility getUtility(){
+        return testUtil;
+    }
+
+    // Utility functions to grant permissions with HBase API
+    void grantPermissions(String toUser, Set<String> tablesToGrant, Permission.Action... actions) throws Throwable {
+        for (String table : tablesToGrant) {
+            AccessControlClient.grant(getUtility().getConnection(), TableName.valueOf(table), toUser, null, null,
+                    actions);
+        }
+    }
+
+    void grantPermissions(String toUser, String namespace, Permission.Action... actions) throws Throwable {
+        AccessControlClient.grant(getUtility().getConnection(), namespace, toUser, actions);
+    }
+
+    void grantPermissions(String groupEntry, Permission.Action... actions) throws IOException, Throwable {
+        AccessControlClient.grant(getUtility().getConnection(), groupEntry, actions);
+    }
+
+    // Utility functions to revoke permissions with HBase API
+    void revokeAll() throws Throwable {
+        AccessControlClient.revoke(getUtility().getConnection(), AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), Permission.Action.values() );
+        AccessControlClient.revoke(getUtility().getConnection(), regularUser1.getShortName(), Permission.Action.values() );
+        AccessControlClient.revoke(getUtility().getConnection(), unprivilegedUser.getShortName(), Permission.Action.values() );
+    }
+
+    Properties getClientProperties(String tenantId) {
+        Properties props = new Properties();
+        if(tenantId != null) {
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+        return props;
+    }
+
+    public Connection getConnection() throws SQLException {
+        return getConnection(null);
+    }
+
+    public Connection getConnection(String tenantId) throws SQLException {
+        return DriverManager.getConnection(getUrl(), getClientProperties(tenantId));
+    }
+
+    protected static String getUrl() {
+        return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
+    }
+
+    static Set<String> getHBaseTables() throws IOException {
+        Set<String> tables = new HashSet<>();
+        for (TableName tn : testUtil.getHBaseAdmin().listTableNames()) {
+            tables.add(tn.getNameAsString());
+        }
+        return tables;
+    }
+
+    // The "ug" argument is either:
+    // 1. an instance of String --> represents a GROUP name
+    // 2. an instance of User --> represents an HBase user
+    AccessTestAction grantPermissions(final String actions, final Object ug,
+                                      final String tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return grantPermissions(actions, ug, Collections.singleton(tableOrSchemaList), isSchema);
+    }
+
+    AccessTestAction grantPermissions(final String actions, final Object ug,
+                                      final Set<String> tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    for(String tableOrSchema : tableOrSchemaList) {
+                        String grantStmtSQL = "GRANT '" + actions + "' ON " + (isSchema ? " SCHEMA " : " TABLE ") + tableOrSchema + " TO "
+                                + ((ug instanceof String) ? (" GROUP " + "'" + ug + "'") : ("'" + ((User)ug).getShortName() + "'"));
+                        LOG.info("Grant Permissions SQL: " + grantStmtSQL);
+                        assertFalse(stmt.execute(grantStmtSQL));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction grantPermissions(final String actions, final User user) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    String grantStmtSQL = "GRANT '" + actions + "' TO " + " '" + user.getShortName() + "'";
+                    LOG.info("Grant Permissions SQL: " + grantStmtSQL);
+                    assertFalse(stmt.execute(grantStmtSQL));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction revokePermissions(final Object ug,
+                                       final String tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return revokePermissions(ug, Collections.singleton(tableOrSchemaList), isSchema);
+    }
+
+    AccessTestAction revokePermissions(final Object ug,
+                                       final Set<String> tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    for(String tableOrSchema : tableOrSchemaList) {
+                        String revokeStmtSQL = "REVOKE ON " + (isSchema ? " SCHEMA " : " TABLE ") + tableOrSchema + " FROM "
+                                + ((ug instanceof String) ? (" GROUP " + "'" + ug + "'") : ("'" + ((User)ug).getShortName() + "'"));
+                        LOG.info("Revoke Permissions SQL: " + revokeStmtSQL);
+                        assertFalse(stmt.execute(revokeStmtSQL));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction revokePermissions(final Object ug) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    String revokeStmtSQL = "REVOKE FROM " +
+                            ((ug instanceof String) ? (" GROUP " + "'" + ug + "'") : ("'" + ((User)ug).getShortName() + "'"));
+                    LOG.info("Revoke Permissions SQL: " + revokeStmtSQL);
+                    assertFalse(stmt.execute(revokeStmtSQL));
+                }
+                return null;
+            }
+        };
+    }
+
+    // Attempts to get a Phoenix Connection
+    // New connections could create SYSTEM tables if appropriate perms are granted
+    AccessTestAction getConnectionAction() throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection();) {
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createSchema(final String schemaName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                if (isNamespaceMapped) {
+                    try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                        assertFalse(stmt.execute("CREATE SCHEMA " + schemaName));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction dropSchema(final String schemaName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                if (isNamespaceMapped) {
+                    try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                        assertFalse(stmt.execute("DROP SCHEMA " + schemaName));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("CREATE TABLE " + tableName + "(pk INTEGER not null primary key, data VARCHAR, val integer)"));
+                    try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?, ?)")) {
+                        for (int i = 0; i < NUM_RECORDS; i++) {
+                            pstmt.setInt(1, i);
+                            pstmt.setString(2, Integer.toString(i));
+                            pstmt.setInt(3, i);
+                            assertEquals(1, pstmt.executeUpdate());
+                        }
+                    }
+                    conn.commit();
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createMultiTenantTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("CREATE TABLE " + tableName
+                            + "(ORG_ID VARCHAR NOT NULL, PREFIX CHAR(3) NOT NULL, DATA VARCHAR, VAL INTEGER CONSTRAINT PK PRIMARY KEY (ORG_ID, PREFIX))  MULTI_TENANT=TRUE"));
+                    try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?, ?, ?)")) {
+                        for (int i = 0; i < NUM_RECORDS; i++) {
+                            pstmt.setString(1, "o" + i);
+                            pstmt.setString(2, "pr" + i);
+                            pstmt.setString(3, Integer.toString(i));
+                            pstmt.setInt(4, i);
+                            assertEquals(1, pstmt.executeUpdate());
+                        }
+                    }
+                    conn.commit();
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction dropTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
+                }
+                return null;
+            }
+        };
+
+    }
+
+    // Attempts to read the given table without verifying the data
+    // AccessDeniedException is only triggered when the ResultSet#next() method is called
+    // The first call triggers the HBase Scan object
+    // The Statement#executeQuery() method returns an iterator and doesn't interact with the HBase API at all
+    AccessTestAction readTableWithoutVerification(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
+                    ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+                    assertNotNull(rs);
+                    while (rs.next()) {
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction readTable(final String tableName) throws SQLException {
+        return readTable(tableName,null);
+    }
+
+    AccessTestAction readTable(final String tableName, final String indexName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
+                    String readTableSQL = "SELECT "+(indexName!=null?"/*+ INDEX("+tableName+" "+indexName+")*/":"")+" pk, data, val FROM " + tableName +" where data >= '0'";
+                    ResultSet rs = stmt.executeQuery(readTableSQL);
+                    assertNotNull(rs);
+                    int i = 0;
+                    while (rs.next()) {
+                        assertEquals(i, rs.getInt(1));
+                        assertEquals(Integer.toString(i), rs.getString(2));
+                        assertEquals(i, rs.getInt(3));
+                        i++;
+                    }
+                    assertEquals(NUM_RECORDS, i);
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction readMultiTenantTableWithoutIndex(final String tableName) throws SQLException {
+        return readMultiTenantTableWithoutIndex(tableName, null);
+    }
+
+    AccessTestAction readMultiTenantTableWithoutIndex(final String tableName, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement()) {
+                    // Accessing all the data from the table avoids the use of index
+                    String readTableSQL = "SELECT data, val FROM " + tableName;
+                    ResultSet rs = stmt.executeQuery(readTableSQL);
+                    assertNotNull(rs);
+                    int i = 0;
+                    String explainPlan = Joiner.on(" ").join(((PhoenixStatement)stmt).getQueryPlan().getExplainPlan().getPlanSteps());
+                    rs = stmt.executeQuery(readTableSQL);
+                    if(tenantId != null) {
+                        rs.next();
+                        assertFalse(explainPlan.contains("_IDX_"));
+                        assertEquals(((PhoenixConnection)conn).getTenantId().toString(), tenantId);
+                        // For tenant ID "o3", the value in table will be 3
+                        assertEquals(Character.toString(tenantId.charAt(1)), rs.getString(1));
+                        // Only 1 record is inserted per Tenant
+                        assertFalse(rs.next());
+                    } else {
+                        while(rs.next()) {
+                            assertEquals(Integer.toString(i), rs.getString(1));
+                            assertEquals(i, rs.getInt(2));
+                            i++;
+                        }
+                        assertEquals(NUM_RECORDS, i);
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction readMultiTenantTableWithIndex(final String tableName) throws SQLException {
+        return readMultiTenantTableWithIndex(tableName, null);
+    }
+
+    AccessTestAction readMultiTenantTableWithIndex(final String tableName, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement()) {
+                    // Accessing only the 'data' from the table uses index since index tables are built on 'data' column
+                    String readTableSQL = "SELECT data FROM " + tableName;
+                    ResultSet rs = stmt.executeQuery(readTableSQL);
+                    assertNotNull(rs);
+                    int i = 0;
+                    String explainPlan = Joiner.on(" ").join(((PhoenixStatement) stmt).getQueryPlan().getExplainPlan().getPlanSteps());
+                    assertTrue(explainPlan.contains("_IDX_"));
+                    rs = stmt.executeQuery(readTableSQL);
+                    if (tenantId != null) {
+                        rs.next();
+                        assertEquals(((PhoenixConnection) conn).getTenantId().toString(), tenantId);
+                        // For tenant ID "o3", the value in table will be 3
+                        assertEquals(Character.toString(tenantId.charAt(1)), rs.getString(1));
+                        // Only 1 record is inserted per Tenant
+                        assertFalse(rs.next());
+                    } else {
+                        while (rs.next()) {
+                            assertEquals(Integer.toString(i), rs.getString(1));
+                            i++;
+                        }
+                        assertEquals(NUM_RECORDS, i);
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction addProperties(final String tableName, final String property, final String value)
+            throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("ALTER TABLE " + tableName + " SET " + property + "=" + value));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction addColumn(final String tableName, final String columnName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("ALTER TABLE " + tableName + " ADD "+columnName+" varchar"));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction dropColumn(final String tableName, final String columnName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("ALTER TABLE " + tableName + " DROP COLUMN "+columnName));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createIndex(final String indexName, final String dataTable) throws SQLException {
+        return createIndex(indexName, dataTable, null);
+    }
+
+    AccessTestAction createIndex(final String indexName, final String dataTable, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("CREATE INDEX " + indexName + " on " + dataTable + "(data)"));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createLocalIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("CREATE LOCAL INDEX " + indexName + " on " + dataTable + "(data)"));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction dropIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("DROP INDEX " + indexName + " on " + dataTable));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction rebuildIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("ALTER INDEX " + indexName + " on " + dataTable + " DISABLE"));
+                    assertFalse(stmt.execute("ALTER INDEX " + indexName + " on " + dataTable + " REBUILD"));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction dropView(final String viewName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                    assertFalse(stmt.execute("DROP VIEW " + viewName));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createView(final String viewName, final String dataTable) throws SQLException {
+        return createView(viewName, dataTable, null);
+    }
+
+    AccessTestAction createView(final String viewName, final String dataTable, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement();) {
+                    String viewStmtSQL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + dataTable;
+                    assertFalse(stmt.execute(viewStmtSQL));
+                }
+                return null;
+            }
+        };
+    }
+
+    static interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
+
+    /** This fails only in case of ADE or empty list for any of the users. */
+    void verifyAllowed(AccessTestAction action, User... users) throws Exception {
+        if(users.length == 0) {
+            throw new Exception("Action needs at least one user to run");
+        }
+        for (User user : users) {
+            verifyAllowed(user, action);
+        }
+    }
+
+    void verifyAllowed(User user, TableDDLPermissionsIT.AccessTestAction... actions) throws Exception {
+        for (TableDDLPermissionsIT.AccessTestAction action : actions) {
+            try {
+                Object obj = user.runAs(action);
+                if (obj != null && obj instanceof List<?>) {
+                    List<?> results = (List<?>) obj;
+                    if (results != null && results.isEmpty()) {
+                        fail("Empty non null results from action for user '" + user.getShortName() + "'");
+                    }
+                }
+            } catch (AccessDeniedException ade) {
+                fail("Expected action to pass for user '" + user.getShortName() + "' but was denied");
+            }
+        }
+    }
+
+    /** This passes only if desired exception is caught for all users. */
+    <T> void verifyDenied(AccessTestAction action, Class<T> exception, User... users) throws Exception {
+        if(users.length == 0) {
+            throw new Exception("Action needs at least one user to run");
+        }
+        for (User user : users) {
+            verifyDenied(user, exception, action);
+        }
+    }
+
+    /** This passes only if desired exception is caught for all users. */
+    <T> void verifyDenied(User user, Class<T> exception, TableDDLPermissionsIT.AccessTestAction... actions) throws Exception {
+        for (TableDDLPermissionsIT.AccessTestAction action : actions) {
+            try {
+                user.runAs(action);
+                fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+            } catch (IOException e) {
+                fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+            } catch (UndeclaredThrowableException ute) {
+                Throwable ex = ute.getUndeclaredThrowable();
+
+                // HBase AccessDeniedException (ADE) is handled in different ways in different parts of the code:
+                // 1. Wrap HBase ADE in PhoenixIOException (mostly for create, delete statements)
+                // 2. Wrap HBase ADE in ExecutionException (mostly for scans)
+                // 3. Directly throw HBase ADE or a custom message wrapping HBase ADE
+                // Thus we iterate over the chain of throwables to find the ADE
+                for(Throwable throwable : Throwables.getCausalChain(ex)) {
+                    if(exception.equals(throwable.getClass())) {
+                        if(throwable instanceof AccessDeniedException) {
+                            validateAccessDeniedException((AccessDeniedException) throwable);
+                        }
+                        return;
+                    }
+                }
+
+            } catch(RuntimeException ex) {
+                // This can occur when the unprivileged user accesses table descriptors from the client
+                if (ex.getCause() instanceof AccessDeniedException) {
+                    // expected result
+                    validateAccessDeniedException((AccessDeniedException) ex.getCause());
+                    return;
+                }
+            }
+            fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+        }
+    }
+
+    void validateAccessDeniedException(AccessDeniedException ade) {
+        String msg = ade.getMessage();
+        assertTrue("Exception contained unexpected message: '" + msg + "'",
+                !msg.contains("is not the scanner owner"));
+    }
+}
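
The verifyAllowed/verifyDenied checks above all rest on the same mechanism: an AccessTestAction (a PrivilegedExceptionAction) is executed through HBase's User.runAs, so each SQL statement runs under the test user's identity, and permission failures surface as AccessDeniedException (possibly wrapped), which verifyDenied unwraps via the causal chain. A minimal stand-alone sketch of that mechanism follows; the user name "alice" and the empty action body are illustrative only and are not part of this commit.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;

public class RunAsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Test-only user; the ITs above create these via User.createUserForTesting(...)
        User alice = User.createUserForTesting(conf, "alice", new String[0]);
        // The action body runs under alice's identity
        alice.runAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                // e.g. open a Phoenix connection here and issue the DDL/DML under test
                return null;
            }
        });
    }
}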

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
new file mode 100644
index 0000000..2bf7fe1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test that verifies a user can read Phoenix tables with a minimal set of permissions.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ChangePermissionsIT extends BasePermissionsIT {
+
+    private static final Log LOG = LogFactory.getLog(ChangePermissionsIT.class);
+
+    private static final String SCHEMA_NAME = "CHANGEPERMSSCHEMA";
+    private static final String TABLE_NAME =
+            ChangePermissionsIT.class.getSimpleName().toUpperCase();
+    private static final String FULL_TABLE_NAME = SCHEMA_NAME + "." + TABLE_NAME;
+    private static final String IDX1_TABLE_NAME = TABLE_NAME + "_IDX1";
+    private static final String IDX2_TABLE_NAME = TABLE_NAME + "_IDX2";
+    private static final String IDX3_TABLE_NAME = TABLE_NAME + "_IDX3";
+    private static final String LOCAL_IDX1_TABLE_NAME = TABLE_NAME + "_LIDX1";
+    private static final String VIEW1_TABLE_NAME = TABLE_NAME + "_V1";
+    private static final String VIEW2_TABLE_NAME = TABLE_NAME + "_V2";
+
+    public ChangePermissionsIT(boolean isNamespaceMapped) throws Exception {
+        super(isNamespaceMapped);
+    }
+
+    private void grantSystemTableAccess(User superUser, User... users) throws Exception {
+        for(User user : users) {
+            if(isNamespaceMapped) {
+                verifyAllowed(grantPermissions("RX", user, QueryConstants.SYSTEM_SCHEMA_NAME, true), superUser);
+            } else {
+                verifyAllowed(grantPermissions("RX", user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
+            }
+            verifyAllowed(grantPermissions("W", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+        }
+    }
+
+    private void revokeSystemTableAccess(User superUser, User... users) throws Exception {
+        for(User user : users) {
+            if(isNamespaceMapped) {
+                verifyAllowed(revokePermissions(user, QueryConstants.SYSTEM_SCHEMA_NAME, true), superUser);
+            } else {
+                verifyAllowed(revokePermissions(user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
+            }
+            verifyAllowed(revokePermissions(user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+        }
+    }
+
+    /**
+     * Verify that READ and EXECUTE permissions are required on SYSTEM tables to get a Phoenix connection.
+     * Tests grant/revoke permissions per user: 1. if NS is enabled -> on the namespace; 2. if NS is disabled -> on the tables
+     */
+    @Test
+    public void testRXPermsReqdForPhoenixConn() throws Exception {
+
+        startNewMiniCluster();
+
+        if(isNamespaceMapped) {
+            // NS is enabled, CQSI tries creating SYSCAT, and we get a NamespaceNotFoundException for the "SYSTEM" NS
+            // We create a custom ADE and throw it (and ignore the NamespaceNotFoundException)
+            // This is because we didn't have CREATE perms to create the "SYSTEM" NS
+            verifyDenied(getConnectionAction(), AccessDeniedException.class, regularUser1);
+        } else {
+            // NS is disabled, CQSI tries creating SYSCAT; two cases here
+            // 1. First client ever --> gets ADE, runs the client-server compatibility check again and gets TableNotFoundException since SYSCAT doesn't exist
+            // 2. Any other client --> gets ADE, runs the client-server compatibility check again and gets AccessDeniedException since it doesn't have EXEC perms
+            verifyDenied(getConnectionAction(), TableNotFoundException.class, regularUser1);
+        }
+
+        // The Phoenix client caches connections per user
+        // If we grant permissions, get a connection and then revoke them, we can still get the cached connection
+        // However, it will fail for other read queries
+        // Thus this test grants and revokes for 2 users, so that both behaviors can be tested.
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2);
+        verifyAllowed(getConnectionAction(), regularUser1);
+        revokeSystemTableAccess(superUser1, regularUser2);
+        verifyDenied(getConnectionAction(), AccessDeniedException.class, regularUser2);
+    }
+
+    /**
+     * Superuser grants admin perms to user1, who will in turn grant admin perms to user2
+     * Not affected by namespace props
+     * Tests grant/revoke permissions at the per-user global level
+     */
+    @Test
+    public void testSuperUserCanChangePerms() throws Exception {
+
+        startNewMiniCluster();
+
+        // Grant System Table access to all users, else they can't create a Phoenix connection
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, unprivilegedUser);
+
+        verifyAllowed(grantPermissions("A", regularUser1), superUser1);
+
+        verifyAllowed(readTableWithoutVerification(PhoenixDatabaseMetaData.SYSTEM_CATALOG), regularUser1);
+        verifyAllowed(grantPermissions("A", regularUser2), regularUser1);
+
+        verifyAllowed(revokePermissions(regularUser1), superUser1);
+        verifyDenied(grantPermissions("A", regularUser3), AccessDeniedException.class, regularUser1);
+
+        // Don't grant ADMIN perms to unprivilegedUser; thus unprivilegedUser is unable to control other users' permissions.
+        verifyAllowed(getConnectionAction(), unprivilegedUser);
+        verifyDenied(grantPermissions("ARX", regularUser4), AccessDeniedException.class, unprivilegedUser);
+    }
+
+    /**
+     * Test to verify READ permissions on a table, its indexes and views
+     * Tests automatic grant/revoke of permissions per user on a table
+     */
+    @Test
+    public void testReadPermsOnTableIndexAndView() throws Exception {
+
+        startNewMiniCluster();
+
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, unprivilegedUser);
+
+        // Create new schema and grant CREATE permissions to a user
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+            verifyAllowed(grantPermissions("C", regularUser1, SCHEMA_NAME, true), superUser1);
+        } else {
+            verifyAllowed(grantPermissions("C", regularUser1, "\"" + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE + "\"", true), superUser1);
+        }
+
+        // Create new table. Create indexes, views and view indexes on top of it. Verify the contents by querying it
+        verifyAllowed(createTable(FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX2_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createLocalIndex(LOCAL_IDX1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createView(VIEW1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX3_TABLE_NAME, VIEW1_TABLE_NAME), regularUser1);
+
+        // RegularUser2 doesn't have any permissions. It can get a PhoenixConnection
+        // However, it cannot query the table, indexes or views without READ perms
+        verifyAllowed(getConnectionAction(), regularUser2);
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTable(FULL_TABLE_NAME, IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTable(VIEW1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+        // Grant READ permissions to RegularUser2 on the table
+        // Permissions should propagate automatically to relevant physical tables such as global index and view index.
+        verifyAllowed(grantPermissions("R", regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // Granting permissions directly to index tables should fail
+        verifyDenied(grantPermissions("W", regularUser2, SCHEMA_NAME + "." + IDX1_TABLE_NAME, false), AccessDeniedException.class, regularUser1);
+        // Granting permissions directly to views should fail. We expect TableNotFoundException since VIEWS are not physical tables
+        verifyDenied(grantPermissions("W", regularUser2, SCHEMA_NAME + "." + VIEW1_TABLE_NAME, false), TableNotFoundException.class, regularUser1);
+
+        // Verify that all other accesses are successful now
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, IDX2_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, LOCAL_IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(VIEW1_TABLE_NAME), regularUser2);
+        verifyAllowed(readMultiTenantTableWithIndex(VIEW1_TABLE_NAME), regularUser2);
+
+        // Revoke READ permissions from RegularUser2 on the table
+        // Permissions should propagate automatically to relevant physical tables such as global index and view index.
+        verifyAllowed(revokePermissions(regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // READ query should fail now
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+    }
+
+    /**
+     * Verifies permissions for users present inside a group
+     */
+    @Test
+    public void testGroupUserPerms() throws Exception {
+
+        startNewMiniCluster();
+
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+        }
+        verifyAllowed(createTable(FULL_TABLE_NAME), superUser1);
+
+        // Grant SYSTEM table access to GROUP_SYSTEM_ACCESS and regularUser1
+        verifyAllowed(grantPermissions("RX", GROUP_SYSTEM_ACCESS, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser1);
+        grantSystemTableAccess(superUser1, regularUser1);
+
+        // Grant permissions to the group (should be automatically applicable to all users inside it)
+        verifyAllowed(grantPermissions("AR", GROUP_SYSTEM_ACCESS, FULL_TABLE_NAME, false), superUser1);
+        verifyAllowed(readTable(FULL_TABLE_NAME), groupUser);
+
+        // GroupUser is an admin and can grant perms to other users
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser1);
+        verifyAllowed(grantPermissions("R", regularUser1, FULL_TABLE_NAME, false), groupUser);
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser1);
+
+        // Revoke the perms and try accessing data again
+        verifyAllowed(revokePermissions(GROUP_SYSTEM_ACCESS, FULL_TABLE_NAME, false), superUser1);
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, groupUser);
+    }
+
+    /**
+     * Tests permissions for MultiTenant Tables and view index tables
+     */
+    @Test
+    public void testMultiTenantTables() throws Exception {
+
+        startNewMiniCluster();
+
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, regularUser3);
+
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+            verifyAllowed(grantPermissions("C", regularUser1, SCHEMA_NAME, true), superUser1);
+        } else {
+            verifyAllowed(grantPermissions("C", regularUser1, "\"" + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE + "\"", true), superUser1);
+        }
+
+        // Create MultiTenant Table (View Index Table should be automatically created)
+        // At this point, the index table doesn't contain any data
+        verifyAllowed(createMultiTenantTable(FULL_TABLE_NAME), regularUser1);
+
+        // RegularUser2 doesn't have access yet; RegularUser1 should have RWXCA on the table
+        verifyDenied(readMultiTenantTableWithoutIndex(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+        // Grant perms to base table (Should propagate to View Index as well)
+        verifyAllowed(grantPermissions("R", regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // Try reading full table
+        verifyAllowed(readMultiTenantTableWithoutIndex(FULL_TABLE_NAME), regularUser2);
+
+        // Create tenant specific views on the table using tenant specific Phoenix Connection
+        verifyAllowed(createView(VIEW1_TABLE_NAME, FULL_TABLE_NAME, "o1"), regularUser1);
+        verifyAllowed(createView(VIEW2_TABLE_NAME, FULL_TABLE_NAME, "o2"), regularUser1);
+
+        // Create indexes on those views using tenant specific Phoenix Connection
+        // It is not possible to create indexes on tenant specific views without tenant connection
+        verifyAllowed(createIndex(IDX1_TABLE_NAME, VIEW1_TABLE_NAME, "o1"), regularUser1);
+        verifyAllowed(createIndex(IDX2_TABLE_NAME, VIEW2_TABLE_NAME, "o2"), regularUser1);
+
+        // Read the tables as regularUser2, with and without the use of the index table
+        // If perms are propagated correctly, then both reads should work
+        // The test checks whether the query plan uses the index table by searching for the "_IDX_" string
+        // _IDX_ is the prefix combined with the base table name to derive the name of the view index table
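+        // For example, a plan that goes through the view index would contain a line roughly like
+        // "RANGE SCAN OVER _IDX_<base table>" (illustrative wording, not the exact plan text),
+        // while a plan over the base table references the base table name directly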
+        verifyAllowed(readMultiTenantTableWithIndex(VIEW1_TABLE_NAME, "o1"), regularUser2);
+        verifyAllowed(readMultiTenantTableWithoutIndex(VIEW2_TABLE_NAME, "o2"), regularUser2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
new file mode 100644
index 0000000..a3584ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, Boolean.toString(true));
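+        // Test-only settings (assumed to differ from production defaults): the tiny guidepost width and
+        // stats update interval let UPDATE STATISTICS on these small tables produce usable estimates
+        // quickly, and COST_BASED_OPTIMIZER_ENABLED turns on the cost comparison these tests exercise.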
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering1() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where c1 LIKE 'X0%' ORDER BY rowkey";
+            // Use the data table plan, which avoids the order-by, when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the index table plan that has a lower cost when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering2() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Use the index table plan, which avoids the order-by, when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Given that the range on C1 is meaningless and group-by becomes
+            // order-preserving if using the data table, the data table plan should
+            // come out as the best plan based on the costs.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering3() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
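+            // Local index rows are stored in the data table itself, keyed by an index id prefix, so the
+            // plans asserted below scan the data table with ranges such as [1,...] for idx1 and [2,...]
+            // for idx2 rather than naming a separate index table (an informal reading of the plan output).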
+
+            String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                    "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan, which scans less data, when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                    "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan, which scans less data, when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan, which scans less data, when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUnionQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName + " where rowkey LIKE 'k%' GROUP BY c1 "
+                    + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Use the default plan when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " ['k'] - ['l']\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "    CLIENT MERGE SORT";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the optimal plan based on cost when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1]\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" LIKE 'k%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [\"C1\"]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "        SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInJoinQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + tableName + " t1 "
+                    + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
+                    + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER BY t1.rowkey";
+            // Use the default plan when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "    SERVER FILTER BY C1 LIKE 'X0%'\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " [1,'X'] - [1,'Y']\n" +
+                    "            SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY [\"ROWKEY\"]\n" +
+                    "        CLIENT MERGE SORT\n" +
+                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the optimal plan based on cost when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " [1,'X0'] - [1,'X1']\n" +
+                    "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "    SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
+                    "CLIENT MERGE SORT\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" +
+                    "            SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [ROWKEY]\n" +
+                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN (T2.ROWKEY)";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testHintOverridesCost() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (\n" +
+                    "rowkey INTEGER PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + "_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where rowkey between 1 and 10 ORDER BY c1";
+            String hintedQuery = query.replaceFirst("SELECT",
+                    "SELECT  /*+ INDEX(" + tableName + " " + tableName + "_idx) */");
+            String dataPlan = "SERVER SORTED BY [C1]";
+            String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND (\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
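+            // These are distinguishing substrings rather than full plans: a server-side sort on C1
+            // only appears when the data table is scanned, while the ROWKEY range turning into a
+            // server filter only appears when the local index (already ordered by C1) is used.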
+
+            // Use the index table plan, which avoids the order-by, when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setInt(1, i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the data table plan that has a lower cost when stats are available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(dataPlan));
+
+            // Use the index table plan because it is hinted.
+            rs = conn.createStatement().executeQuery("explain " + hintedQuery);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + ".",
+                    plan.contains(indexPlan));
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
index fe09dcd..8002dc1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
@@ -43,31 +43,61 @@ public class CreateSchemaIT extends ParallelStatsDisabledIT {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
         String schemaName = generateUniqueName();
-        String ddl = "CREATE SCHEMA " + schemaName;
+        String schemaName1 = schemaName.toLowerCase();
+        String schemaName2 = schemaName.toLowerCase();
+        // Create a uniquely named schema and verify that it exists
+        // ddl1 should create a lowercase schema name since it is passed in with double quotes
+        // ddl2 should create an uppercase schema name since Phoenix upper-cases unquoted identifiers
+        // Both statements should succeed
+        String ddl1 = "CREATE SCHEMA \"" + schemaName1 + "\"";
+        String ddl2 = "CREATE SCHEMA " + schemaName2;
         try (Connection conn = DriverManager.getConnection(getUrl(), props);
                 HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
-            conn.createStatement().execute(ddl);
-            assertNotNull(admin.getNamespaceDescriptor(schemaName));
+            conn.createStatement().execute(ddl1);
+            assertNotNull(admin.getNamespaceDescriptor(schemaName1));
+            conn.createStatement().execute(ddl2);
+            assertNotNull(admin.getNamespaceDescriptor(schemaName2.toUpperCase()));
         }
+        // Try creating it again and verify that it throws SchemaAlreadyExistsException
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute(ddl);
+            conn.createStatement().execute(ddl1);
             fail();
         } catch (SchemaAlreadyExistsException e) {
             // expected
         }
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        try {
-            conn.createStatement().execute("CREATE SCHEMA " + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE);
-            fail();
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.SCHEMA_NOT_ALLOWED.getErrorCode(), e.getErrorCode());
-        }
-        try {
-            conn.createStatement().execute("CREATE SCHEMA " + SchemaUtil.HBASE_NAMESPACE);
-            fail();
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.SCHEMA_NOT_ALLOWED.getErrorCode(), e.getErrorCode());
+
+        // See PHOENIX-4424
+        // Creating schemas DEFAULT and HBASE should be allowed since they are upper-cased; verify that they exist
+        // Creating schemas default and hbase should fail
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+             HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
+
+            // default is a SQL keyword, hence it should always be passed in double-quotes
+            try {
+                conn.createStatement().execute("CREATE SCHEMA \""
+                        + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE + "\"");
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.SCHEMA_NOT_ALLOWED.getErrorCode(), e.getErrorCode());
+            }
+
+            try {
+                conn.createStatement().execute("CREATE SCHEMA \""
+                        + SchemaUtil.HBASE_NAMESPACE + "\"");
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.SCHEMA_NOT_ALLOWED.getErrorCode(), e.getErrorCode());
+            }
+
+            // default is a SQL keyword, hence it should always be passed in double-quotes
+            conn.createStatement().execute("CREATE SCHEMA \""
+                    + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE.toUpperCase() + "\"");
+            conn.createStatement().execute("CREATE SCHEMA \""
+                    + SchemaUtil.HBASE_NAMESPACE.toUpperCase() + "\"");
+
+            assertNotNull(admin.getNamespaceDescriptor(SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE.toUpperCase()));
+            assertNotNull(admin.getNamespaceDescriptor(SchemaUtil.HBASE_NAMESPACE.toUpperCase()));
+
         }
-        conn.close();
     }
 }