You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/11/30 06:21:03 UTC

[2/2] phoenix git commit: PHOENIX-672 Add GRANT and REVOKE commands using HBase AccessController

PHOENIX-672 Add GRANT and REVOKE commands using HBase AccessController


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88038a2d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88038a2d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88038a2d

Branch: refs/heads/master
Commit: 88038a2dacb7aa1a90015163d4d75d04793e4e11
Parents: 355ee52
Author: Karan Mehta <ka...@gmail.com>
Authored: Wed Nov 29 12:04:06 2017 -0800
Committer: Thomas D'Silva <td...@apache.org>
Committed: Wed Nov 29 20:39:19 2017 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/BasePermissionsIT.java      | 754 +++++++++++++++++++
 .../phoenix/end2end/ChangePermissionsIT.java    | 269 +++++++
 .../end2end/SystemTablePermissionsIT.java       | 226 +-----
 .../phoenix/end2end/TableDDLPermissionsIT.java  | 583 ++------------
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  30 +-
 .../coprocessor/PhoenixAccessController.java    |  29 +-
 .../phoenix/exception/SQLExceptionCode.java     |   1 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  40 +-
 .../phoenix/parse/ChangePermsStatement.java     | 102 +++
 .../apache/phoenix/parse/ParseNodeFactory.java  |   7 +-
 .../query/ConnectionQueryServicesImpl.java      |  24 +-
 .../apache/phoenix/query/QueryConstants.java    |   1 +
 .../org/apache/phoenix/query/QueryServices.java |   2 -
 .../phoenix/query/QueryServicesOptions.java     |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 138 ++++
 .../schema/TablesNotInSyncException.java        |  22 +
 .../org/apache/phoenix/util/SchemaUtil.java     |  25 +-
 .../apache/phoenix/parse/QueryParserTest.java   |  46 +-
 18 files changed, 1544 insertions(+), 763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/88038a2d/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
new file mode 100644
index 0000000..9d7ef1b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class BasePermissionsIT extends BaseTest {
+
+    private static final Log LOG = LogFactory.getLog(BasePermissionsIT.class);
+
+    static String SUPERUSER;
+
+    static HBaseTestingUtility testUtil;
+    static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
+            "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION"));
+
+    static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS = new HashSet<>(Arrays.asList(
+            "SYSTEM.\"CATALOG\"", "SYSTEM.\"SEQUENCE\"", "SYSTEM.\"STATS\"", "SYSTEM.\"FUNCTION\""));
+
+    static final String SYSTEM_SEQUENCE_IDENTIFIER =
+            QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE+ "\"";
+
+    static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
+            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION"));
+
+    // Create Multiple users so that we can use Hadoop UGI to run tasks as various users
+    // Permissions can be granted or revoked by superusers and admins only
+    // DON'T USE HADOOP UserGroupInformation class to create testing users since HBase misses some of its functionality
+    // Instead use org.apache.hadoop.hbase.security.User class for testing purposes.
+
+    // Super User has all the access
+    User superUser1 = null;
+    User superUser2 = null;
+
+    // Regular users are granted and revoked permissions as needed
+    User regularUser1 = null;
+    User regularUser2 = null;
+    User regularUser3 = null;
+    User regularUser4 = null;
+
+    // Group User is equivalent of regular user but inside a group
+    // Permissions granted to the group should affect this user
+    static final String GROUP_SYSTEM_ACCESS = "group_system_access";
+    User groupUser = null;
+
+    // Unprivileged User doesn't have any access and is denied for every action
+    User unprivilegedUser = null;
+
+    static final int NUM_RECORDS = 5;
+
+    boolean isNamespaceMapped;
+
+    /**
+     * Creates a test instance for one value of the namespace-mapping parameter.
+     *
+     * @param isNamespaceMapped stored in the {@code isNamespaceMapped} field, which the configuration helpers of
+     *                          this class read when building server and client settings
+     */
+    public BasePermissionsIT(final boolean isNamespaceMapped) throws Exception {
+        this.isNamespaceMapped = isNamespaceMapped;
+    }
+
+    /**
+     * Records the name of the OS user running the tests (system property {@code user.name})
+     * in {@code SUPERUSER} before any test of the class runs.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        final String currentOsUser = System.getProperty("user.name");
+        SUPERUSER = currentOsUser;
+    }
+
+    /**
+     * Starts a new mini cluster, passing a freshly created {@link Configuration} as the override configuration.
+     */
+    void startNewMiniCluster() throws Exception {
+        final Configuration defaultOverrides = new Configuration();
+        startNewMiniCluster(defaultOverrides);
+    }
+    
+    /**
+     * Replaces the shared {@code testUtil} with a new {@link HBaseTestingUtility}, configures it for
+     * authorization, namespace mapping and the master info port, starts the cluster and creates the test users.
+     * A cluster that is already referenced by {@code testUtil} is shut down first.
+     *
+     * @param overrideConf extra configuration added as a resource to the cluster configuration; may be {@code null}
+     */
+    void startNewMiniCluster(Configuration overrideConf) throws Exception{
+        // Tear down whatever cluster a previous call left behind
+        if (testUtil != null) {
+            testUtil.shutdownMiniCluster();
+            testUtil = null;
+        }
+
+        testUtil = new HBaseTestingUtility();
+        final Configuration serverConf = testUtil.getConfiguration();
+
+        enablePhoenixHBaseAuthorization(serverConf);
+        configureNamespacesOnServer(serverConf);
+        configureRandomHMasterPort(serverConf);
+        if (overrideConf != null) {
+            serverConf.addResource(overrideConf);
+        }
+
+        testUtil.startMiniCluster(1);
+
+        // Users are created against the configuration of the running cluster
+        initializeUsers(testUtil.getConfiguration());
+    }
+
+    /**
+     * Creates every testing {@link User} used by the permission tests.
+     * All users are created against the configuration passed in, so the method does not depend on
+     * the state of {@code testUtil}.
+     *
+     * @param configuration configuration handed to {@code User.createUserForTesting} for each user
+     */
+    private void initializeUsers(Configuration configuration) {
+        // An empty array is immutable in practice, so it can be shared by all group-less users
+        final String[] noGroups = new String[0];
+
+        superUser1 = User.createUserForTesting(configuration, SUPERUSER, noGroups);
+        superUser2 = User.createUserForTesting(configuration, "superUser2", noGroups);
+
+        regularUser1 = User.createUserForTesting(configuration, "regularUser1", noGroups);
+        regularUser2 = User.createUserForTesting(configuration, "regularUser2", noGroups);
+        regularUser3 = User.createUserForTesting(configuration, "regularUser3", noGroups);
+        regularUser4 = User.createUserForTesting(configuration, "regularUser4", noGroups);
+
+        // The only user that belongs to a group; uses the same configuration as everyone else
+        groupUser = User.createUserForTesting(configuration, "groupUser", new String[] {GROUP_SYSTEM_ACCESS});
+
+        unprivilegedUser = User.createUserForTesting(configuration, "unprivilegedUser", noGroups);
+    }
+
+    /**
+     * Sets {@code HConstants.MASTER_INFO_PORT} to -1 in the given configuration.
+     *
+     * @param config configuration to modify
+     */
+    private void configureRandomHMasterPort(Configuration config) {
+        // Avoid multiple clusters trying to bind the master's info port (16010)
+        final int masterInfoPort = -1;
+        config.setInt(HConstants.MASTER_INFO_PORT, masterInfoPort);
+    }
+
+    /**
+     * Writes the authorization-related settings into the given configuration: the superuser list,
+     * the HBase authorization flags, the AccessController coprocessor for master, region and region server,
+     * the Phoenix ACL flag and the indexed WAL edit codec.
+     *
+     * @param config configuration to modify
+     */
+    void enablePhoenixHBaseAuthorization(Configuration config) {
+        final String enabled = Boolean.TRUE.toString();
+        final String accessController = "org.apache.hadoop.hbase.security.access.AccessController";
+        final String[] coprocessorKeys = {
+                "hbase.coprocessor.master.classes",
+                "hbase.coprocessor.region.classes",
+                "hbase.coprocessor.regionserver.classes"};
+
+        config.set("hbase.superuser", SUPERUSER + "," + "superUser2");
+        config.set("hbase.security.authorization", enabled);
+        config.set("hbase.security.exec.permission.checks", enabled);
+        // The same coprocessor class is registered at all three levels
+        for (String coprocessorKey : coprocessorKeys) {
+            config.set(coprocessorKey, accessController);
+        }
+
+        config.set(QueryServices.PHOENIX_ACLS_ENABLED,"true");
+
+        config.set("hbase.regionserver.wal.codec", "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+    }
+
+    /**
+     * Copies the {@code isNamespaceMapped} test parameter into the given configuration under
+     * {@code QueryServices.IS_NAMESPACE_MAPPING_ENABLED}.
+     *
+     * @param conf configuration to modify
+     */
+    void configureNamespacesOnServer(Configuration conf) {
+        final String namespaceMappingFlag = String.valueOf(isNamespaceMapped);
+        conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, namespaceMappingFlag);
+    }
+
+    /**
+     * Supplies the parameter values for the parameterized runner: namespace mapping off, then on.
+     *
+     * @return the two values {@code false} and {@code true}, in that order
+     */
+    @Parameterized.Parameters(name = "isNamespaceMapped={0}") // name is used by failsafe as file name in reports
+    public static Collection<Boolean> data() {
+        final List<Boolean> namespaceMappingValues = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
+        return namespaceMappingValues;
+    }
+
+    /**
+     * Shuts down the mini cluster referenced by {@code testUtil}, if any, after each test
+     * and clears the reference.
+     */
+    @After
+    public void cleanup() throws Exception {
+        if (testUtil == null) {
+            // Nothing was started, nothing to stop
+            return;
+        }
+        testUtil.shutdownMiniCluster();
+        testUtil = null;
+    }
+
+    /**
+     * @return the shared {@code testUtil} instance; {@code null} when no mini cluster is currently referenced
+     */
+    public static HBaseTestingUtility getUtility(){
+        return testUtil;
+    }
+
+    // Utility functions to grant permissions with HBase API
+    /**
+     * Grants the given actions on each listed table to a user through {@link AccessControlClient}.
+     * No column family or qualifier is specified (both are passed as {@code null}).
+     *
+     * @param toUser        name the permissions are granted to
+     * @param tablesToGrant table names, each converted with {@code TableName.valueOf}
+     * @param actions       actions to grant
+     */
+    void grantPermissions(String toUser, Set<String> tablesToGrant, Permission.Action... actions) throws Throwable {
+        for (String tableName : tablesToGrant) {
+            AccessControlClient.grant(testUtil.getConnection(), TableName.valueOf(tableName), toUser, null, null,
+                    actions);
+        }
+    }
+
+    /**
+     * Grants the given actions on a namespace to a user through {@link AccessControlClient}.
+     *
+     * @param toUser    name the permissions are granted to
+     * @param namespace namespace the permissions apply to
+     * @param actions   actions to grant
+     */
+    void grantPermissions(String toUser, String namespace, Permission.Action... actions) throws Throwable {
+        AccessControlClient.grant(testUtil.getConnection(), namespace, toUser, actions);
+    }
+
+    /**
+     * Grants the given actions to the given entry through {@link AccessControlClient},
+     * without naming a table or namespace.
+     *
+     * @param groupEntry name passed to {@code AccessControlClient.grant} as the grantee
+     * @param actions    actions to grant
+     */
+    void grantPermissions(String groupEntry, Permission.Action... actions) throws IOException, Throwable {
+        AccessControlClient.grant(testUtil.getConnection(), groupEntry, actions);
+    }
+
+    // Utility functions to revoke permissions with HBase API
+    /**
+     * Revokes every {@link Permission.Action} from the system-access group, from {@code regularUser1}
+     * and from {@code unprivilegedUser}, in that order, through {@link AccessControlClient}.
+     */
+    void revokeAll() throws Throwable {
+        final String groupEntry = AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS);
+        AccessControlClient.revoke(getUtility().getConnection(), groupEntry, Permission.Action.values());
+
+        final String regularUserName = regularUser1.getShortName();
+        AccessControlClient.revoke(getUtility().getConnection(), regularUserName, Permission.Action.values());
+
+        final String unprivilegedUserName = unprivilegedUser.getShortName();
+        AccessControlClient.revoke(getUtility().getConnection(), unprivilegedUserName, Permission.Action.values());
+    }
+
+    /**
+     * Builds the JDBC client properties for a connection.
+     *
+     * @param tenantId tenant id to set under {@code PhoenixRuntime.TENANT_ID_ATTRIB}; skipped when {@code null}
+     * @return properties holding the namespace-mapping flag and, if given, the tenant id
+     */
+    Properties getClientProperties(String tenantId) {
+        final Properties clientProps = new Properties();
+        clientProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+        if (tenantId != null) {
+            clientProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        return clientProps;
+    }
+
+    /**
+     * Opens a connection without a tenant id.
+     *
+     * @return a new JDBC connection
+     */
+    public Connection getConnection() throws SQLException {
+        final String noTenant = null;
+        return getConnection(noTenant);
+    }
+
+    /**
+     * Opens a connection to the URL returned by {@code getUrl()} using the client properties for the given tenant.
+     *
+     * @param tenantId tenant id for the connection; may be {@code null}
+     * @return a new JDBC connection
+     */
+    public Connection getConnection(String tenantId) throws SQLException {
+        final String jdbcUrl = getUrl();
+        final Properties clientProps = getClientProperties(tenantId);
+        return DriverManager.getConnection(jdbcUrl, clientProps);
+    }
+
+    /**
+     * @return the Phoenix JDBC URL for localhost, using the ZooKeeper client port of the current {@code testUtil}
+     */
+    protected static String getUrl() {
+        return new StringBuilder("jdbc:phoenix:localhost:")
+                .append(testUtil.getZkCluster().getClientPort())
+                .append(":/hbase")
+                .toString();
+    }
+
+    /**
+     * Lists the tables reported by the HBase admin of the current {@code testUtil}.
+     *
+     * @return the table names as strings
+     */
+    static Set<String> getHBaseTables() throws IOException {
+        final Set<String> tableNames = new HashSet<>();
+        for (TableName hbaseTable : testUtil.getHBaseAdmin().listTableNames()) {
+            final String nameAsString = hbaseTable.getNameAsString();
+            tableNames.add(nameAsString);
+        }
+        return tableNames;
+    }
+
+    // UG Object
+    // 1. Instance of String --> represents GROUP name
+    // 2. Instance of User --> represents HBase user
+    /**
+     * Single-target convenience form: wraps the table or schema name in a singleton set and
+     * delegates to the set-based overload.
+     *
+     * @param actions           permission string placed inside the GRANT statement
+     * @param ug                a {@code String} (group name) or a {@link User}
+     * @param tableOrSchemaList one table or schema name
+     * @param isSchema          {@code true} to grant on a schema, {@code false} to grant on a table
+     * @return an action that issues the GRANT statement when run
+     */
+    AccessTestAction grantPermissions(final String actions, final Object ug,
+                                      final String tableOrSchemaList, final boolean isSchema) throws SQLException {
+        final Set<String> singleTarget = Collections.singleton(tableOrSchemaList);
+        return grantPermissions(actions, ug, singleTarget, isSchema);
+    }
+
+    /**
+     * Returns an action that issues one Phoenix GRANT statement per table or schema in the set.
+     *
+     * @param actions           permission string placed inside the GRANT statement
+     * @param ug                a {@code String} (treated as a group name) or a {@link User} (its short name is used)
+     * @param tableOrSchemaList table or schema names to grant on
+     * @param isSchema          {@code true} to grant on schemas, {@code false} to grant on tables
+     * @return an action that runs the statements and asserts that each {@code execute} returns {@code false}
+     */
+    AccessTestAction grantPermissions(final String actions, final Object ug,
+                                      final Set<String> tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    for (String target : tableOrSchemaList) {
+                        final String targetKind = isSchema ? " SCHEMA " : " TABLE ";
+                        final String grantee;
+                        if (ug instanceof String) {
+                            grantee = " GROUP " + "'" + ug + "'";
+                        } else {
+                            grantee = "'" + ((User) ug).getShortName() + "'";
+                        }
+                        final String sql = "GRANT '" + actions + "' ON " + targetKind + target + " TO " + grantee;
+                        LOG.info("Grant Permissions SQL: " + sql);
+                        assertFalse(statement.execute(sql));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that issues a Phoenix GRANT statement naming only the user,
+     * with no table or schema clause.
+     *
+     * @param actions permission string placed inside the GRANT statement
+     * @param user    user whose short name is used as the grantee
+     * @return an action that runs the statement and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction grantPermissions(final String actions, final User user) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    final String grantee = " '" + user.getShortName() + "'";
+                    final String sql = "GRANT '" + actions + "' TO " + grantee;
+                    LOG.info("Grant Permissions SQL: " + sql);
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Single-target convenience form: wraps the table or schema name in a singleton set and
+     * delegates to the set-based overload.
+     *
+     * @param ug                a {@code String} (group name) or a {@link User}
+     * @param tableOrSchemaList one table or schema name
+     * @param isSchema          {@code true} to revoke on a schema, {@code false} to revoke on a table
+     * @return an action that issues the REVOKE statement when run
+     */
+    AccessTestAction revokePermissions(final Object ug,
+                                       final String tableOrSchemaList, final boolean isSchema) throws SQLException {
+        final Set<String> singleTarget = Collections.singleton(tableOrSchemaList);
+        return revokePermissions(ug, singleTarget, isSchema);
+    }
+
+    /**
+     * Returns an action that issues one Phoenix REVOKE statement per table or schema in the set.
+     *
+     * @param ug                a {@code String} (treated as a group name) or a {@link User} (its short name is used)
+     * @param tableOrSchemaList table or schema names to revoke on
+     * @param isSchema          {@code true} to revoke on schemas, {@code false} to revoke on tables
+     * @return an action that runs the statements and asserts that each {@code execute} returns {@code false}
+     */
+    AccessTestAction revokePermissions(final Object ug,
+                                       final Set<String> tableOrSchemaList, final boolean isSchema) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    for (String target : tableOrSchemaList) {
+                        final String targetKind = isSchema ? " SCHEMA " : " TABLE ";
+                        final String grantee;
+                        if (ug instanceof String) {
+                            grantee = " GROUP " + "'" + ug + "'";
+                        } else {
+                            grantee = "'" + ((User) ug).getShortName() + "'";
+                        }
+                        final String sql = "REVOKE ON " + targetKind + target + " FROM " + grantee;
+                        LOG.info("Revoke Permissions SQL: " + sql);
+                        assertFalse(statement.execute(sql));
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that issues a Phoenix REVOKE statement naming only the user or group,
+     * with no table or schema clause.
+     *
+     * @param ug a {@code String} (treated as a group name) or a {@link User} (its short name is used)
+     * @return an action that runs the statement and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction revokePermissions(final Object ug) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    final String grantee;
+                    if (ug instanceof String) {
+                        grantee = " GROUP " + "'" + ug + "'";
+                    } else {
+                        grantee = "'" + ((User) ug).getShortName() + "'";
+                    }
+                    final String sql = "REVOKE FROM " + grantee;
+                    LOG.info("Revoke Permissions SQL: " + sql);
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that attempts to get a Phoenix connection and closes it immediately.
+     * New connections could create SYSTEM tables if appropriate perms are granted.
+     *
+     * @return an action that only opens and closes a connection
+     */
+    AccessTestAction getConnectionAction() throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection ignored = getConnection()) {
+                    // Opening the connection is all this action does
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that creates a schema. The action does nothing when namespace mapping is disabled.
+     *
+     * @param schemaName name of the schema to create
+     * @return an action that runs CREATE SCHEMA and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction createSchema(final String schemaName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                if (!isNamespaceMapped) {
+                    return null;
+                }
+                final String sql = "CREATE SCHEMA " + schemaName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that drops a schema. The action does nothing when namespace mapping is disabled.
+     *
+     * @param schemaName name of the schema to drop
+     * @return an action that runs DROP SCHEMA and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction dropSchema(final String schemaName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                if (!isNamespaceMapped) {
+                    return null;
+                }
+                final String sql = "DROP SCHEMA " + schemaName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that creates a table with columns (pk, data, val) and upserts
+     * {@code NUM_RECORDS} rows where row {@code i} holds {@code (i, "i", i)}, then commits.
+     *
+     * @param tableName name of the table to create
+     * @return an action that creates and populates the table
+     */
+    AccessTestAction createTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String createSql =
+                        "CREATE TABLE " + tableName + "(pk INTEGER not null primary key, data VARCHAR, val integer)";
+                final String upsertSql = "UPSERT INTO " + tableName + " values(?, ?, ?)";
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(createSql));
+                    try (PreparedStatement upsert = connection.prepareStatement(upsertSql)) {
+                        for (int row = 0; row < NUM_RECORDS; row++) {
+                            upsert.setInt(1, row);
+                            upsert.setString(2, Integer.toString(row));
+                            upsert.setInt(3, row);
+                            assertEquals(1, upsert.executeUpdate());
+                        }
+                    }
+                    connection.commit();
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that creates a MULTI_TENANT table keyed on (ORG_ID, PREFIX) and upserts
+     * {@code NUM_RECORDS} rows where row {@code i} holds {@code ("o" + i, "pr" + i, "i", i)}, then commits.
+     *
+     * @param tableName name of the table to create
+     * @return an action that creates and populates the table
+     */
+    AccessTestAction createMultiTenantTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String createSql = "CREATE TABLE " + tableName
+                        + "(ORG_ID VARCHAR NOT NULL, PREFIX CHAR(3) NOT NULL, DATA VARCHAR, VAL INTEGER CONSTRAINT PK PRIMARY KEY (ORG_ID, PREFIX))  MULTI_TENANT=TRUE";
+                final String upsertSql = "UPSERT INTO " + tableName + " values(?, ?, ?, ?)";
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(createSql));
+                    try (PreparedStatement upsert = connection.prepareStatement(upsertSql)) {
+                        for (int row = 0; row < NUM_RECORDS; row++) {
+                            upsert.setString(1, "o" + row);
+                            upsert.setString(2, "pr" + row);
+                            upsert.setString(3, Integer.toString(row));
+                            upsert.setInt(4, row);
+                            assertEquals(1, upsert.executeUpdate());
+                        }
+                    }
+                    connection.commit();
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that drops a table if it exists.
+     *
+     * @param tableName name of the table to drop
+     * @return an action that runs DROP TABLE IF EXISTS and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction dropTable(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "DROP TABLE IF EXISTS " + tableName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that reads the given table without verifying the data.
+     * AccessDeniedException is only triggered when ResultSet#next() is called: the first call triggers
+     * the HBase Scan, whereas Statement#executeQuery() returns an iterator and doesn't interact with
+     * the HBase API at all. The action therefore iterates over every row and discards it.
+     *
+     * @param tableName name of the table to read
+     * @return an action that selects all rows and walks the whole result set
+     */
+    AccessTestAction readTableWithoutVerification(final String tableName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "SELECT * FROM " + tableName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    final ResultSet rows = statement.executeQuery(sql);
+                    assertNotNull(rows);
+                    boolean hasMore = rows.next();
+                    while (hasMore) {
+                        hasMore = rows.next();
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that reads and verifies the table without an index hint.
+     *
+     * @param tableName name of the table to read
+     * @return the action produced by the two-argument overload with a {@code null} index name
+     */
+    AccessTestAction readTable(final String tableName) throws SQLException {
+        final String noIndexHint = null;
+        return readTable(tableName, noIndexHint);
+    }
+
+    /**
+     * Returns an action that selects (pk, data, val) from the table and verifies that row {@code i}
+     * holds {@code (i, "i", i)} and that exactly {@code NUM_RECORDS} rows come back.
+     *
+     * @param tableName name of the table to read
+     * @param indexName when not {@code null}, an INDEX hint naming this index is added to the query
+     * @return an action that reads and verifies the table
+     */
+    AccessTestAction readTable(final String tableName, final String indexName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    String indexHint = "";
+                    if (indexName != null) {
+                        indexHint = "/*+ INDEX(" + tableName + " " + indexName + ")*/";
+                    }
+                    final String sql = "SELECT " + indexHint + " pk, data, val FROM " + tableName + " where data >= '0'";
+                    final ResultSet rows = statement.executeQuery(sql);
+                    assertNotNull(rows);
+                    int rowCount = 0;
+                    while (rows.next()) {
+                        assertEquals(rowCount, rows.getInt(1));
+                        assertEquals(Integer.toString(rowCount), rows.getString(2));
+                        assertEquals(rowCount, rows.getInt(3));
+                        rowCount++;
+                    }
+                    assertEquals(NUM_RECORDS, rowCount);
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that reads the multi-tenant table over a connection without a tenant id.
+     *
+     * @param tableName name of the table to read
+     * @return the action produced by the two-argument overload with a {@code null} tenant id
+     */
+    AccessTestAction readMultiTenantTableWithoutIndex(final String tableName) throws SQLException {
+        final String noTenant = null;
+        return readMultiTenantTableWithoutIndex(tableName, noTenant);
+    }
+
+    /**
+     * Returns an action that selects (data, val) from a multi-tenant table and verifies the rows.
+     * <p>
+     * With a tenant id, exactly one row is expected, the explain plan must not mention {@code _IDX_},
+     * and the row's data must equal the second character of the tenant id.
+     * Without a tenant id, row {@code i} must hold {@code ("i", i)} and {@code NUM_RECORDS} rows are expected.
+     *
+     * @param tableName name of the table to read
+     * @param tenantId  tenant id for the connection, in the form used by the test data (e.g. "o3"); may be {@code null}
+     * @return an action that reads and verifies the table
+     */
+    AccessTestAction readMultiTenantTableWithoutIndex(final String tableName, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement()) {
+                    // Accessing all the data from the table avoids the use of index
+                    String readTableSQL = "SELECT data, val FROM " + tableName;
+                    // NOTE(review): the query is run once before the plan is read, presumably so that the
+                    // statement has a query plan to report - confirm against PhoenixStatement
+                    ResultSet rs = stmt.executeQuery(readTableSQL);
+                    assertNotNull(rs);
+                    String explainPlan = Joiner.on(" ").join(((PhoenixStatement)stmt).getQueryPlan().getExplainPlan().getPlanSteps());
+                    rs = stmt.executeQuery(readTableSQL);
+                    if(tenantId != null) {
+                        // Fail with a clear message instead of an obscure error on getString() when no row exists
+                        assertTrue("Expected one row for tenant '" + tenantId + "'", rs.next());
+                        assertFalse(explainPlan.contains("_IDX_"));
+                        assertEquals(tenantId, ((PhoenixConnection)conn).getTenantId().toString());
+                        // For tenant ID "o3", the value in table will be 3
+                        assertEquals(Character.toString(tenantId.charAt(1)), rs.getString(1));
+                        // Only 1 record is inserted per Tenant
+                        assertFalse(rs.next());
+                    } else {
+                        int i = 0;
+                        while(rs.next()) {
+                            assertEquals(Integer.toString(i), rs.getString(1));
+                            assertEquals(i, rs.getInt(2));
+                            i++;
+                        }
+                        assertEquals(NUM_RECORDS, i);
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that reads the multi-tenant table through its index over a connection without a tenant id.
+     *
+     * @param tableName name of the table to read
+     * @return the action produced by the two-argument overload with a {@code null} tenant id
+     */
+    AccessTestAction readMultiTenantTableWithIndex(final String tableName) throws SQLException {
+        final String noTenant = null;
+        return readMultiTenantTableWithIndex(tableName, noTenant);
+    }
+
+    /**
+     * Returns an action that selects only {@code data} from a multi-tenant table, asserts that the
+     * explain plan mentions {@code _IDX_}, and verifies the rows.
+     * <p>
+     * With a tenant id, exactly one row is expected and its data must equal the second character of the tenant id.
+     * Without a tenant id, row {@code i} must hold {@code "i"} and {@code NUM_RECORDS} rows are expected.
+     *
+     * @param tableName name of the table to read
+     * @param tenantId  tenant id for the connection, in the form used by the test data (e.g. "o3"); may be {@code null}
+     * @return an action that reads and verifies the table
+     */
+    AccessTestAction readMultiTenantTableWithIndex(final String tableName, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement()) {
+                    // Accessing only the 'data' from the table uses index since index tables are built on 'data' column
+                    String readTableSQL = "SELECT data FROM " + tableName;
+                    // NOTE(review): the query is run once before the plan is read, presumably so that the
+                    // statement has a query plan to report - confirm against PhoenixStatement
+                    ResultSet rs = stmt.executeQuery(readTableSQL);
+                    assertNotNull(rs);
+                    String explainPlan = Joiner.on(" ").join(((PhoenixStatement) stmt).getQueryPlan().getExplainPlan().getPlanSteps());
+                    assertTrue(explainPlan.contains("_IDX_"));
+                    rs = stmt.executeQuery(readTableSQL);
+                    if (tenantId != null) {
+                        // Fail with a clear message instead of an obscure error on getString() when no row exists
+                        assertTrue("Expected one row for tenant '" + tenantId + "'", rs.next());
+                        assertEquals(tenantId, ((PhoenixConnection) conn).getTenantId().toString());
+                        // For tenant ID "o3", the value in table will be 3
+                        assertEquals(Character.toString(tenantId.charAt(1)), rs.getString(1));
+                        // Only 1 record is inserted per Tenant
+                        assertFalse(rs.next());
+                    } else {
+                        int i = 0;
+                        while (rs.next()) {
+                            assertEquals(Integer.toString(i), rs.getString(1));
+                            i++;
+                        }
+                        assertEquals(NUM_RECORDS, i);
+                    }
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that sets one property on a table with ALTER TABLE ... SET.
+     *
+     * @param tableName name of the table to alter
+     * @param property  property name, placed verbatim in the statement
+     * @param value     property value, placed verbatim in the statement
+     * @return an action that runs the statement and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction addProperties(final String tableName, final String property, final String value)
+            throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "ALTER TABLE " + tableName + " SET " + property + "=" + value;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that adds a varchar column to a table.
+     *
+     * @param tableName  name of the table to alter
+     * @param columnName name of the column to add
+     * @return an action that runs ALTER TABLE ... ADD and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction addColumn(final String tableName, final String columnName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "ALTER TABLE " + tableName + " ADD " + columnName + " varchar";
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that drops a column from a table.
+     *
+     * @param tableName  name of the table to alter
+     * @param columnName name of the column to drop
+     * @return an action that runs ALTER TABLE ... DROP COLUMN and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction dropColumn(final String tableName, final String columnName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "ALTER TABLE " + tableName + " DROP COLUMN " + columnName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that creates an index over a connection without a tenant id.
+     *
+     * @param indexName name of the index to create
+     * @param dataTable table the index is built on
+     * @return the action produced by the three-argument overload with a {@code null} tenant id
+     */
+    AccessTestAction createIndex(final String indexName, final String dataTable) throws SQLException {
+        final String noTenant = null;
+        return createIndex(indexName, dataTable, noTenant);
+    }
+
+    /**
+     * Returns an action that creates an index on the {@code data} column of a table.
+     *
+     * @param indexName name of the index to create
+     * @param dataTable table the index is built on
+     * @param tenantId  tenant id for the connection; may be {@code null}
+     * @return an action that runs CREATE INDEX and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction createIndex(final String indexName, final String dataTable, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "CREATE INDEX " + indexName + " on " + dataTable + "(data)";
+                try (Connection connection = getConnection(tenantId); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that creates a local index on the {@code data} column of a table.
+     *
+     * @param indexName name of the index to create
+     * @param dataTable table the index is built on
+     * @return an action that runs CREATE LOCAL INDEX and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction createLocalIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "CREATE LOCAL INDEX " + indexName + " on " + dataTable + "(data)";
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that drops an index from a table.
+     *
+     * @param indexName name of the index to drop
+     * @param dataTable table the index belongs to
+     * @return an action that runs DROP INDEX and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction dropIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "DROP INDEX " + indexName + " on " + dataTable;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that disables an index and then rebuilds it, using two ALTER INDEX statements
+     * on the same connection.
+     *
+     * @param indexName name of the index to rebuild
+     * @param dataTable table the index belongs to
+     * @return an action that runs both statements and asserts that each {@code execute} returns {@code false}
+     */
+    AccessTestAction rebuildIndex(final String indexName, final String dataTable) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String alterIndexPrefix = "ALTER INDEX " + indexName + " on " + dataTable;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(alterIndexPrefix + " DISABLE"));
+                    assertFalse(statement.execute(alterIndexPrefix + " REBUILD"));
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Returns an action that drops a view.
+     *
+     * @param viewName name of the view to drop
+     * @return an action that runs DROP VIEW and asserts that {@code execute} returns {@code false}
+     */
+    AccessTestAction dropView(final String viewName) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                final String sql = "DROP VIEW " + viewName;
+                try (Connection connection = getConnection(); Statement statement = connection.createStatement()) {
+                    assertFalse(statement.execute(sql));
+                }
+                return null;
+            }
+        };
+    }
+
+    AccessTestAction createView(final String viewName, final String dataTable) throws SQLException {
+        return createView(viewName, dataTable, null);
+    }
+
+    AccessTestAction createView(final String viewName, final String dataTable, final String tenantId) throws SQLException {
+        return new AccessTestAction() {
+            @Override
+            public Object run() throws Exception {
+                try (Connection conn = getConnection(tenantId); Statement stmt = conn.createStatement();) {
+                    String viewStmtSQL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + dataTable;
+                    assertFalse(stmt.execute(viewStmtSQL));
+                }
+                return null;
+            }
+        };
+    }
+
+    static interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
+
+    /** This fails only in case of ADE or empty list for any of the users. */
+    void verifyAllowed(AccessTestAction action, User... users) throws Exception {
+        if(users.length == 0) {
+            throw new Exception("Action needs at least one user to run");
+        }
+        for (User user : users) {
+            verifyAllowed(user, action);
+        }
+    }
+
+    /** Runs each action as the user; fails on an AccessDeniedException or an empty List result. */
+    void verifyAllowed(User user, TableDDLPermissionsIT.AccessTestAction... actions) throws Exception {
+        for (TableDDLPermissionsIT.AccessTestAction action : actions) {
+            try {
+                Object obj = user.runAs(action);
+                // instanceof is already false for null, so no separate null checks are needed
+                if (obj instanceof List<?> && ((List<?>) obj).isEmpty()) {
+                    fail("Empty non null results from action for user '" + user.getShortName() + "'");
+                }
+            } catch (AccessDeniedException ade) {
+                fail("Expected action to pass for user '" + user.getShortName() + "' but was denied: "
+                        + ade.getMessage());
+            }
+        }
+    }
+
+    /** This passes only if desired exception is caught for all users. */
+    <T> void verifyDenied(AccessTestAction action, Class<T> exception, User... users) throws Exception {
+        if(users.length == 0) {
+            throw new Exception("Action needs at least one user to run");
+        }
+        for (User user : users) {
+            verifyDenied(user, exception, action);
+        }
+    }
+
+    /** Passes only if the desired exception is caught for every action run as the user. */
+    <T> void verifyDenied(User user, Class<T> exception, TableDDLPermissionsIT.AccessTestAction... actions) throws Exception {
+        // Labeled so that a match found inside the causal-chain loop moves on to the next action
+        nextAction:
+        for (TableDDLPermissionsIT.AccessTestAction action : actions) {
+            try {
+                user.runAs(action);
+                fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+            } catch (IOException e) {
+                fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+            } catch (UndeclaredThrowableException ute) {
+                Throwable ex = ute.getUndeclaredThrowable();
+                // HBase AccessDeniedException(ADE) is handled in different ways in different parts of code
+                // 1. Wrap HBase ADE in PhoenixIOException (Mostly for create, delete statements)
+                // 2. Wrap HBase ADE in ExecutionException (Mostly for scans)
+                // 3. Directly throwing HBase ADE or custom msg with HBase ADE
+                // Thus we iterate over the chain of throwables and find ADE
+                for(Throwable throwable : Throwables.getCausalChain(ex)) {
+                    if(exception.equals(throwable.getClass())) {
+                        if(throwable instanceof AccessDeniedException) {
+                            validateAccessDeniedException((AccessDeniedException) throwable);
+                        }
+                        continue nextAction;
+                    }
+                }
+            } catch(RuntimeException ex) {
+                // This can occur while accessing tabledescriptors from client by the unprivileged user
+                if (ex.getCause() instanceof AccessDeniedException) {
+                    // expected result; keep verifying the remaining actions instead of returning early
+                    validateAccessDeniedException((AccessDeniedException) ex.getCause());
+                    continue nextAction;
+                }
+            }
+            fail("Expected exception was not thrown for user '" + user.getShortName() + "'");
+        }
+    }
+
+    void validateAccessDeniedException(AccessDeniedException ade) {
+        String msg = ade.getMessage();
+        assertFalse("Exception contained unexpected message: '" + msg + "'",
+                msg != null && msg.contains("is not the scanner owner"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88038a2d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
new file mode 100644
index 0000000..c023440
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that verify granting and revoking permissions for users and groups on Phoenix schemas, tables, indexes and views.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ChangePermissionsIT extends BasePermissionsIT {
+
+    private static final Log LOG = LogFactory.getLog(ChangePermissionsIT.class);
+
+    private static final String SCHEMA_NAME = "CHANGEPERMSSCHEMA";
+    private static final String TABLE_NAME =
+            ChangePermissionsIT.class.getSimpleName().toUpperCase();
+    private static final String FULL_TABLE_NAME = SCHEMA_NAME + "." + TABLE_NAME;
+    private static final String IDX1_TABLE_NAME = TABLE_NAME + "_IDX1";
+    private static final String IDX2_TABLE_NAME = TABLE_NAME + "_IDX2";
+    private static final String IDX3_TABLE_NAME = TABLE_NAME + "_IDX3";
+    private static final String LOCAL_IDX1_TABLE_NAME = TABLE_NAME + "_LIDX1";
+    private static final String VIEW1_TABLE_NAME = TABLE_NAME + "_V1";
+    private static final String VIEW2_TABLE_NAME = TABLE_NAME + "_V2";
+
+    public ChangePermissionsIT(boolean isNamespaceMapped) throws Exception {
+        super(isNamespaceMapped);
+    }
+
+    private void grantSystemTableAccess(User superUser, User... users) throws Exception {
+        for(User user : users) {
+            if(isNamespaceMapped) {
+                verifyAllowed(grantPermissions("RX", user, QueryConstants.SYSTEM_SCHEMA_NAME, true), superUser);
+            } else {
+                verifyAllowed(grantPermissions("RX", user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
+            }
+            verifyAllowed(grantPermissions("W", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+        }
+    }
+
+    private void revokeSystemTableAccess(User superUser, User... users) throws Exception {
+        for(User user : users) {
+            if(isNamespaceMapped) {
+                verifyAllowed(revokePermissions(user, QueryConstants.SYSTEM_SCHEMA_NAME, true), superUser);
+            } else {
+                verifyAllowed(revokePermissions(user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
+            }
+            verifyAllowed(revokePermissions(user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
+        }
+    }
+
+    /**
+     * Verify that READ and EXECUTE permissions are required on SYSTEM tables to get a Phoenix Connection
+     * Tests grant revoke permissions per user 1. if NS enabled -> on namespace 2. If NS disabled -> on tables
+     */
+    @Test
+    public void testRXPermsReqdForPhoenixConn() throws Exception {
+
+        startNewMiniCluster();
+
+        if(isNamespaceMapped) {
+            // NS is enabled, CQSI tries creating SYSCAT, we get NamespaceNotFoundException for "SYSTEM" NS
+            // We create custom ADE and throw it (and ignore NamespaceNotFoundException)
+            // This is because we didn't have CREATE perms to create "SYSTEM" NS
+            verifyDenied(getConnectionAction(), AccessDeniedException.class, regularUser1);
+        } else {
+            // NS is disabled, CQSI tries creating SYSCAT, Two cases here
+            // 1. First client ever --> Gets ADE, runs client server compatibility check again and gets TableNotFoundException since SYSCAT doesn't exist
+            // 2. Any other client --> Gets ADE, runs client server compatibility check again and gets AccessDeniedException since it doesn't have EXEC perms
+            verifyDenied(getConnectionAction(), TableNotFoundException.class, regularUser1);
+        }
+
+        // Phoenix Client caches connection per user
+        // If we grant permissions, get a connection and then revoke it, we can still get the cached connection
+        // However it will fail for other read queries
+        // Thus this test grants and revokes for 2 users, so that both functionality can be tested.
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2);
+        verifyAllowed(getConnectionAction(), regularUser1);
+        revokeSystemTableAccess(superUser1, regularUser2);
+        verifyDenied(getConnectionAction(), AccessDeniedException.class, regularUser2);
+    }
+
+    /**
+     * Superuser grants admin perms to user1, who will in-turn grant admin perms to user2
+     * Not affected with namespace props
+     * Tests grant revoke permissions on per user global level
+     */
+    @Test
+    public void testSuperUserCanChangePerms() throws Exception {
+
+        startNewMiniCluster();
+
+        // Grant System Table access to all users, else they can't create a Phoenix connection
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, unprivilegedUser);
+
+        verifyAllowed(grantPermissions("A", regularUser1), superUser1);
+
+        verifyAllowed(readTableWithoutVerification(PhoenixDatabaseMetaData.SYSTEM_CATALOG), regularUser1);
+        verifyAllowed(grantPermissions("A", regularUser2), regularUser1);
+
+        verifyAllowed(revokePermissions(regularUser1), superUser1);
+        verifyDenied(grantPermissions("A", regularUser3), AccessDeniedException.class, regularUser1);
+
+        // Don't grant ADMIN perms to unprivilegedUser, thus unprivilegedUser is unable to control other permissions.
+        verifyAllowed(getConnectionAction(), unprivilegedUser);
+        verifyDenied(grantPermissions("ARX", regularUser4), AccessDeniedException.class, unprivilegedUser);
+    }
+
+    /**
+     * Test to verify READ permissions on table, indexes and views
+     * Tests automatic grant revoke of permissions per user on a table
+     */
+    @Test
+    public void testReadPermsOnTableIndexAndView() throws Exception {
+
+        startNewMiniCluster();
+
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, unprivilegedUser);
+
+        // Create new schema and grant CREATE permissions to a user
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+            verifyAllowed(grantPermissions("C", regularUser1, SCHEMA_NAME, true), superUser1);
+        } else {
+            verifyAllowed(grantPermissions("C", regularUser1, "\"" + QueryConstants.HBASE_DEFAULT_SCHEMA_NAME + "\"", true), superUser1);
+        }
+
+        // Create new table. Create indexes, views and view indexes on top of it. Verify the contents by querying it
+        verifyAllowed(createTable(FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX2_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createLocalIndex(LOCAL_IDX1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createView(VIEW1_TABLE_NAME, FULL_TABLE_NAME), regularUser1);
+        verifyAllowed(createIndex(IDX3_TABLE_NAME, VIEW1_TABLE_NAME), regularUser1);
+
+        // RegularUser2 doesn't have any permissions. It can get a PhoenixConnection
+        // However it cannot query table, indexes or views without READ perms
+        verifyAllowed(getConnectionAction(), regularUser2);
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTable(FULL_TABLE_NAME, IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTable(VIEW1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+        // Grant READ permissions to RegularUser2 on the table
+        // Permissions should propagate automatically to relevant physical tables such as global index and view index.
+        verifyAllowed(grantPermissions("R", regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // Granting permissions directly to index tables should fail
+        verifyDenied(grantPermissions("W", regularUser2, SCHEMA_NAME + "." + IDX1_TABLE_NAME, false), AccessDeniedException.class, regularUser1);
+        // Granting permissions directly to views should fail. We expect TableNotFoundException since VIEWS are not physical tables
+        verifyDenied(grantPermissions("W", regularUser2, SCHEMA_NAME + "." + VIEW1_TABLE_NAME, false), TableNotFoundException.class, regularUser1);
+
+        // Verify that all other access are successful now
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, IDX2_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(FULL_TABLE_NAME, LOCAL_IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), regularUser2);
+        verifyAllowed(readTable(VIEW1_TABLE_NAME), regularUser2);
+        verifyAllowed(readMultiTenantTableWithIndex(VIEW1_TABLE_NAME), regularUser2);
+
+        // Revoke READ permissions from RegularUser2 on the table
+        // Permissions should propagate automatically to relevant physical tables such as global index and view index.
+        verifyAllowed(revokePermissions(regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // READ query should fail now
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+        verifyDenied(readTableWithoutVerification(SCHEMA_NAME + "." + IDX1_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+    }
+
+    /**
+     * Verifies permissions for users present inside a group
+     */
+    @Test
+    public void testGroupUserPerms() throws Exception {
+
+        startNewMiniCluster();
+
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+        }
+        verifyAllowed(createTable(FULL_TABLE_NAME), superUser1);
+
+        // Grant SYSTEM table access to GROUP_SYSTEM_ACCESS and regularUser1
+        verifyAllowed(grantPermissions("RX", GROUP_SYSTEM_ACCESS, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser1);
+        grantSystemTableAccess(superUser1, regularUser1);
+
+        // Grant Permissions to Groups (Should be automatically applicable to all users inside it)
+        verifyAllowed(grantPermissions("AR", GROUP_SYSTEM_ACCESS, FULL_TABLE_NAME, false), superUser1);
+        verifyAllowed(readTable(FULL_TABLE_NAME), groupUser);
+
+        // GroupUser is an admin and can grant perms to other users
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, regularUser1);
+        verifyAllowed(grantPermissions("R", regularUser1, FULL_TABLE_NAME, false), groupUser);
+        verifyAllowed(readTable(FULL_TABLE_NAME), regularUser1);
+
+        // Revoke the perms and try accessing data again
+        verifyAllowed(revokePermissions(GROUP_SYSTEM_ACCESS, FULL_TABLE_NAME, false), superUser1);
+        verifyDenied(readTable(FULL_TABLE_NAME), AccessDeniedException.class, groupUser);
+    }
+
+    /**
+     * Tests permissions for MultiTenant Tables and view index tables
+     */
+    @Test
+    public void testMultiTenantTables() throws Exception {
+
+        startNewMiniCluster();
+
+        grantSystemTableAccess(superUser1, regularUser1, regularUser2, regularUser3);
+
+        if(isNamespaceMapped) {
+            verifyAllowed(createSchema(SCHEMA_NAME), superUser1);
+            verifyAllowed(grantPermissions("C", regularUser1, SCHEMA_NAME, true), superUser1);
+        } else {
+            verifyAllowed(grantPermissions("C", regularUser1, "\"" + QueryConstants.HBASE_DEFAULT_SCHEMA_NAME + "\"", true), superUser1);
+        }
+
+        // Create MultiTenant Table (View Index Table should be automatically created)
+        // At this point, the index table doesn't contain any data
+        verifyAllowed(createMultiTenantTable(FULL_TABLE_NAME), regularUser1);
+
+        // RegularUser2 doesn't have access yet, RegularUser1 should have RWXCA on the table
+        verifyDenied(readMultiTenantTableWithoutIndex(FULL_TABLE_NAME), AccessDeniedException.class, regularUser2);
+
+        // Grant perms to base table (Should propagate to View Index as well)
+        verifyAllowed(grantPermissions("R", regularUser2, FULL_TABLE_NAME, false), regularUser1);
+        // Try reading full table
+        verifyAllowed(readMultiTenantTableWithoutIndex(FULL_TABLE_NAME), regularUser2);
+
+        // Create tenant specific views on the table using tenant specific Phoenix Connection
+        verifyAllowed(createView(VIEW1_TABLE_NAME, FULL_TABLE_NAME, "o1"), regularUser1);
+        verifyAllowed(createView(VIEW2_TABLE_NAME, FULL_TABLE_NAME, "o2"), regularUser1);
+
+        // Create indexes on those views using tenant specific Phoenix Connection
+        // It is not possible to create indexes on tenant specific views without tenant connection
+        verifyAllowed(createIndex(IDX1_TABLE_NAME, VIEW1_TABLE_NAME, "o1"), regularUser1);
+        verifyAllowed(createIndex(IDX2_TABLE_NAME, VIEW2_TABLE_NAME, "o2"), regularUser1);
+
+        // Read the tables as regularUser2, with and without the use of Index table
+        // If perms are propagated correctly, then both of them should work
+        // The test checks if the query plan uses the index table by searching for "_IDX_" string
+        // _IDX_ is the prefix used with base table name to derive the name of view index table
+        verifyAllowed(readMultiTenantTableWithIndex(VIEW1_TABLE_NAME, "o1"), regularUser2);
+        verifyAllowed(readMultiTenantTableWithoutIndex(VIEW2_TABLE_NAME, "o2"), regularUser2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88038a2d/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
index 49202a4..bbe7114 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
@@ -16,177 +16,60 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.security.access.AccessControlClient;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.phoenix.query.QueryServices;
-import org.junit.After;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
  * Test that verifies a user can read Phoenix tables with a minimal set of permissions.
+ * Uses HBase API directly to grant/revoke permissions
  */
 @Category(NeedsOwnMiniClusterTest.class)
-public class SystemTablePermissionsIT {
-    private static String SUPERUSER;
-
-    private static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
-            "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION",
-                "SYSTEM.MUTEX"));
-    private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(
-            Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION",
-                "SYSTEM:MUTEX"));
+public class SystemTablePermissionsIT extends BasePermissionsIT {
 
     private static final String TABLE_NAME =
         SystemTablePermissionsIT.class.getSimpleName().toUpperCase();
-    private static final int NUM_RECORDS = 5;
-
-    private HBaseTestingUtility testUtil = null;
-    private Properties clientProperties = null;
 
-    @BeforeClass
-    public static void setup() throws Exception {
-        SUPERUSER = System.getProperty("user.name");
-    }
-
-    private static void setCommonConfigProperties(Configuration conf) {
-        conf.set("hbase.coprocessor.master.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.coprocessor.region.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.coprocessor.regionserver.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.security.exec.permission.checks", "true");
-        conf.set("hbase.security.authorization", "true");
-        conf.set("hbase.superuser", SUPERUSER);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        if (null != testUtil) {
-          testUtil.shutdownMiniCluster();
-          testUtil = null;
-        }
+    public SystemTablePermissionsIT(boolean isNamespaceMapped) throws Exception {
+        super(isNamespaceMapped);
     }
 
     @Test
-    public void testSystemTablePermissions() throws Exception {
-        testUtil = new HBaseTestingUtility();
-        clientProperties = new Properties();
-        Configuration conf = testUtil.getConfiguration();
-        setCommonConfigProperties(conf);
-        conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "false");
-        clientProperties.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "false");
-        testUtil.startMiniCluster(1);
-        final UserGroupInformation superUser = UserGroupInformation.createUserForTesting(
-            SUPERUSER, new String[0]);
-        final UserGroupInformation regularUser = UserGroupInformation.createUserForTesting(
-            "user", new String[0]);
+    public void testSystemTablePermissions() throws Throwable {
 
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                createTable();
-                readTable();
-                return null;
-            }
-        });
+        startNewMiniCluster();
+
+        verifyAllowed(createTable(TABLE_NAME), superUser1);
+        verifyAllowed(readTable(TABLE_NAME), superUser1);
 
         Set<String> tables = getHBaseTables();
-        assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
-            tables.containsAll(PHOENIX_SYSTEM_TABLES));
+        if(isNamespaceMapped) {
+            assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
+                    tables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+        } else {
+            assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
+                    tables.containsAll(PHOENIX_SYSTEM_TABLES));
+        }
 
         // Grant permission to the system tables for the unprivileged user
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
+        superUser1.runAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
                 try {
-                    grantPermissions(regularUser.getShortUserName(), PHOENIX_SYSTEM_TABLES,
-                        Action.EXEC, Action.READ);
-                    grantPermissions(regularUser.getShortUserName(),
-                        Collections.singleton(TABLE_NAME), Action.READ);
-                } catch (Throwable e) {
-                    if (e instanceof Exception) {
-                        throw (Exception) e;
+                    if(isNamespaceMapped) {
+                        grantPermissions(regularUser1.getShortName(),
+                                PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.EXEC, Action.READ);
                     } else {
-                        throw new Exception(e);
+                        grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES,
+                                Action.EXEC, Action.READ);
                     }
-                }
-                return null;
-            }
-        });
-
-        // Make sure that the unprivileged user can read the table
-        regularUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                // We expect this to not throw an error
-                readTable();
-                return null;
-            }
-        });
-    }
-
-    @Test
-    public void testNamespaceMappedSystemTables() throws Exception {
-        testUtil = new HBaseTestingUtility();
-        clientProperties = new Properties();
-        Configuration conf = testUtil.getConfiguration();
-        setCommonConfigProperties(conf);
-        testUtil.getConfiguration().set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
-        clientProperties.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
-        testUtil.startMiniCluster(1);
-        final UserGroupInformation superUser =
-            UserGroupInformation.createUserForTesting(SUPERUSER, new String[0]);
-        final UserGroupInformation regularUser =
-            UserGroupInformation.createUserForTesting("user", new String[0]);
-
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                createTable();
-                readTable();
-                return null;
-            }
-        });
-
-        Set<String> tables = getHBaseTables();
-        assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
-            tables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
-
-        // Grant permission to the system tables for the unprivileged user
-        // An unprivileged user should only need to be able to Read and eXecute on them.
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                try {
-                    grantPermissions(regularUser.getShortUserName(),
-                        PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.EXEC, Action.READ);
-                    grantPermissions(regularUser.getShortUserName(),
+                    grantPermissions(regularUser1.getShortName(),
                         Collections.singleton(TABLE_NAME), Action.READ);
                 } catch (Throwable e) {
                     if (e instanceof Exception) {
@@ -199,66 +82,7 @@ public class SystemTablePermissionsIT {
             }
         });
 
-        regularUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                // We expect this to not throw an error
-                readTable();
-                return null;
-            }
-        });
-    }
-
-    private String getJdbcUrl() {
-        return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
-    }
-
-    private void createTable() throws SQLException {
-        try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProperties);
-            Statement stmt = conn.createStatement();) {
-            assertFalse(stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME));
-            assertFalse(stmt.execute("CREATE TABLE " + TABLE_NAME
-                + "(pk INTEGER not null primary key, data VARCHAR)"));
-            try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO "
-                + TABLE_NAME + " values(?, ?)")) {
-                for (int i = 0; i < NUM_RECORDS; i++) {
-                    pstmt.setInt(1, i);
-                    pstmt.setString(2, Integer.toString(i));
-                    assertEquals(1, pstmt.executeUpdate());
-                }
-            }
-            conn.commit();
-        }
-    }
-
-    private void readTable() throws SQLException {
-        try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProperties);
-            Statement stmt = conn.createStatement()) {
-            ResultSet rs = stmt.executeQuery("SELECT pk, data FROM " + TABLE_NAME);
-            assertNotNull(rs);
-            int i = 0;
-            while (rs.next()) {
-                assertEquals(i, rs.getInt(1));
-                assertEquals(Integer.toString(i), rs.getString(2));
-                i++;
-            }
-            assertEquals(NUM_RECORDS, i);
-        }
-    }
-
-    private void grantPermissions(String toUser, Set<String> tablesToGrant, Action... actions)
-            throws Throwable {
-          for (String table : tablesToGrant) {
-              AccessControlClient.grant(testUtil.getConnection(), TableName.valueOf(table), toUser,
-                  null, null, actions);
-          }
-    }
-
-    private Set<String> getHBaseTables() throws IOException {
-        Set<String> tables = new HashSet<>();
-        for (TableName tn : testUtil.getHBaseAdmin().listTableNames()) {
-            tables.add(tn.getNameAsString());
-        }
-        return tables;
+        // Make sure that the unprivileged user can now read the table
+        verifyAllowed(readTable(TABLE_NAME), regularUser1);
     }
 }