You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2018/01/19 18:03:14 UTC

[06/50] [abbrv] phoenix git commit: Merge remote-tracking branch 'upstream/master' into PHOENIX-3534

Merge remote-tracking branch 'upstream/master' into PHOENIX-3534


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/39c0d1d4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/39c0d1d4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/39c0d1d4

Branch: refs/heads/system-catalog
Commit: 39c0d1d41ceca41bfb0257de6d18aab59f1c9566
Parents: 758efd9 2a8e1c7
Author: Thomas D'Silva <td...@apache.org>
Authored: Mon Nov 13 08:22:34 2017 -0800
Committer: Thomas D'Silva <td...@apache.org>
Committed: Mon Nov 13 08:22:34 2017 -0800

----------------------------------------------------------------------
 LICENSE                                         |  43 +-
 dev/make_rc.sh                                  |  26 +-
 dev/make_rc_on_mac.sh                           | 121 ----
 dev/release_files/LICENSE                       |   2 +
 phoenix-assembly/pom.xml                        |   2 +-
 phoenix-client/pom.xml                          |   2 +-
 phoenix-core/pom.xml                            |   6 +-
 .../phoenix/end2end/CollationKeyFunctionIT.java | 181 +++++
 .../org/apache/phoenix/end2end/DeleteIT.java    |  96 ++-
 .../end2end/ExplainPlanWithStatsEnabledIT.java  | 146 +++-
 .../phoenix/end2end/TableDDLPermissionsIT.java  | 694 +++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  15 +-
 .../org/apache/hadoop/hbase/ipc/RpcUtil.java    |  32 +
 .../apache/phoenix/compile/DeleteCompiler.java  |   5 +-
 .../BaseMetaDataEndpointObserver.java           | 111 +++
 .../coprocessor/MetaDataEndpointImpl.java       | 358 +++++++---
 .../coprocessor/MetaDataEndpointObserver.java   |  68 ++
 .../phoenix/coprocessor/MetaDataProtocol.java   |   8 +-
 .../coprocessor/MetaDataRegionObserver.java     |  17 +-
 .../coprocessor/PhoenixAccessController.java    | 628 +++++++++++++++++
 .../PhoenixMetaDataCoprocessorHost.java         | 236 +++++++
 .../apache/phoenix/coprocessor/ViewFinder.java  |   5 +-
 .../phoenix/expression/ExpressionType.java      |   4 +-
 .../function/CollationKeyFunction.java          | 199 ++++++
 .../index/PhoenixIndexFailurePolicy.java        | 109 +--
 .../phoenix/iterate/BaseResultIterators.java    |  42 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   3 +
 .../query/ConnectionQueryServicesImpl.java      |  15 +-
 .../org/apache/phoenix/query/QueryServices.java |   4 +
 .../phoenix/query/QueryServicesOptions.java     |  14 +-
 .../phoenix/schema/stats/StatisticsWriter.java  |  42 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |  18 +
 .../org/apache/phoenix/util/SchemaUtil.java     |  12 +
 .../apache/phoenix/util/VarBinaryFormatter.java |  52 ++
 .../function/CollationKeyFunctionTest.java      | 243 +++++++
 .../apache/phoenix/parse/QueryParserTest.java   |  21 +
 phoenix-flume/pom.xml                           |   2 +-
 phoenix-hive/pom.xml                            |   2 +-
 phoenix-kafka/pom.xml                           |   2 +-
 phoenix-load-balancer/pom.xml                   |   2 +-
 phoenix-pherf/pom.xml                           |   2 +-
 phoenix-pig/pom.xml                             |   2 +-
 phoenix-queryserver-client/pom.xml              |   2 +-
 phoenix-queryserver/pom.xml                     |   2 +-
 phoenix-server/pom.xml                          |   3 +-
 phoenix-spark/pom.xml                           |   2 +-
 phoenix-tracing-webapp/pom.xml                  |   2 +-
 pom.xml                                         |   7 +-
 48 files changed, 3253 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c0d1d4/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 0000000,971383b..2e78cce
mode 000000,100644..100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@@ -1,0 -1,692 +1,694 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to you under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.end2end;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ import java.io.IOException;
+ import java.lang.reflect.UndeclaredThrowableException;
+ import java.security.PrivilegedExceptionAction;
+ import java.sql.Connection;
+ import java.sql.DriverManager;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.AuthUtil;
+ import org.apache.hadoop.hbase.HBaseTestingUtility;
+ import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.NamespaceDescriptor;
+ import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.security.AccessDeniedException;
+ import org.apache.hadoop.hbase.security.access.AccessControlClient;
+ import org.apache.hadoop.hbase.security.access.Permission.Action;
+ import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.phoenix.exception.PhoenixIOException;
+ import org.apache.phoenix.query.QueryServices;
+ import org.apache.phoenix.util.MetaDataUtil;
+ import org.apache.phoenix.util.SchemaUtil;
+ import org.junit.After;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
+ 
+ import com.google.common.collect.Maps;
+ 
+ /**
+  * Test that verifies a user can read Phoenix tables with a minimal set of permissions.
+  */
+ @Category(NeedsOwnMiniClusterTest.class)
+ @RunWith(Parameterized.class)
+ public class TableDDLPermissionsIT{
+     private static String SUPERUSER;
+ 
+     private static HBaseTestingUtility testUtil;
+ 
+     private static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
+             "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION",
+                 "SYSTEM.MUTEX"));
+     // PHOENIX-XXXX SYSTEM.MUTEX isn't being created in the SYSTEM namespace as it should be.
+     private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(
+             Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION",
+                 "SYSTEM.MUTEX"));
+     private static final String GROUP_SYSTEM_ACCESS = "group_system_access";
+     final UserGroupInformation superUser = UserGroupInformation.createUserForTesting(SUPERUSER, new String[0]);
+     final UserGroupInformation superUser2 = UserGroupInformation.createUserForTesting("superuser", new String[0]);
+     final UserGroupInformation regularUser = UserGroupInformation.createUserForTesting("user",  new String[0]);
+     final UserGroupInformation groupUser = UserGroupInformation.createUserForTesting("user2", new String[] { GROUP_SYSTEM_ACCESS });
+     final UserGroupInformation unprivilegedUser = UserGroupInformation.createUserForTesting("unprivilegedUser",
+             new String[0]);
+ 
+ 
+     private static final int NUM_RECORDS = 5;
+ 
+     private boolean isNamespaceMapped;
+ 
+     public TableDDLPermissionsIT(final boolean isNamespaceMapped) throws Exception {
+         this.isNamespaceMapped = isNamespaceMapped;
+         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+         clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+     }
+ 
+     private void startNewMiniCluster(Configuration overrideConf) throws Exception{
+         if (null != testUtil) {
+             testUtil.shutdownMiniCluster();
+             testUtil = null;
+         }
+         testUtil = new HBaseTestingUtility();
+ 
+         Configuration config = testUtil.getConfiguration();
+         
+         config.set("hbase.coprocessor.master.classes",
+                 "org.apache.hadoop.hbase.security.access.AccessController");
+         config.set("hbase.coprocessor.region.classes",
+                 "org.apache.hadoop.hbase.security.access.AccessController");
+         config.set("hbase.coprocessor.regionserver.classes",
+                 "org.apache.hadoop.hbase.security.access.AccessController");
+         config.set("hbase.security.exec.permission.checks", "true");
+         config.set("hbase.security.authorization", "true");
+         config.set("hbase.superuser", SUPERUSER+","+superUser2.getShortUserName());
+         config.set("hbase.regionserver.wal.codec", "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+         config.set(QueryServices.PHOENIX_ACLS_ENABLED,"true");
+         config.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+         // Avoid multiple clusters trying to bind the master's info port (16010)
+         config.setInt(HConstants.MASTER_INFO_PORT, -1);
+         
+         if (overrideConf != null) {
+             config.addResource(overrideConf);
+         }
+         testUtil.startMiniCluster(1);
+     }
+     
+     private void grantSystemTableAccess() throws Exception{
+         try (Connection conn = getConnection()) {
+             if (isNamespaceMapped) {
+                 grantPermissions(regularUser.getShortUserName(), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.READ,
+                         Action.EXEC);
+                 grantPermissions(unprivilegedUser.getShortUserName(), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES,
+                         Action.READ, Action.EXEC);
+                 grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES,
+                         Action.READ, Action.EXEC);
+                 // Local Index requires WRITE permission on SYSTEM.SEQUENCE TABLE.
+                 grantPermissions(regularUser.getShortUserName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
+                         Action.READ, Action.EXEC);
+                 grantPermissions(unprivilegedUser.getShortUserName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
+                         Action.READ, Action.EXEC);
+                 
+             } else {
+                 grantPermissions(regularUser.getShortUserName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                 grantPermissions(unprivilegedUser.getShortUserName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                 grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                 // Local Index requires WRITE permission on SYSTEM.SEQUENCE TABLE.
+                 grantPermissions(regularUser.getShortUserName(), Collections.singleton("SYSTEM.SEQUENCE"), Action.WRITE,
+                         Action.READ, Action.EXEC);
+                 grantPermissions(unprivilegedUser.getShortUserName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
+                         Action.READ, Action.EXEC);
+             }
+         } catch (Throwable e) {
+             if (e instanceof Exception) {
+                 throw (Exception)e;
+             } else {
+                 throw new Exception(e);
+             }
+         }
+     }
+ 
+     @Parameters(name = "isNamespaceMapped={0}") // name is used by failsafe as file name in reports
+     public static Collection<Boolean> data() {
 -        return Arrays.asList(true, false);
++        return Arrays.asList(true/*, false*/);
+     }
+ 
+     @BeforeClass
+     public static void doSetup() throws Exception {
+         SUPERUSER = System.getProperty("user.name");
+         //setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+     }
+ 
+     protected static String getUrl() {
+         return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
+     }
+ 
+     public Connection getConnection() throws SQLException{
+         Properties props = new Properties();
+         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+         return DriverManager.getConnection(getUrl(),props);
+     }
+ 
+     @Test
+     public void testSchemaPermissions() throws Throwable{
+ 
+         if (!isNamespaceMapped) { return; }
+         try {
+             startNewMiniCluster(null);
+             grantSystemTableAccess();
+             final String schemaName = "TEST_SCHEMA_PERMISSION";
+             superUser.doAs(new PrivilegedExceptionAction<Void>() {
+                 @Override
+                 public Void run() throws Exception {
+                     try {
+                         AccessControlClient.grant(getUtility().getConnection(), regularUser.getShortUserName(),
+                                 Action.ADMIN);
+                     } catch (Throwable e) {
+                         if (e instanceof Exception) {
+                             throw (Exception)e;
+                         } else {
+                             throw new Exception(e);
+                         }
+                     }
+                     return null;
+                 }
+             });
+             verifyAllowed(createSchema(schemaName), regularUser);
+             // Unprivileged user cannot drop a schema
+             verifyDenied(dropSchema(schemaName), unprivilegedUser);
+             verifyDenied(createSchema(schemaName), unprivilegedUser);
+ 
+             verifyAllowed(dropSchema(schemaName), regularUser);
+         } finally {
+             revokeAll();
+         }
+     }
+ 
+     @Test
+     public void testAutomaticGrantDisabled() throws Throwable{
+         testIndexAndView(false);
+     }
+     
+     public void testIndexAndView(boolean isAutomaticGrant) throws Throwable {
+         Configuration conf = new Configuration();
+         conf.set(QueryServices.PHOENIX_AUTOMATIC_GRANT_ENABLED, Boolean.toString(isAutomaticGrant));
+         startNewMiniCluster(conf);
+         final String schema = "TEST_INDEX_VIEW";
+         final String tableName = "TABLE_DDL_PERMISSION_IT";
+         final String phoenixTableName = schema + "." + tableName;
+         final String indexName1 = tableName + "_IDX1";
+         final String indexName2 = tableName + "_IDX2";
+         final String lIndexName1 = tableName + "_LIDX1";
+         final String viewName1 = schema+"."+tableName + "_V1";
+         final String viewName2 = schema+"."+tableName + "_V2";
+         final String viewName3 = schema+"."+tableName + "_V3";
+         final String viewName4 = schema+"."+tableName + "_V4";
+         final String viewIndexName1 = tableName + "_VIDX1";
+         final String viewIndexName2 = tableName + "_VIDX2";
+         grantSystemTableAccess();
+         try {
+             superUser.doAs(new PrivilegedExceptionAction<Void>() {
+                 @Override
+                 public Void run() throws Exception {
+                     try {
+                         verifyAllowed(createSchema(schema), superUser);
+                         if (isNamespaceMapped) {
+                             grantPermissions(regularUser.getShortUserName(), schema, Action.CREATE);
+                             grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), schema, Action.CREATE);
+ 
+                         } else {
+                             grantPermissions(regularUser.getShortUserName(),
+                                     NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Action.CREATE);
+                             grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS),
+                                     NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Action.CREATE);
+ 
+                         }
+                     } catch (Throwable e) {
+                         if (e instanceof Exception) {
+                             throw (Exception)e;
+                         } else {
+                             throw new Exception(e);
+                         }
+                     }
+                     return null;
+                 }
+             });
+ 
+             verifyAllowed(createTable(phoenixTableName), regularUser);
+             verifyAllowed(createIndex(indexName1, phoenixTableName), regularUser);
+             verifyAllowed(createView(viewName1, phoenixTableName), regularUser);
+             verifyAllowed(createLocalIndex(lIndexName1, phoenixTableName), regularUser);
+             verifyAllowed(createIndex(viewIndexName1, viewName1), regularUser);
+             verifyAllowed(createIndex(viewIndexName2, viewName1), regularUser);
+             verifyAllowed(createView(viewName4, viewName1), regularUser);
+             verifyAllowed(readTable(phoenixTableName), regularUser);
+ 
+             verifyDenied(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+             verifyDenied(createView(viewName2, phoenixTableName), unprivilegedUser);
+             verifyDenied(createView(viewName3, viewName1), unprivilegedUser);
+             verifyDenied(dropView(viewName1), unprivilegedUser);
+             
+             verifyDenied(dropIndex(indexName1, phoenixTableName), unprivilegedUser);
+             verifyDenied(dropTable(phoenixTableName), unprivilegedUser);
+             verifyDenied(rebuildIndex(indexName1, phoenixTableName), unprivilegedUser);
+             verifyDenied(addColumn(phoenixTableName, "val1"), unprivilegedUser);
+             verifyDenied(dropColumn(phoenixTableName, "val"), unprivilegedUser);
+             verifyDenied(addProperties(phoenixTableName, "GUIDE_POSTS_WIDTH", "100"), unprivilegedUser);
+ 
+             // Granting read permission to unprivileged user, now he should be able to create view but not index
+             grantPermissions(unprivilegedUser.getShortUserName(),
+                     Collections.singleton(
+                             SchemaUtil.getPhysicalHBaseTableName(schema, tableName, isNamespaceMapped).getString()),
+                     Action.READ, Action.EXEC);
+             grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS),
+                     Collections.singleton(
+                             SchemaUtil.getPhysicalHBaseTableName(schema, tableName, isNamespaceMapped).getString()),
+                     Action.READ, Action.EXEC);
+             verifyDenied(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+             if (!isAutomaticGrant) {
+                 // Automatic grant will read access for all indexes
+                 verifyDenied(createView(viewName2, phoenixTableName), unprivilegedUser);
+ 
+                 // Granting read permission to unprivileged user on index so that a new view can read a index as well,
+                 // now
+                 // he should be able to create view but not index
+                 grantPermissions(unprivilegedUser.getShortUserName(),
+                         Collections.singleton(SchemaUtil
+                                 .getPhysicalHBaseTableName(schema, indexName1, isNamespaceMapped).getString()),
+                         Action.READ, Action.EXEC);
+                 verifyDenied(createView(viewName3, viewName1), unprivilegedUser);
+             }
+             
+             verifyAllowed(createView(viewName2, phoenixTableName), unprivilegedUser);
+             
+             if (!isAutomaticGrant) {
+                 // Grant access to view index for parent view
+                 grantPermissions(unprivilegedUser.getShortUserName(),
+                         Collections.singleton(Bytes.toString(MetaDataUtil.getViewIndexPhysicalName(SchemaUtil
+                                 .getPhysicalHBaseTableName(schema, tableName, isNamespaceMapped).getBytes()))),
+                         Action.READ, Action.EXEC);
+             }
+             verifyAllowed(createView(viewName3, viewName1), unprivilegedUser);
+             
+             // Grant create permission in namespace
+             if (isNamespaceMapped) {
+                 grantPermissions(unprivilegedUser.getShortUserName(), schema, Action.CREATE);
+             } else {
+                 grantPermissions(unprivilegedUser.getShortUserName(), NamespaceDescriptor.DEFAULT_NAMESPACE.getName(),
+                         Action.CREATE);
+             }
+             if (!isAutomaticGrant) {
+                 verifyDenied(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+                 // Give user of data table access to index table which will be created by unprivilegedUser
+                 grantPermissions(regularUser.getShortUserName(),
+                         Collections.singleton(SchemaUtil
+                                 .getPhysicalHBaseTableName(schema, indexName2, isNamespaceMapped).getString()),
+                         Action.WRITE);
+                 verifyDenied(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+                 grantPermissions(regularUser.getShortUserName(),
+                         Collections.singleton(SchemaUtil
+                                 .getPhysicalHBaseTableName(schema, indexName2, isNamespaceMapped).getString()),
+                         Action.WRITE, Action.READ, Action.CREATE, Action.EXEC, Action.ADMIN);
+             }
+             // we should be able to read the data from another index as well to which we have not given any access to
+             // this user
+             verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+             verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser);
+             verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser);
+             verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser);
++            verifyAllowed(dropView(viewName3), regularUser);
++            verifyAllowed(dropView(viewName4), regularUser);
+ 
+             // data table user should be able to read new index
+             verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser);
+             verifyAllowed(readTable(phoenixTableName, indexName2), regularUser);
+ 
+             verifyAllowed(readTable(phoenixTableName), regularUser);
+             verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser);
+             verifyAllowed(addColumn(phoenixTableName, "val1"), regularUser);
+             verifyAllowed(addProperties(phoenixTableName, "GUIDE_POSTS_WIDTH", "100"), regularUser);
+             verifyAllowed(dropView(viewName1), regularUser);
+             verifyAllowed(dropView(viewName2), regularUser);
+             verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser);
+             verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser);
+             verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser);
+             verifyAllowed(dropTable(phoenixTableName), regularUser);
+ 
+             // check again with super users
+             verifyAllowed(createTable(phoenixTableName), superUser2);
+             verifyAllowed(createIndex(indexName1, phoenixTableName), superUser2);
+             verifyAllowed(createView(viewName1, phoenixTableName), superUser2);
+             verifyAllowed(readTable(phoenixTableName), superUser2);
+             verifyAllowed(dropView(viewName1), superUser2);
+             verifyAllowed(dropTable(phoenixTableName), superUser2);
+ 
+         } finally {
+             revokeAll();
+         }
+     }
+     
+     
+     @Test
+     public void testAutomaticGrantEnabled() throws Throwable{
+         testIndexAndView(true);
+     }
+ 
+     private void revokeAll() throws IOException, Throwable {
+         AccessControlClient.revoke(getUtility().getConnection(), AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS),Action.values() );
+         AccessControlClient.revoke(getUtility().getConnection(), regularUser.getShortUserName(),Action.values() );
+         AccessControlClient.revoke(getUtility().getConnection(), unprivilegedUser.getShortUserName(),Action.values() );
+         
+     }
+ 
+     protected void grantPermissions(String groupEntry, Action... actions) throws IOException, Throwable {
+         AccessControlClient.grant(getUtility().getConnection(), groupEntry, actions);
+     }
+ 
+     private AccessTestAction dropTable(final String tableName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName));
+                 }
+                 return null;
+             }
+         };
+ 
+     }
+ 
+     private AccessTestAction createTable(final String tableName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+         try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+             assertFalse(stmt.execute("CREATE TABLE " + tableName + "(pk INTEGER not null primary key, data VARCHAR,val integer)"));
+             try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO " + tableName + " values(?, ?, ?)")) {
+                 for (int i = 0; i < NUM_RECORDS; i++) {
+                     pstmt.setInt(1, i);
+                     pstmt.setString(2, Integer.toString(i));
+                     pstmt.setInt(3, i);
+                     assertEquals(1, pstmt.executeUpdate());
+                 }
+             }
+             conn.commit();
+         }
+         return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction readTable(final String tableName) throws SQLException {
+         return readTable(tableName,null);
+     }
+     private AccessTestAction readTable(final String tableName, final String indexName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement()) {
+                     ResultSet rs = stmt.executeQuery("SELECT "+(indexName!=null?"/*+ INDEX("+tableName+" "+indexName+")*/":"")+" pk, data,val FROM " + tableName +" where data>='0'");
+                     assertNotNull(rs);
+                     int i = 0;
+                     while (rs.next()) {
+                         assertEquals(i, rs.getInt(1));
+                         assertEquals(Integer.toString(i), rs.getString(2));
+                         assertEquals(i, rs.getInt(3));
+                         i++;
+                     }
+                     assertEquals(NUM_RECORDS, i);
+                 }
+                 return null;
+                 }
+             };
+     }
+ 
+     public static HBaseTestingUtility getUtility(){
+         return testUtil;
+     }
+ 
+     private void grantPermissions(String toUser, Set<String> tablesToGrant, Action... actions) throws Throwable {
+         for (String table : tablesToGrant) {
+             AccessControlClient.grant(getUtility().getConnection(), TableName.valueOf(table), toUser, null, null,
+                     actions);
+         }
+     }
+ 
+     private void grantPermissions(String toUser, String namespace, Action... actions) throws Throwable {
+         AccessControlClient.grant(getUtility().getConnection(), namespace, toUser, actions);
+     }
+     
+ 
+     private AccessTestAction dropColumn(final String tableName, final String columnName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("ALTER TABLE " + tableName + " DROP COLUMN "+columnName));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction addColumn(final String tableName, final String columnName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("ALTER TABLE " + tableName + " ADD "+columnName+" varchar"));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction addProperties(final String tableName, final String property, final String value)
+             throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("ALTER TABLE " + tableName + " SET " + property + "=" + value));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction dropView(final String viewName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("DROP VIEW " + viewName));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction createView(final String viewName, final String dataTable) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + dataTable));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction createIndex(final String indexName, final String dataTable) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+ 
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("CREATE INDEX " + indexName + " on " + dataTable + "(data)"));
+                 }
+                 return null;
+             }
+         };
+     }
+     
+     private AccessTestAction createLocalIndex(final String indexName, final String dataTable) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+ 
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("CREATE LOCAL INDEX " + indexName + " on " + dataTable + "(data)"));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction dropIndex(final String indexName, final String dataTable) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("DROP INDEX " + indexName + " on " + dataTable));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction createSchema(final String schemaName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 if (isNamespaceMapped) {
+                     try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                         assertFalse(stmt.execute("CREATE SCHEMA " + schemaName));
+                     }
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction dropSchema(final String schemaName) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 if (isNamespaceMapped) {
+                     try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                         assertFalse(stmt.execute("DROP SCHEMA " + schemaName));
+                     }
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     private AccessTestAction rebuildIndex(final String indexName, final String dataTable) throws SQLException {
+         return new AccessTestAction() {
+             @Override
+             public Object run() throws Exception {
+                 try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) {
+                     assertFalse(stmt.execute("ALTER INDEX " + indexName + " on " + dataTable + " DISABLE"));
+                     assertFalse(stmt.execute("ALTER INDEX " + indexName + " on " + dataTable + " REBUILD"));
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+     static interface AccessTestAction extends PrivilegedExceptionAction<Object> { }
+ 
+     @After
+     public void cleanup() throws Exception {
+         if (null != testUtil) {
+           testUtil.shutdownMiniCluster();
+           testUtil = null;
+         }
+     }
+ 
+     /** This fails only in case of ADE or empty list for any of the users. */
+     private void verifyAllowed(AccessTestAction action, UserGroupInformation... users) throws Exception {
+       for (UserGroupInformation user : users) {
+         verifyAllowed(user, action);
+       }
+     }
+ 
+     /** This passes only in case of ADE for all users. */
+     private void verifyDenied(AccessTestAction action, UserGroupInformation... users) throws Exception {
+       for (UserGroupInformation user : users) {
+         verifyDenied(user, action);
+       }
+     }
+ 
+     /** This fails only in case of ADE or empty list for any of the actions. */
+     private void verifyAllowed(UserGroupInformation user, AccessTestAction... actions) throws Exception {
+       for (AccessTestAction action : actions) {
+         try {
+           Object obj = user.doAs(action);
+           if (obj != null && obj instanceof List<?>) {
+             List<?> results = (List<?>) obj;
+             if (results != null && results.isEmpty()) {
+               fail("Empty non null results from action for user '" + user.getShortUserName() + "'");
+             }
+           }
+         } catch (AccessDeniedException ade) {
+           fail("Expected action to pass for user '" + user.getShortUserName() + "' but was denied");
+         }
+       }
+     }
+ 
+     /** This passes only in case of ADE for all actions. */
+     private void verifyDenied(UserGroupInformation user, AccessTestAction... actions) throws Exception {
+         for (AccessTestAction action : actions) {
+             try {
+                 user.doAs(action);
+                 fail("Expected exception was not thrown for user '" + user.getShortUserName() + "'");
+             } catch (IOException e) {
+                 fail("Expected exception was not thrown for user '" + user.getShortUserName() + "'");
+             } catch (UndeclaredThrowableException ute) {
+                 Throwable ex = ute.getUndeclaredThrowable();
+ 
+                 if (ex instanceof PhoenixIOException) {
+                     if (ex.getCause() instanceof AccessDeniedException) {
+                         // expected result
+                         validateAccessDeniedException((AccessDeniedException) ex.getCause());
+                         return;
+                     }
+                 }
+             }catch(RuntimeException ex){
+                 // This can occur while accessing tabledescriptors from client by the unprivileged user
+                 if (ex.getCause() instanceof AccessDeniedException) {
+                     // expected result
+                     validateAccessDeniedException((AccessDeniedException) ex.getCause());
+                     return;
+                 }
+             }
+             fail("Expected exception was not thrown for user '" + user.getShortUserName() + "'");
+         }
+     }
+ 
+     private void validateAccessDeniedException(AccessDeniedException ade) {
+         String msg = ade.getMessage();
+         assertTrue("Exception contained unexpected message: '" + msg + "'",
+             !msg.contains("is not the scanner owner"));
+     }
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c0d1d4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9f7a629,afbd63f..b9dbc20
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@@ -89,13 -92,12 +90,15 @@@ import java.util.ArrayList
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.Comparator;
+ import java.util.HashSet;
  import java.util.Iterator;
 +import java.util.LinkedList;
  import java.util.List;
 +import java.util.ListIterator;
  import java.util.Map;
 +import java.util.Map.Entry;
  import java.util.NavigableMap;
+ import java.util.Set;
  
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.Cell;
@@@ -119,8 -120,13 +122,10 @@@ import org.apache.hadoop.hbase.client.S
  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 -import org.apache.hadoop.hbase.filter.FilterList;
  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+ import org.apache.hadoop.hbase.ipc.RpcUtil;
  import org.apache.hadoop.hbase.regionserver.Region;
  import org.apache.hadoop.hbase.regionserver.Region.RowLock;
  import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@@ -554,209 -570,11 +570,213 @@@ public class MetaDataEndpointImpl exten
                  ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
          }
      }
 +    
 +	private Pair<PTable, MetaDataProtos.MutationCode> combineColumns(PTable table, byte[] tenantId, byte[] schemaName,
 +			byte[] tableName, long timestamp, int clientVersion) throws SQLException, IOException {
 +		// combine columns for view and view indexes
 +		boolean hasIndexId = table.getViewIndexId() != null;
 +		if (table.getType() != PTableType.VIEW && !hasIndexId) {
 +			return new Pair<PTable, MetaDataProtos.MutationCode>(table, MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +		}
 +		boolean isDiverged = isDivergedView(table);
 +		// here you combine columns from the parent tables
 +		// the logic is as follows, if the PColumn is in the EXCLUDED_COLUMNS
 +		// remove it,
 +		// otherwise priority of keeping duplicate columns is child -> parent
 +		List<byte[]> ancestorList = Lists.newArrayList();
 +		TableViewFinderResult viewFinderResult = new TableViewFinderResult();
 +		if (PTableType.VIEW == table.getType()) {
 +			findAncestorViews(tenantId, schemaName, tableName, viewFinderResult);
 +		} else { // is a view index
 +			findAncestorViewsOfIndex(tenantId, schemaName, tableName, viewFinderResult);
 +		}
 +		if (viewFinderResult.getResults().isEmpty()) {
 +			// no need to combine columns for local indexes on regular tables
 +			return new Pair<PTable, MetaDataProtos.MutationCode>(table, MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +		}
 +		for (TableInfo viewInfo : viewFinderResult.getResults()) {
 +			ancestorList.add(viewInfo.getRowKeyPrefix());
 +		}
 +		List<PColumn> allColumns = Lists.newArrayList();
 +		List<PColumn> excludedColumns = Lists.newArrayList();
 +		// add my own columns first in reverse order
 +		List<PColumn> myColumns = table.getColumns();
 +		for (int i = myColumns.size() - 1; i >= 0; i--) {
 +			PColumn pColumn = myColumns.get(i);
 +			if (pColumn.isExcluded()) {
 +				excludedColumns.add(pColumn);
 +			} else if (!pColumn.equals(SaltingUtil.SALTING_COLUMN)) { // skip salted column as it will be added from the base table columns
 +				allColumns.add(pColumn);
 +			}
 +		}
 +		// index columns that have been dropped in the parent table
 +		boolean isSalted = table.getBucketNum() != null;
 +		int indexPosOffset = (isSalted ? 1 : 0) + (table.isMultiTenant() ? 1 : 0) + 1;
 +		// map from indexed expression to list of data columns that have been dropped
 +		Map<PColumn, List<String>> droppedColMap = Maps.newHashMapWithExpectedSize(table.getColumns().size());
 +		if (hasIndexId) {
 +			ColumnNameTrackingExpressionCompiler expressionCompiler = new ColumnNameTrackingExpressionCompiler();
 +	        for (int i = indexPosOffset; i < table.getPKColumns().size(); i++) {
 +	            PColumn indexColumn = table.getPKColumns().get(i);
 +	            try {
 +	                expressionCompiler.reset();
 +	                String expressionStr = IndexUtil.getIndexColumnExpressionStr(indexColumn);
 +	                ParseNode parseNode  = SQLParser.parseCondition(expressionStr);
 +	                parseNode.accept(expressionCompiler);
 +	                droppedColMap.put(indexColumn, Lists.newArrayList(expressionCompiler.getDataColumnNames()));
 +	            } catch (SQLException e) {
 +	                throw new RuntimeException(e); // Impossible
 +	            }
 +	        }
 +		}
 +		// now go up from child to parent all the way to the base table:
 +		PTable baseTable = null;
 +		long maxTableTimestamp = -1;
 +		int numPKCols = table.getPKColumns().size();
 +		for (int i = 0; i < ancestorList.size(); i++) {
 +			byte[] tableInQuestion = ancestorList.get(i);
 +			PTable pTable = this.doGetTable(tableInQuestion, timestamp, clientVersion);
 +			if (pTable == null) {
 +				String tableNameLink = Bytes.toString(tableInQuestion);
 +				throw new TableNotFoundException("ERROR COMBINING COLUMNS FOR: " + tableNameLink);
 +			} else {
 +				// if it has an index id only combine columns for view indexes
 +				// (and not local indexes on regular tables)
 +				if (i == 0 && hasIndexId && pTable.getType() != PTableType.VIEW) {
 +					return new Pair<PTable, MetaDataProtos.MutationCode>(table, MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
 +				}
 +				if (TABLE.equals(pTable.getType())) {
 +					baseTable = pTable;
 +				}
 +				maxTableTimestamp = Math.max(maxTableTimestamp, pTable.getTimeStamp());
 +				if (hasIndexId) {
 +					// add all pk columns of parent tables to indexes
 +					for (PColumn column : pTable.getPKColumns()) {
 +						if (column.isExcluded()) {
 +							continue;
 +						}
 +						column = IndexUtil.getIndexPKColumn(++numPKCols, column);
 +						int existingColumnIndex = allColumns.indexOf(column);
 +						if (existingColumnIndex == -1) {
 +							allColumns.add(0, column);
 +						}
 +						// TODO should we just generate columnsToAdd here (since
 +						// it doesnt need to be reversed)
 +					}
 +					for (int j = 0; j < pTable.getColumns().size(); j++) {
 +						PColumn tableColumn = pTable.getColumns().get(j);
 +						if (tableColumn.isExcluded()) {
 +							continue;
 +						}
 +						String dataColumnName = tableColumn.getName().getString();
 +						// remove from list of dropped columns since it
 +						// still exists
 +						for (Entry<PColumn, List<String>> entry : droppedColMap.entrySet()) {
 +							entry.getValue().remove(dataColumnName);
 +						}
 +					}
 +				} else {
 +					List<PColumn> someTablesColumns = PTableImpl.getColumnsToClone(pTable);
 +					if (someTablesColumns != null) {
 +						for (int j = someTablesColumns.size() - 1; j >= 0; j--) {
 +							PColumn column = someTablesColumns.get(j);
 +							// For diverged views we always include pk columns
 +							// of the base table. We have to include these pk
 +							// columns to be able to support adding pk columns
 +							// to the diverged view
 +							// We only include regular columns that were created
 +							// before the view diverged
 +							if (isDiverged && column.getFamilyName()!=null && column.getTimestamp() > table.getTimeStamp()) {
 +								continue;
 +							}
 +							// need to check if this column is in the list of excluded (dropped) columns of the view
 +							int existingIndex = excludedColumns.indexOf(column);
 +							if (existingIndex != -1) {
 +								// if it is, only exclude the column if was
 +								// created before the column was dropped in the
 +								// view in order to handle the case where a base
 +								// table column is dropped in a view, then
 +								// dropped in the base table and then added back
 +								// to the base table
 +								if (column.getTimestamp() <= excludedColumns.get(existingIndex).getTimestamp()) {
 +									continue;
 +								}
 +							}
 +							if (column.isExcluded()) {
 +								excludedColumns.add(column);
 +							} else {
 +								int existingColumnIndex = allColumns.indexOf(column);
 +								if (existingColumnIndex != -1) {
 +									// TODO ask james about this
 +									// we always keep the parent table column so
 +									// that we can handle
 +									// the case when you add a column that
 +									// already exists in a view to the base
 +									// table
 +									PColumn existingColumn = allColumns.get(existingColumnIndex);
 +									// if (column.getTimestamp() <
 +									// existingColumn.getTimestamp()) {
 +									allColumns.remove(existingColumnIndex);
 +									allColumns.add(column);
 +									// }
 +								} else {
 +									allColumns.add(column);
 +								}
 +							}
 +						}
 +					}
 +				}
 +			}
 +		}
 +		for (Entry<PColumn, List<String>> entry : droppedColMap.entrySet()) {
 +			if (!entry.getValue().isEmpty()) {
 +				PColumn indexColumnToBeDropped = entry.getKey();
 +				if (SchemaUtil.isPKColumn(indexColumnToBeDropped)) {
 +					// if an indexed column was dropped in an ancestor then we
 +					// cannot use this index an more
 +					// TODO figure out a way to actually drop this view index
 +					return new Pair<PTable, MetaDataProtos.MutationCode>(null, MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
 +				} else {
 +					allColumns.remove(indexColumnToBeDropped);
 +				}
 +			}
 +		}
 +		// lets remove the excluded columns first if the timestamp is newer than
 +		// the added column
 +		for (PColumn excludedColumn : excludedColumns) {
 +			int index = allColumns.indexOf(excludedColumn);
 +			if (index != -1) {
 +				if (allColumns.get(index).getTimestamp() <= excludedColumn.getTimestamp()) {
 +					allColumns.remove(excludedColumn);
 +				}
 +			}
 +		}
 +		List<PColumn> columnsToAdd = Lists.newArrayList();
 +		int position = isSalted ? 1 : 0;
 +		for (int i = allColumns.size() - 1; i >= 0; i--) {
 +			PColumn column = allColumns.get(i);
 +			if (table.getColumns().contains(column)) {
 +				// for views this column is not derived from an ancestor
 +				columnsToAdd.add(new PColumnImpl(column, position));
 +			} else {
 +				columnsToAdd.add(new PColumnImpl(column, true, position));
 +			}
 +			position++;
 +		}
 +		// need to have the columns in the PTable to use the WhereCompiler
 +		// unfortunately so this needs to be done
 +		// twice....
 +		// TODO set the view properties correctly instead of just setting them
 +		// same as the base table
 +		int baseTableColumnCount = isDiverged ? QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT : columnsToAdd.size() - myColumns.size();
 +		PTableImpl pTable = PTableImpl.makePTable(table, baseTable, columnsToAdd, maxTableTimestamp, baseTableColumnCount);
 +		return WhereConstantParser.addViewInfoToPColumnsIfNeeded(pTable);
 +	}
  
+     private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
+         return phoenixAccessCoprocessorHost;
+     }
+ 
      private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
              long clientTimeStamp, int clientVersion) throws IOException, SQLException {
          Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
@@@ -1529,11 -1334,11 +1549,11 @@@
      }
  
      /**
 -     * 
 +     *
       * @return null if the physical table row information is not present.
 -     * 
 +     *
       */
-     private static Mutation getPhysicalTableForView(List<Mutation> tableMetadata, byte[][] parentSchemaTableNames) {
+     private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) {
          int size = tableMetadata.size();
          byte[][] rowKeyMetaData = new byte[3][];
          MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
@@@ -1566,12 -1396,12 +1611,12 @@@
          if ((colBytes == null || colBytes.length == 0) && (famBytes != null && famBytes.length > 0)) {
              byte[] sName = SchemaUtil.getSchemaNameFromFullName(famBytes).getBytes();
              byte[] tName = SchemaUtil.getTableNameFromFullName(famBytes).getBytes();
-             parentSchemaTableNames[0] = sName;
-             parentSchemaTableNames[1] = tName;
+             schemaTableNames[0]= tenantId;
+             schemaTableNames[1] = sName;
+             schemaTableNames[2] = tName;
          }
-         return physicalTableRow;
      }
 -    
 +
      @Override
      public void createTable(RpcController controller, CreateTableRequest request,
              RpcCallback<MetaDataResponse> done) {
@@@ -1586,65 -1416,22 +1631,74 @@@
              byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
              schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
              tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
 +            // no need to run OrpanCleaner (which cleans up orphaned views) while creating SYSTEM tables  env.getTable
 +            if (Bytes.compareTo(schemaName,PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME_BYTES)!=0) {
 +	            HTableInterface systemCatalog = null;
 +	            try {
 +	            	// can't use SchemaUtil.getPhysicalTableName on server side as we don't know whether 
 +	            	// the system tables have been migrated to the system namespaces
 +	            	TableName systemCatalogTableName = env.getRegion().getTableDesc().getTableName();
 +	                systemCatalog = env.getTable(systemCatalogTableName);
 +	                OrphanCleaner.reapOrphans(systemCatalog, tenantIdBytes, schemaName, tableName);
 +	            } finally {
 +	                if (systemCatalog != null) {
 +	                    systemCatalog.close();
 +	                }
 +	            }
 +            }
+             boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                     new ImmutableBytesWritable());
+             final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
+                     new ImmutableBytesWritable());
              byte[] parentSchemaName = null;
              byte[] parentTableName = null;
              PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
 +            ViewType viewType = MetaDataUtil.getViewType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
 +
 +            // Here we are passed the parent's columns to add to a view, PHOENIX-3534 allows for a splittable
 +            // System.Catalog thus we only store the columns that are new to the view, not the parents columns,
 +            // thus here we remove everything that is ORDINAL.POSITION <= baseColumnCount and update the
 +            // ORDINAL.POSITIONS to be shifted accordingly.
 +            if (PTableType.VIEW.equals(tableType) && !ViewType.MAPPED.equals(viewType)) {
 +            	boolean isSalted = MetaDataUtil.getSaltBuckets(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable()) > 0;
 +				int baseColumnCount = MetaDataUtil.getBaseColumnCount(tableMetadata) - (isSalted ? 1 : 0);
 +                if (baseColumnCount > 0) {
 +                    Iterator<Mutation> mutationIterator = tableMetadata.iterator();
 +                    while (mutationIterator.hasNext()) {
 +                        Mutation mutation = mutationIterator.next();
 +                        // if not null and ordinal position < base column count remove this mutation
 +                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +                        MetaDataUtil.getMutationValue(mutation, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES,
 +                            GenericKeyValueBuilder.INSTANCE, ptr);
 +                        if (MetaDataUtil.getMutationValue(mutation, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES,
 +                            GenericKeyValueBuilder.INSTANCE, ptr)) {
 +                            int ordinalValue = PInteger.INSTANCE.getCodec().decodeInt(ptr, SortOrder.ASC);
 +                            if (ordinalValue <= baseColumnCount) {
 +                                mutationIterator.remove();
 +                            } else {
 +                                if (mutation instanceof Put) {
 +                                    byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
 +                                    int newOrdinalValue = ordinalValue - baseColumnCount;
 +                                    PInteger.INSTANCE.getCodec()
 +                                        .encodeInt(newOrdinalValue, ordinalPositionBytes, 0);
 +                                    byte[] family = Iterables.getOnlyElement(mutation.getFamilyCellMap().keySet());
 +                                    MetaDataUtil.mutatePutValue((Put) mutation, family, PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
 +                                }
 +                            }
 +                        }
 +                    }
 +                }
 +            }
 +
              byte[] parentTableKey = null;
              Mutation viewPhysicalTableRow = null;
+             Set<TableName> indexes = new HashSet<TableName>();;
+             byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped)
+                     .getBytes();
+             byte[] cParentPhysicalName=null;
              if (tableType == PTableType.VIEW) {
-                 byte[][] parentSchemaTableNames = new byte[2][];
+                 byte[][] parentSchemaTableNames = new byte[3][];
+                 byte[][] parentPhysicalSchemaTableNames = new byte[3][];
                  /*
                   * For a view, we lock the base physical table row. For a mapped view, there is 
                   * no link present to the physical table. So the viewPhysicalTableRow is null
@@@ -1662,10 -1492,30 +1759,30 @@@
                   * For an index we lock the parent table's row which could be a physical table or a view.
                   * If the parent table is a physical table, then the tenantIdBytes is empty because
                   * we allow creating an index with a tenant connection only if the parent table is a view.
 -                 */ 
 +                 */
                  parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                  parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
+                 long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                 PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
+                         clientTimeStamp, clientTimeStamp, clientVersion);
+                 if (IndexType.LOCAL == indexType) {
+                     cPhysicalName = parentTable.getPhysicalName().getBytes();
+                     cParentPhysicalName=parentTable.getPhysicalName().getBytes();
+                 } else if (parentTable.getType() == PTableType.VIEW) {
+                     cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
+                     cParentPhysicalName = parentTable.getPhysicalName().getBytes();
+                 }else{
+                     cParentPhysicalName = SchemaUtil
+                             .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes();
+                 }
              }
+             
+             getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
+                     SchemaUtil.getTableName(schemaName, tableName),
+                     (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
+                     cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
+                     /* TODO: During inital create we may not need the family map */
+                     Collections.<byte[]> emptySet(), indexes);
  
              Region region = env.getRegion();
              List<RowLock> locks = Lists.newArrayList();
@@@ -1933,57 -1765,89 +2050,23 @@@
                  QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE);
      }
  
-     private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
-         throws IOException {
-         RowLock rowLock = region.getRowLock(key, false);
-         if (rowLock == null) {
-             throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
 -    private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()};
 -
 -    
 -    private void findAllChildViews(Region region, byte[] tenantId, PTable table,
 -            TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
 -        TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion);
 -        result.addResult(currResult);
 -        for (ViewInfo viewInfo : currResult.getViewInfoList()) {
 -            byte[] viewtenantId = viewInfo.getTenantId();
 -            byte[] viewSchema = viewInfo.getSchemaName();
 -            byte[] viewTable = viewInfo.getViewName();
 -            byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, viewTable);
 -            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
 -            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion);
 -            findAllChildViews(region, viewtenantId, view, result, clientTimeStamp, clientVersion);
--        }
-         locks.add(rowLock);
-         return rowLock;
--    }
- 
-     private static void printMutations(List<Mutation> mutations) {
-         for (Mutation mutation : mutations) {
-             if (mutation instanceof Put) {
-                 Put put = (Put) mutation;
-                 NavigableMap<byte[], List<Cell>> familyCellMap = put.getFamilyCellMap();
-                 for (List<Cell> cells : familyCellMap.values()) {
-                     StringBuilder builder = new StringBuilder();
-                     for (Cell cell : cells) {
-                         // print the rowkey
-                         builder.append("ROW_KEY: " + Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
-                         builder.append("\t");
-                         builder.append("QUALIFIER: "+  Bytes
-                             .toStringBinary(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
-                         builder.append("\t");
-                         builder.append("VALUE: " + Bytes
-                             .toStringBinary(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
-                         builder.append("\n");
-                         System.out.println(builder.toString());
 -        
 -    // TODO remove this in 4.13 release 
 -    @Deprecated
 -    private TableViewFinder findChildViews_deprecated(Region region, byte[] tenantId, PTable table, byte[] linkTypeBytes) throws IOException {
 -        byte[] schemaName = table.getSchemaName().getBytes();
 -        byte[] tableName = table.getTableName().getBytes();
 -        boolean isMultiTenant = table.isMultiTenant();
 -        Scan scan = new Scan();
 -        // If the table is multi-tenant, we need to check across all tenant_ids,
 -        // so we can't constrain the row key. Otherwise, any views would have
 -        // the same tenantId.
 -        if (!isMultiTenant) {
 -            byte[] startRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
 -            byte[] stopRow = ByteUtil.nextKey(startRow);
 -            scan.setStartRow(startRow);
 -            scan.setStopRow(stopRow);
 -        }
 -        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
 -        SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES,
 -                CompareOp.EQUAL, PTableType.VIEW.getSerializedValue().getBytes());
 -        tableTypeFilter.setFilterIfMissing(false);
 -        linkFilter.setFilterIfMissing(true);
 -        byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil
 -                .getPhysicalHBaseTableName(schemaName, tableName, table.isNamespaceMapped())
 -                .getBytes());
 -        SuffixFilter rowFilter = new SuffixFilter(suffix);
 -        FilterList filter = new FilterList(linkFilter,tableTypeFilter,rowFilter);
 -        scan.setFilter(filter);
 -        scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
 -        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
 -        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 -        
 -        // Original region-only scanner modified due to PHOENIX-1208
 -        // RegionScanner scanner = region.getScanner(scan);
 -        // The following *should* work, but doesn't due to HBASE-11837
 -        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
 -        // HTableInterface hTable = env.getTable(systemCatalogTableName);
 -        // These deprecated calls work around the issue
 -        try (HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env,
 -            region.getTableDesc().getTableName().getName())) {
 -            boolean allViewsInCurrentRegion = true;
 -            int numOfChildViews = 0;
 -            List<ViewInfo> viewInfoList = Lists.newArrayList();
 -            try (ResultScanner scanner = hTable.getScanner(scan)) {
 -                for (Result result = scanner.next(); (result != null); result = scanner.next()) {
 -                    numOfChildViews++;
 -                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 -                    ResultTuple resultTuple = new ResultTuple(result);
 -                    resultTuple.getKey(ptr);
 -                    byte[] key = ptr.copyBytes();
 -                    if (checkTableKeyInRegion(key, region) != null) {
 -                        allViewsInCurrentRegion = false;
--                    }
 -                    byte[][] rowKeyMetaData = new byte[3][];
 -                    getVarChars(result.getRow(), 3, rowKeyMetaData);
 -                    byte[] viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
 -                    byte[] viewSchemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
 -                    byte[] viewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
 -                    viewInfoList.add(new ViewInfo(viewTenantId, viewSchemaName, viewName));
 -                }
 -                TableViewFinder tableViewFinderResult = new TableViewFinder(viewInfoList);
 -                if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
 -                    tableViewFinderResult.setAllViewsNotInSingleRegion();
--                }
 -                return tableViewFinderResult;
--            }
-         }
-     }
-     
 +    private void findAncestorViewsOfIndex(byte[] tenantId, byte[] schemaName, byte[] indexName, TableViewFinderResult result) throws IOException {
 +        HTableInterface hTable = env.getTable(SchemaUtil
 +                .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
 +        try {
 +            TableViewFinderResult currentResult = ViewFinder.findParentViewofIndex(hTable, tenantId, schemaName, indexName);
 +//            currentResult.addResult(ViewFinder.findBaseTable(hTable, tenantId, schemaName, indexName));
 +//            if ( currentResult.getResults().size()!=1 ) {
 +//                throw new RuntimeException("View index should have exactly one parent");
 +//            }
 +            if (currentResult.getResults().size()==1) {
 +            	result.addResult(currentResult);
 +            	TableInfo tableInfo = currentResult.getResults().get(0);
 +            	findAncestorViews(tableInfo.getTenantId(), tableInfo.getSchemaName(), tableInfo.getTableName(), result);
 +            }
 +            // else this is an index on a regular table and so we don't need to combine columns
 +        } finally {
 +            hTable.close();
          }
      }
      
@@@ -2056,6 -1953,23 +2139,23 @@@
                      parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
                          schemaName, tableName);
  
+             
+             PTableType ptableType=PTableType.fromSerializedValue(tableType);
 -            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
++            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(catalogMutations);
+             byte[] cKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+             PTable loadedTable = loadTable(env, cKey, new ImmutableBytesPtr(cKey), clientTimeStamp, clientTimeStamp,
+                     request.getClientVersion());
+             if (loadedTable == null) {
+                 builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                 builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                 done.run(builder.build());
+                 return;
+             }
+             getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes),
+                     SchemaUtil.getTableName(schemaName, tableName),
+                     TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
+                     getParentPhysicalTableName(loadedTable), ptableType,loadedTable.getIndexes());
+ 
              Region region = env.getRegion();
              MetaDataMutationResult result = checkTableKeyInRegion(key, region);
              if (result != null) {
@@@ -2077,20 -1992,11 +2177,20 @@@
                      done.run(MetaDataMutationResult.toProto(result));
                      return;
                  }
 -                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
 -                // Commit the list of deletion.
 -                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
 -                    HConstants.NO_NONCE);
 -                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
 +				Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
 +						.getMetaDataCache();
 +				// since the mutations in catalogMutations can span multiple
 +				// regions first we first process process mutations local to
 +				// this region, then we process the remaining mutations, finally
 +				// we process the child link mutations if any of the mutations
 +				// fail, we can will clean them up later using
 +				// OrphanCleaner.reapOrphans()
 +				separateLocalAndRemoteMutations(region, catalogMutations, localRegionMutations, remoteRegionMutations);
 +				// drop rows from catalog on this region
- 				region.mutateRowsWithLocks(localRegionMutations, Collections.<byte[]> emptyList(), HConstants.NO_NONCE,
++				mutateRowsWithLocks(region, localRegionMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
 +						HConstants.NO_NONCE);
 +
 +                long currentTime = MetaDataUtil.getClientTimeStamp(catalogMutations);
                  for (ImmutableBytesPtr ckey : invalidateList) {
                      metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
                  }
@@@ -2101,11 -2007,7 +2201,11 @@@
                  done.run(MetaDataMutationResult.toProto(result));
                  return;
              } finally {
-                 region.releaseRowLocks(locks);
 -                releaseRowLocks(region,locks);
++                releaseRowLocks(region, locks);
 +                // drop rows from catalog on remote regions
 +                processMutations(controller, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, SchemaUtil.getTableName(schemaName, tableName), remoteRegionMutations);
 +                // drop all child links 
 +                processMutations(controller, PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, SchemaUtil.getTableName(schemaName, tableName), childLinkMutations);
              }
          } catch (Throwable t) {
            logger.error("dropTable failed", t);
@@@ -2113,29 -2015,32 +2213,46 @@@
                  ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
          }
      }
+     
+     protected void releaseRowLocks(Region region, List<RowLock> locks) {
+         if (locks != null) {
+             region.releaseRowLocks(locks);
+         }
+     }
+ 
+     private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> locks) throws IOException {
 -        //LockManager.RowLock rowLock = lockManager.lockRow(lockKey, rowLockWaitDuration);
+         RowLock rowLock = region.getRowLock(lockKey, false);
+         if (rowLock == null) {
+             throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(lockKey));
+         }
+         if (locks != null) {
+             locks.add(rowLock);
+         }
+         return rowLock;
+     }
  
 -    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
 -        byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
 -        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
 -        List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, SQLException {
 -
 +	private void processMutations(RpcController controller, byte[] systemTableName, String droppedTableName,
 +			List<Mutation> childLinkMutations) throws IOException {
 +		HTableInterface hTable = null;
 +		try {
 +			hTable = env.getTable(SchemaUtil.getPhysicalTableName(systemTableName, env.getConfiguration()));
 +			hTable.batch(childLinkMutations);
 +		} catch (Throwable t) {
 +			logger.error("dropTable failed", t);
 +			ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(droppedTableName, t));
 +		} finally {
 +			if (hTable != null) {
 +				hTable.close();
 +			}
 +		}
 +	}
  
 -        long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
 +	private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName,
 +			byte[] parentTableName, PTableType tableType, List<Mutation> catalogMutations,
 +			List<Mutation> childLinkMutations, List<ImmutableBytesPtr> invalidateList, List<byte[]> tableNamesToDelete,
 +			List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion)
 +			throws IOException, SQLException {
 +        long clientTimeStamp = MetaDataUtil.getClientTimeStamp(catalogMutations);
  
          Region region = env.getRegion();
          ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
@@@ -2862,15 -3108,18 +2976,20 @@@
                      byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                      byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                      PTableType type = table.getType();
 +                    table = combineColumns(table, tenantId, schemaName, tableName, HConstants.LATEST_TIMESTAMP, request.getClientVersion()).getFirst();
                      byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
                              schemaName, tableName);
+                     byte[] cPhysicalTableName=table.getPhysicalName().getBytes();
+                     getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
+                             SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(cPhysicalTableName),
+                             getParentPhysicalTableName(table),type);
+ 
                      // Size for worst case - all new columns are PK column
                      List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
 +
                      if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
 -                        TableViewFinder childViewsResult = new TableViewFinder();
 -                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
 +                        TableViewFinderResult childViewsResult = new TableViewFinderResult();
 +                        findAllChildViews(tenantId, table.getSchemaName().getBytes(), table.getTableName().getBytes(), childViewsResult);
                          if (childViewsResult.hasViews()) {
                              /* 
                               * Dis-allow if:
@@@ -3015,28 -3265,36 +3134,25 @@@
           * from getting rebuilt too often.
           */
          final boolean wasLocked = (rowLock != null);
 -        boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
 +        boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
                  QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
          if (!wasLocked) {
-             rowLock = region.getRowLock(key, false);
-             if (rowLock == null) {
-                 throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
-             }
+             rowLock = acquireLock(region, key, null);
          }
          try {
 -            PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
 -            // We only cache the latest, so we'll end up building the table with every call if the
 -            // client connection has specified an SCN.
 -            // TODO: If we indicate to the client that we're returning an older version, but there's a
 -            // newer version available, the client
 -            // can safely not call this, since we only allow modifications to the latest.
 -            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 -                // Table on client is up-to-date with table on server, so just return
 -                if (isTableDeleted(table)) {
 -                    return null;
 -                }
 -                return table;
 -            }
 -            // Try cache again in case we were waiting on a lock
 -            table = (PTable)metaDataCache.getIfPresent(cacheKey);
 -            // We only cache the latest, so we'll end up building the table with every call if the
 -            // client connection has specified an SCN.
 -            // TODO: If we indicate to the client that we're returning an older version, but there's
 -            // a newer version available, the client
 -            // can safely not call this, since we only allow modifications to the latest.
 -            if (table != null && table.getTimeStamp() < clientTimeStamp) {
 -                // Table on client is up-to-date with table on server, so just return
 -                if (isTableDeleted(table)) {
 -                    return null;
 +            PTable table = getCachedTable(clientTimeStamp, cacheKey, metaDataCache);
 +            if (table == null) {
 +                // Try cache again in case we were waiting on a lock
 +                table = getCachedTable(clientTimeStamp, cacheKey, metaDataCache);
 +                if (table == null) {
 +                    // Query for the latest table first, since it's not cached
 +                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
 +                    if ((table == null || table.getTimeStamp() >= clientTimeStamp) && (!blockWriteRebuildIndex
 +                        || table.getIndexDisableTimestamp() <= 0)) {
 +                        // Otherwise, query for an older version of the table - it won't be cached
 +                        table = buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
 +
 +                    }
                  }
                  return table;
              }
@@@ -3124,27 -3358,9 +3234,24 @@@
              if(functionsAvailable.size() == numFunctions) return functionsAvailable;
              return null;
          } finally {
-             for (Region.RowLock lock : rowLocks) {
-                 lock.release();
-             }
-             rowLocks.clear();
+             releaseRowLocks(region,rowLocks);
          }
      }
 +    
 +    private PColumn getColumn(int pkCount, byte[][] rowKeyMetaData, PTable table) throws ColumnFamilyNotFoundException, ColumnNotFoundException {
 +    	PColumn col = null;
 +        if (pkCount > FAMILY_NAME_INDEX
 +            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
 +            PColumnFamily family =
 +                table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
 +            col =
 +                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
 +        } else if (pkCount > COLUMN_NAME_INDEX
 +            && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
 +            col = table.getPKColumn(new String(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
 +        }
 +    	return col;
 +    }
  
      @Override
      public void dropColumn(RpcController controller, final DropColumnRequest request,
@@@ -3164,42 -3379,56 +3271,47 @@@
                      byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                      byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                      byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
 +                    table = combineColumns(table, tenantId, schemaName, tableName, clientTimeStamp, request.getClientVersion()).getFirst();
 +                    boolean isView = table.getType() == PTableType.VIEW;
                      boolean deletePKColumn = false;
+                     getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
+                             SchemaUtil.getTableName(schemaName, tableName),
+                             TableName.valueOf(table.getPhysicalName().getBytes()),
+                             getParentPhysicalTableName(table),table.getType());
+ 
                      List<Mutation> additionalTableMetaData = Lists.newArrayList();
 -                    
 -                    PTableType type = table.getType();
 -                    if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
 -                        TableViewFinder childViewsResult = new TableViewFinder();
 -                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
 -                        if (childViewsResult.hasViews()) {
 -                            MetaDataMutationResult mutationResult =
 -                                    dropColumnsFromChildViews(region, table,
 -                                        locks, tableMetaData, additionalTableMetaData,
 -                                        schemaName, tableName, invalidateList,
 -                                        clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
 -                            // return if we were not able to drop the column successfully
 -                            if (mutationResult != null) return mutationResult;
 -                        }
 -                    }
 -                    
 -                    for (Mutation m : tableMetaData) {
 -                        if (m instanceof Delete) {
 -                            byte[] key = m.getRow();
 -                            int pkCount = getVarChars(key, rowKeyMetaData);
 +                    ListIterator<Mutation> iterator = tableMetaData.listIterator();
 +                    while (iterator.hasNext()) {
 +                        Mutation mutation = iterator.next();
 +                        byte[] key = mutation.getRow();
 +                        int pkCount = getVarChars(key, rowKeyMetaData);
 +                        if (isView && mutation instanceof Put) {
 +                        	PColumn column = getColumn(pkCount, rowKeyMetaData, table);
 +							if (column == null)
 +								continue;
 +                        	// ignore any puts that modify the ordinal positions of columns
 +                        	iterator.remove();
 +                        } 
 +                        else if (mutation instanceof Delete) {
                              if (pkCount > COLUMN_NAME_INDEX
 -                                    && Bytes.compareTo(schemaName,
 -                                        rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
 -                                    && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
 +                                && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
 +                                && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                                  PColumn columnToDelete = null;
                                  try {
 -                                    if (pkCount > FAMILY_NAME_INDEX
 -                                            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
 -                                        PColumnFamily family =
 -                                                table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
 -                                        columnToDelete =
 -                                                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
 -                                    } else if (pkCount > COLUMN_NAME_INDEX
 -                                            && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
 -                                        deletePKColumn = true;
 -                                        columnToDelete = table.getPKColumn(new String(
 -                                          rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
 -                                    } else {
 -                                        continue;
 -                                    }
 -                                    if (table.getType() == PTableType.VIEW) {
 +                                	columnToDelete = getColumn(pkCount, rowKeyMetaData, table);
 +									if (columnToDelete == null)
 +										continue;
 +                                    deletePKColumn = columnToDelete.getFamilyName() == null;
 +									if (isView) {
 +                                        // if we are dropping a derived column add it to the excluded column list
 +                                        if (columnToDelete.isDerived()) {
 +                                            mutation = MetaDataUtil
 +                                                .cloneDeleteToPutAndAddColumn((Delete) mutation, TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, LinkType.EXCLUDED_COLUMN.getSerializedValueAsByteArray());
 +                                            iterator.set(mutation);
 +                                        }
 +
                                          if (table.getBaseColumnCount() != DIVERGED_VIEW_BASE_COLUMN_COUNT
 -                                                && columnToDelete.getPosition() < table.getBaseColumnCount()) {
 +                                            && columnToDelete.isDerived()) {
                                              /*
                                               * If the column being dropped is inherited from the base table, then the
                                               * view is about to diverge itself from the base table. The consequence of

http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c0d1d4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index dd619eb,fe11ec7..7b82581
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@@ -92,9 -92,9 +92,10 @@@ public abstract class MetaDataProtocol 
      public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0 = MIN_TABLE_TIMESTAMP + 27;
      // Since there's no upgrade code, keep the version the same as the previous version
      public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
-     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_TABLE_TIMESTAMP + 28;
+     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
++    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
      // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
--    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0;
++    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
      
      // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). 
      // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.