You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/16 18:49:12 UTC

[5/6] phoenix git commit: Sync 4.x-HBase-1.2 to master (Pedro Boado)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
new file mode 100644
index 0000000..36782c1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+
+public class MutationStateIT extends ParallelStatsDisabledIT {
+
+    private static final String DDL =
+            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
+            + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
+            + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";
+
+    private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        for (int i = 0; i < 10000; i++) {
+            stmt.setString(1, "AAAA" + i);
+            stmt.setString(2, "BBBB" + i);
+            stmt.setInt(3, 1);
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testMaxMutationSize() throws Exception {
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
+        PhoenixConnection connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+
+        // set the max mutation size (bytes) to a low value
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
+        connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testMutationEstimatedSize() throws Exception {
+        PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+
+        // upserting rows should increase the mutation state size
+        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+        long prevEstimatedSize = state.getEstimatedSize();
+        upsertRows(conn, fullTableName);
+        assertTrue("Mutation state size should have increased",
+            state.getEstimatedSize() > prevEstimatedSize);
+        
+        
+        // after commit or rollback the size should be zero
+        conn.commit();
+        assertEquals("Mutation state size should be zero after commit", 0,
+            state.getEstimatedSize());
+        upsertRows(conn, fullTableName);
+        conn.rollback();
+        assertEquals("Mutation state size should be zero after rollback", 0,
+            state.getEstimatedSize());
+
+        // upsert one row
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertTrue("Mutation state size should be greater than zero ", state.getEstimatedSize()>0);
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // upserting the same row twice should only add the 4-byte statement index to the size
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertEquals(
+            "Mutation state size should only increase 4 bytes (size of the new statement index)",
+            prevEstimatedSize + 4, state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing one column of a row to a larger value should grow the estimated size by more than the 4 bytes checked above
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "random text string random text string random text string");
+        stmt.execute();
+        assertTrue("Mutation state size should increase", prevEstimatedSize+4 < state.getEstimatedSize());
+        
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing one column of a row to a smaller value should keep the estimated size below the previous size plus 4 bytes
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "");
+        stmt.execute();
+        assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 77cb19f..9109c12 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -39,7 +38,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -510,46 +508,6 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         assertEquals(4L, connection.getMutationState().getBatchCount());
     }
     
-    @Test
-    public void testMaxMutationSize() throws Exception {
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
-        PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        String fullTableName = generateUniqueName();
-        try (Statement stmt = connection.createStatement()) {
-            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
-                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
-                "    SCORE DOUBLE NOT NULL,\n" +
-                "    ENTITY_ID CHAR(15) NOT NULL\n" +
-                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
-                "        ORGANIZATION_ID,\n" +
-                "        SCORE DESC,\n" +
-                "        ENTITY_ID DESC\n" +
-                "    )\n" +
-                ") MULTI_TENANT=TRUE");
-        }
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-        
-        // set the max mutation size (bytes) to a low value
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
-        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-    }
-
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +
                 " (organization_id, entity_id, score) values (?,?,?)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
index 655dbb1..3f749c1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortOrderIT.java
@@ -167,7 +167,16 @@ public class SortOrderIT extends ParallelStatsDisabledIT {
         runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", 2}}, new WhereCondition("oid", "IN", "('o2')"),
             table);
     }
-    
+
+    @Test
+    public void inDescCompositePK3() throws Exception {
+        String table = generateUniqueName();
+        String ddl = "CREATE table " + table + " (oid VARCHAR NOT NULL, code VARCHAR NOT NULL constraint pk primary key (oid DESC, code DESC))";
+        Object[][] insertedRows = new Object[][]{{"o1", "1"}, {"o2", "2"}, {"o3", "3"}};
+        runQueryTest(ddl, upsert("oid", "code"), insertedRows, new Object[][]{{"o2", "2"}, {"o1", "1"}}, new WhereCondition("(oid, code)", "IN", "(('o2', '2'), ('o1', '1'))"),
+                table);
+    }
+
     @Test
     public void likeDescCompositePK1() throws Exception {
         String table = generateUniqueName();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java
index 15af2af..7b6a543 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogIT.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
index 49202a4..bbe7114 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
@@ -16,177 +16,60 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Properties;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.security.access.AccessControlClient;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.phoenix.query.QueryServices;
-import org.junit.After;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
  * Test that verifies a user can read Phoenix tables with a minimal set of permissions.
+ * Uses HBase API directly to grant/revoke permissions
  */
 @Category(NeedsOwnMiniClusterTest.class)
-public class SystemTablePermissionsIT {
-    private static String SUPERUSER;
-
-    private static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
-            "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION",
-                "SYSTEM.MUTEX"));
-    private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(
-            Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION",
-                "SYSTEM:MUTEX"));
+public class SystemTablePermissionsIT extends BasePermissionsIT {
 
     private static final String TABLE_NAME =
         SystemTablePermissionsIT.class.getSimpleName().toUpperCase();
-    private static final int NUM_RECORDS = 5;
-
-    private HBaseTestingUtility testUtil = null;
-    private Properties clientProperties = null;
 
-    @BeforeClass
-    public static void setup() throws Exception {
-        SUPERUSER = System.getProperty("user.name");
-    }
-
-    private static void setCommonConfigProperties(Configuration conf) {
-        conf.set("hbase.coprocessor.master.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.coprocessor.region.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.coprocessor.regionserver.classes",
-            "org.apache.hadoop.hbase.security.access.AccessController");
-        conf.set("hbase.security.exec.permission.checks", "true");
-        conf.set("hbase.security.authorization", "true");
-        conf.set("hbase.superuser", SUPERUSER);
-    }
-
-    @After
-    public void cleanup() throws Exception {
-        if (null != testUtil) {
-          testUtil.shutdownMiniCluster();
-          testUtil = null;
-        }
+    public SystemTablePermissionsIT(boolean isNamespaceMapped) throws Exception {
+        super(isNamespaceMapped);
     }
 
     @Test
-    public void testSystemTablePermissions() throws Exception {
-        testUtil = new HBaseTestingUtility();
-        clientProperties = new Properties();
-        Configuration conf = testUtil.getConfiguration();
-        setCommonConfigProperties(conf);
-        conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "false");
-        clientProperties.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "false");
-        testUtil.startMiniCluster(1);
-        final UserGroupInformation superUser = UserGroupInformation.createUserForTesting(
-            SUPERUSER, new String[0]);
-        final UserGroupInformation regularUser = UserGroupInformation.createUserForTesting(
-            "user", new String[0]);
+    public void testSystemTablePermissions() throws Throwable {
 
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                createTable();
-                readTable();
-                return null;
-            }
-        });
+        startNewMiniCluster();
+
+        verifyAllowed(createTable(TABLE_NAME), superUser1);
+        verifyAllowed(readTable(TABLE_NAME), superUser1);
 
         Set<String> tables = getHBaseTables();
-        assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
-            tables.containsAll(PHOENIX_SYSTEM_TABLES));
+        if(isNamespaceMapped) {
+            assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
+                    tables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+        } else {
+            assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
+                    tables.containsAll(PHOENIX_SYSTEM_TABLES));
+        }
 
         // Grant permission to the system tables for the unprivileged user
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
+        superUser1.runAs(new PrivilegedExceptionAction<Void>() {
             @Override
             public Void run() throws Exception {
                 try {
-                    grantPermissions(regularUser.getShortUserName(), PHOENIX_SYSTEM_TABLES,
-                        Action.EXEC, Action.READ);
-                    grantPermissions(regularUser.getShortUserName(),
-                        Collections.singleton(TABLE_NAME), Action.READ);
-                } catch (Throwable e) {
-                    if (e instanceof Exception) {
-                        throw (Exception) e;
+                    if(isNamespaceMapped) {
+                        grantPermissions(regularUser1.getShortName(),
+                                PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.EXEC, Action.READ);
                     } else {
-                        throw new Exception(e);
+                        grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES,
+                                Action.EXEC, Action.READ);
                     }
-                }
-                return null;
-            }
-        });
-
-        // Make sure that the unprivileged user can read the table
-        regularUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                // We expect this to not throw an error
-                readTable();
-                return null;
-            }
-        });
-    }
-
-    @Test
-    public void testNamespaceMappedSystemTables() throws Exception {
-        testUtil = new HBaseTestingUtility();
-        clientProperties = new Properties();
-        Configuration conf = testUtil.getConfiguration();
-        setCommonConfigProperties(conf);
-        testUtil.getConfiguration().set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
-        clientProperties.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
-        testUtil.startMiniCluster(1);
-        final UserGroupInformation superUser =
-            UserGroupInformation.createUserForTesting(SUPERUSER, new String[0]);
-        final UserGroupInformation regularUser =
-            UserGroupInformation.createUserForTesting("user", new String[0]);
-
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                createTable();
-                readTable();
-                return null;
-            }
-        });
-
-        Set<String> tables = getHBaseTables();
-        assertTrue("HBase tables do not include expected Phoenix tables: " + tables,
-            tables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
-
-        // Grant permission to the system tables for the unprivileged user
-        // An unprivileged user should only need to be able to Read and eXecute on them.
-        superUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                try {
-                    grantPermissions(regularUser.getShortUserName(),
-                        PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.EXEC, Action.READ);
-                    grantPermissions(regularUser.getShortUserName(),
+                    grantPermissions(regularUser1.getShortName(),
                         Collections.singleton(TABLE_NAME), Action.READ);
                 } catch (Throwable e) {
                     if (e instanceof Exception) {
@@ -199,66 +82,7 @@ public class SystemTablePermissionsIT {
             }
         });
 
-        regularUser.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                // We expect this to not throw an error
-                readTable();
-                return null;
-            }
-        });
-    }
-
-    private String getJdbcUrl() {
-        return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
-    }
-
-    private void createTable() throws SQLException {
-        try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProperties);
-            Statement stmt = conn.createStatement();) {
-            assertFalse(stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME));
-            assertFalse(stmt.execute("CREATE TABLE " + TABLE_NAME
-                + "(pk INTEGER not null primary key, data VARCHAR)"));
-            try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO "
-                + TABLE_NAME + " values(?, ?)")) {
-                for (int i = 0; i < NUM_RECORDS; i++) {
-                    pstmt.setInt(1, i);
-                    pstmt.setString(2, Integer.toString(i));
-                    assertEquals(1, pstmt.executeUpdate());
-                }
-            }
-            conn.commit();
-        }
-    }
-
-    private void readTable() throws SQLException {
-        try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProperties);
-            Statement stmt = conn.createStatement()) {
-            ResultSet rs = stmt.executeQuery("SELECT pk, data FROM " + TABLE_NAME);
-            assertNotNull(rs);
-            int i = 0;
-            while (rs.next()) {
-                assertEquals(i, rs.getInt(1));
-                assertEquals(Integer.toString(i), rs.getString(2));
-                i++;
-            }
-            assertEquals(NUM_RECORDS, i);
-        }
-    }
-
-    private void grantPermissions(String toUser, Set<String> tablesToGrant, Action... actions)
-            throws Throwable {
-          for (String table : tablesToGrant) {
-              AccessControlClient.grant(testUtil.getConnection(), TableName.valueOf(table), toUser,
-                  null, null, actions);
-          }
-    }
-
-    private Set<String> getHBaseTables() throws IOException {
-        Set<String> tables = new HashSet<>();
-        for (TableName tn : testUtil.getHBaseAdmin().listTableNames()) {
-            tables.add(tn.getNameAsString());
-        }
-        return tables;
+        // Make sure that the unprivileged user can now read the table
+        verifyAllowed(readTable(TABLE_NAME), regularUser1);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
new file mode 100644
index 0000000..8666bb8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.Permission.Action;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that Phoenix DDL operations on schemas, tables, indexes and views are allowed or denied according to the permissions granted to each user.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class TableDDLPermissionsIT extends BasePermissionsIT {
+
+    public TableDDLPermissionsIT(boolean isNamespaceMapped) throws Exception {
+        super(isNamespaceMapped);
+    }
+
+    /** Grants READ/EXEC on the system tables to the test users and system-access group, plus WRITE on the sequence table to the users. */
+    private void grantSystemTableAccess() throws Exception {
+        try (Connection conn = getConnection()) {
+            if (isNamespaceMapped) {
+                grantPermissions(regularUser1.getShortName(), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES, Action.READ,
+                        Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES,
+                        Action.READ, Action.EXEC);
+                grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES,
+                        Action.READ, Action.EXEC);
+                // Local Index requires WRITE permission on SYSTEM.SEQUENCE TABLE.
+                grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
+                        Action.READ, Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
+                        Action.READ, Action.EXEC);
+            } else {
+                grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
+                // Local Index requires WRITE permission on SYSTEM.SEQUENCE TABLE ('.' separator in this non-namespace-mapped branch).
+                grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM.SEQUENCE"), Action.WRITE,
+                        Action.READ, Action.EXEC);
+                grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM.SEQUENCE"), Action.WRITE,
+                        Action.READ, Action.EXEC);
+            }
+        } catch (Throwable e) {
+            if (e instanceof Exception) {
+                throw (Exception)e;
+            } else {
+                throw new Exception(e);
+            }
+        }
+    }
+
+    @Test
+    public void testSchemaPermissions() throws Throwable{
+        // Schema permissions are only exercised with namespace mapping enabled; otherwise the test returns immediately.
+        if (!isNamespaceMapped) { return; }
+        try {
+            startNewMiniCluster();
+            grantSystemTableAccess();
+            final String schemaName = "TEST_SCHEMA_PERMISSION";
+            superUser1.runAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try {
+                        AccessControlClient.grant(getUtility().getConnection(), regularUser1.getShortName(),
+                                Action.ADMIN);
+                    } catch (Throwable e) {
+                        if (e instanceof Exception) {
+                            throw (Exception)e;
+                        } else {
+                            throw new Exception(e);
+                        }
+                    }
+                    return null;
+                }
+            });
+            verifyAllowed(createSchema(schemaName), regularUser1);
+            // Unprivileged user can neither drop nor create a schema
+            verifyDenied(dropSchema(schemaName), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(createSchema(schemaName), AccessDeniedException.class, unprivilegedUser);
+            // The user that was granted ADMIN above can drop the schema it created
+            verifyAllowed(dropSchema(schemaName), regularUser1);
+        } finally {
+            revokeAll();
+        }
+    }
+
+    @Test
+    public void testAutomaticGrantWithIndexAndView() throws Throwable {
+        startNewMiniCluster();
+        final String schema = "TEST_INDEX_VIEW";
+        final String tableName = "TABLE_DDL_PERMISSION_IT";
+        final String phoenixTableName = schema + "." + tableName;
+        final String indexName1 = tableName + "_IDX1";
+        final String indexName2 = tableName + "_IDX2";
+        final String lIndexName1 = tableName + "_LIDX1";
+        final String viewName1 = schema+"."+tableName + "_V1";
+        final String viewName2 = schema+"."+tableName + "_V2";
+        final String viewName3 = schema+"."+tableName + "_V3";
+        final String viewName4 = schema+"."+tableName + "_V4";
+        final String viewIndexName1 = tableName + "_VIDX1";
+        final String viewIndexName2 = tableName + "_VIDX2";
+        grantSystemTableAccess();
+        try {
+            superUser1.runAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try {
+                        verifyAllowed(createSchema(schema), superUser1);
+                        if (isNamespaceMapped) {
+                            grantPermissions(regularUser1.getShortName(), schema, Action.CREATE);
+                            grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS), schema, Action.CREATE);
+
+                        } else {
+                            grantPermissions(regularUser1.getShortName(),
+                                    NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Action.CREATE);
+                            grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS),
+                                    NamespaceDescriptor.DEFAULT_NAMESPACE.getName(), Action.CREATE);
+
+                        }
+                    } catch (Throwable e) {
+                        if (e instanceof Exception) {
+                            throw (Exception)e;
+                        } else {
+                            throw new Exception(e);
+                        }
+                    }
+                    return null;
+                }
+            });
+
+            verifyAllowed(createTable(phoenixTableName), regularUser1);
+            verifyAllowed(createIndex(indexName1, phoenixTableName), regularUser1);
+            verifyAllowed(createView(viewName1, phoenixTableName), regularUser1);
+            verifyAllowed(createLocalIndex(lIndexName1, phoenixTableName), regularUser1);
+            verifyAllowed(createIndex(viewIndexName1, viewName1), regularUser1);
+            verifyAllowed(createIndex(viewIndexName2, viewName1), regularUser1);
+            verifyAllowed(createView(viewName4, viewName1), regularUser1);
+            verifyAllowed(readTable(phoenixTableName), regularUser1);
+
+            verifyDenied(createIndex(indexName2, phoenixTableName), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(createView(viewName2, phoenixTableName),AccessDeniedException.class,  unprivilegedUser);
+            verifyDenied(createView(viewName3, viewName1), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(dropView(viewName1), AccessDeniedException.class, unprivilegedUser);
+            
+            verifyDenied(dropIndex(indexName1, phoenixTableName), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(dropTable(phoenixTableName), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(rebuildIndex(indexName1, phoenixTableName), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(addColumn(phoenixTableName, "val1"), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(dropColumn(phoenixTableName, "val"), AccessDeniedException.class, unprivilegedUser);
+            verifyDenied(addProperties(phoenixTableName, "GUIDE_POSTS_WIDTH", "100"), AccessDeniedException.class, unprivilegedUser);
+
+            // Grant READ and EXEC on the data table to the unprivileged user: creating views is now allowed, creating an index is still denied
+            grantPermissions(unprivilegedUser.getShortName(),
+                    Collections.singleton(
+                            SchemaUtil.getPhysicalHBaseTableName(schema, tableName, isNamespaceMapped).getString()),
+                    Action.READ, Action.EXEC);
+            grantPermissions(AuthUtil.toGroupEntry(GROUP_SYSTEM_ACCESS),
+                    Collections.singleton(
+                            SchemaUtil.getPhysicalHBaseTableName(schema, tableName, isNamespaceMapped).getString()),
+                    Action.READ, Action.EXEC);
+            verifyDenied(createIndex(indexName2, phoenixTableName), AccessDeniedException.class, unprivilegedUser);
+            verifyAllowed(createView(viewName2, phoenixTableName), unprivilegedUser);
+            verifyAllowed(createView(viewName3, viewName1), unprivilegedUser);
+            
+            // Grant create permission in namespace
+            if (isNamespaceMapped) {
+                grantPermissions(unprivilegedUser.getShortName(), schema, Action.CREATE);
+            } else {
+                grantPermissions(unprivilegedUser.getShortName(), NamespaceDescriptor.DEFAULT_NAMESPACE.getName(),
+                        Action.CREATE);
+            }
+
+            // The user should also be able to read the data through the other index (indexName1), even though
+            // no access to that index was granted to this user explicitly
+            verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser);
+            verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser);
+            verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser);
+            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser);
+
+            // data table user should be able to read new index
+            verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1);
+            verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1);
+
+            verifyAllowed(readTable(phoenixTableName), regularUser1);
+            verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1);
+            verifyAllowed(addColumn(phoenixTableName, "val1"), regularUser1);
+            verifyAllowed(addProperties(phoenixTableName, "GUIDE_POSTS_WIDTH", "100"), regularUser1);
+            verifyAllowed(dropView(viewName1), regularUser1);
+            verifyAllowed(dropView(viewName2), regularUser1);
+            verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1);
+            verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1);
+            verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1);
+            verifyAllowed(dropTable(phoenixTableName), regularUser1);
+
+            // check again with super users
+            verifyAllowed(createTable(phoenixTableName), superUser2);
+            verifyAllowed(createIndex(indexName1, phoenixTableName), superUser2);
+            verifyAllowed(createView(viewName1, phoenixTableName), superUser2);
+            verifyAllowed(readTable(phoenixTableName), superUser2);
+            verifyAllowed(dropView(viewName1), superUser2);
+            verifyAllowed(dropTable(phoenixTableName), superUser2);
+
+        } finally {
+            revokeAll();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index 0ce36dd..986c317 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -674,4 +674,59 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
             conn.close();
         }
     }
+
+
+
+    @Test
+    public void testIndexAlterPhoenixProperty() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String testTable = generateUniqueName();
+
+
+        String ddl = "create table " + testTable  + " (k varchar primary key, v1 varchar)";
+        Statement stmt = conn.createStatement();
+        stmt.execute(ddl);
+        String indexName = "IDX_" + generateUniqueName();
+
+        ddl = "CREATE INDEX " + indexName + " ON " + testTable  + " (v1) ";
+        stmt.execute(ddl);
+        conn.createStatement().execute("ALTER INDEX "+indexName+" ON " + testTable +" ACTIVE SET GUIDE_POSTS_WIDTH = 10");
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "select GUIDE_POSTS_WIDTH from SYSTEM.\"CATALOG\" where TABLE_NAME='" + indexName + "'");assertTrue(rs.next());
+        assertEquals(10,rs.getInt(1));
+
+        conn.createStatement().execute("ALTER INDEX "+indexName+" ON " + testTable +" ACTIVE SET GUIDE_POSTS_WIDTH = 20");
+        rs = conn.createStatement().executeQuery(
+                "select GUIDE_POSTS_WIDTH from SYSTEM.\"CATALOG\" where TABLE_NAME='" + indexName + "'");assertTrue(rs.next());
+        assertEquals(20,rs.getInt(1));
+    }
+
+
+    @Test
+    public void testIndexAlterHBaseProperty() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String testTable = generateUniqueName();
+
+        String ddl = "create table " + testTable  + " (k varchar primary key, v1 varchar)";
+        Statement stmt = conn.createStatement();
+        stmt.execute(ddl);
+        String indexName = "IDX_" + generateUniqueName();
+
+        ddl = "CREATE INDEX " + indexName + " ON " + testTable  + " (v1) ";
+        stmt.execute(ddl);
+
+        conn.createStatement().execute("ALTER INDEX "+indexName+" ON " + testTable +" ACTIVE SET DISABLE_WAL=false");
+        asssertIsWALDisabled(conn,indexName,false);
+        conn.createStatement().execute("ALTER INDEX "+indexName+" ON " + testTable +" ACTIVE SET DISABLE_WAL=true");
+        asssertIsWALDisabled(conn,indexName,true);
+    }
+
+    private static void asssertIsWALDisabled(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isWALDisabled());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
index 37ffd02..f09f1d3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinMoreIT.java
@@ -895,6 +895,11 @@ public class HashJoinMoreIT extends ParallelStatsDisabledIT {
                     + "FROM ( SELECT ACCOUNT_ID, BUCKET_ID, OBJECT_ID, MAX(OBJECT_VERSION) AS MAXVER "
                     + "       FROM test2961 GROUP BY ACCOUNT_ID, BUCKET_ID, OBJECT_ID) AS X "
                     + "       INNER JOIN test2961 AS OBJ ON X.ACCOUNT_ID = OBJ.ACCOUNT_ID AND X.BUCKET_ID = OBJ.BUCKET_ID AND X.OBJECT_ID = OBJ.OBJECT_ID AND  X.MAXVER = OBJ.OBJECT_VERSION";
+            rs = conn.createStatement().executeQuery("explain " + q);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String dynamicFilter = "DYNAMIC SERVER FILTER BY (OBJ.ACCOUNT_ID, OBJ.BUCKET_ID, OBJ.OBJECT_ID, OBJ.OBJECT_VERSION) IN ((X.ACCOUNT_ID, X.BUCKET_ID, X.OBJECT_ID, X.MAXVER))";
+            assertTrue("Expected '" + dynamicFilter + "' to be used for the query, but got:\n" + plan,
+                    plan.contains(dynamicFilter));
             rs = conn.createStatement().executeQuery(q);
             assertTrue(rs.next());
             assertEquals("2222", rs.getString(4));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 10fd7f8..e5b57e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -33,7 +33,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -52,8 +51,8 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseOwnClusterIT;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.monitoring.MetricType;
@@ -285,7 +284,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
     private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
+        final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
         // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 93e0ede..87153cd 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -118,6 +118,7 @@ tokens
     UNION='union';
     FUNCTION='function';
     AS='as';
+    TO='to';
     TEMPORARY='temporary';
     RETURNS='returns';
     USING='using';
@@ -144,6 +145,8 @@ tokens
     DUPLICATE = 'duplicate';
     IGNORE = 'ignore';
     IMMUTABLE = 'immutable';
+    GRANT = 'grant';
+    REVOKE = 'revoke';
 }
 
 
@@ -430,6 +433,8 @@ oneStatement returns [BindableStatement ret]
     |   s=delete_jar_node
     |   s=alter_session_node
     |	s=create_sequence_node
+    |   s=grant_permission_node
+    |   s=revoke_permission_node
     |	s=drop_sequence_node
     |	s=drop_schema_node
     |	s=use_schema_node
@@ -454,10 +459,34 @@ create_table_node returns [CreateTableStatement ret]
    
 // Parse a create schema statement.
 create_schema_node returns [CreateSchemaStatement ret]
-    :   CREATE SCHEMA (IF NOT ex=EXISTS)? (DEFAULT | s=identifier)
+    :   CREATE SCHEMA (IF NOT ex=EXISTS)? s=identifier
         {ret = factory.createSchema(s, ex!=null); }
     ;
 
+// Parse a grant permission statement: GRANT 'perms' [ON [TABLE] table | ON SCHEMA schema] TO [GROUP] 'user-or-group'
+grant_permission_node returns [ChangePermsStatement ret]
+    :   GRANT p=literal (ON ((TABLE)? table=table_name | s=SCHEMA schema=identifier))? TO (g=GROUP)? ug=literal
+        {
+            String permsString = SchemaUtil.normalizeLiteral(p);
+            if (permsString != null && permsString.length() > 5) {
+                throw new RuntimeException("Permissions String length should be at most 5 characters");
+            }
+            $ret = factory.changePermsStatement(permsString, s!=null, table, schema, g!=null, ug, Boolean.TRUE);
+        }
+    ;
+
+// Parse a revoke permission statement: REVOKE ['perms'] [ON [TABLE] table | ON SCHEMA schema] FROM [GROUP] 'user-or-group'
+revoke_permission_node returns [ChangePermsStatement ret]
+    :   REVOKE (p=literal)? (ON ((TABLE)? table=table_name | s=SCHEMA schema=identifier))? FROM (g=GROUP)? ug=literal
+        {
+            String permsString = SchemaUtil.normalizeLiteral(p);
+            if (permsString != null && permsString.length() > 5) {
+                throw new RuntimeException("Permissions String length should be at most 5 characters");
+            }
+            $ret = factory.changePermsStatement(permsString, s!=null, table, schema, g!=null, ug, Boolean.FALSE);
+        }
+    ;
+
 // Parse a create view statement.
 create_view_node returns [CreateTableStatement ret]
     :   CREATE VIEW (IF NOT ex=EXISTS)? t=from_table_name 
@@ -576,8 +605,9 @@ drop_index_node returns [DropIndexStatement ret]
 
 // Parse a alter index statement
 alter_index_node returns [AlterIndexStatement ret]
-    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) (async=ASYNC)?
-      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null); }
+    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name
+      ((s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)?)
+      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null, p); }
     ;
 
 // Parse a trace statement.
@@ -1161,7 +1191,6 @@ BIND_NAME
     : COLON (DIGIT)+
     ;
 
-
 NAME
     :    LETTER (FIELDCHAR)*
     |    '\"' (DBL_QUOTE_CHAR)* '\"'

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/RpcUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/RpcUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/RpcUtil.java
new file mode 100644
index 0000000..ac281f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/RpcUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+
+/** Static accessors for {@code RpcServer.CurCall}. */
+public class RpcUtil {
+    /** Returns whatever {@code RpcServer.CurCall.get()} currently yields. */
+    public static Call getRpcContext() {
+        return RpcServer.CurCall.get();
+    }
+    /** Passes {@code call} to {@code RpcServer.CurCall.set}. */
+    public static void setRpcContext(Call call) {
+        RpcServer.CurCall.set(call);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
index 0e45682..60eb59a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
@@ -79,4 +79,9 @@ public abstract class BaseMutationPlan implements MutationPlan {
         return 0l;
     }
 
+    @Override
+    public QueryPlan getQueryPlan() {
+        return null;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
index 343ec32..90eef61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -42,6 +42,11 @@ public class DelegateMutationPlan implements MutationPlan {
     }
 
     @Override
+    public QueryPlan getQueryPlan() {
+        return plan.getQueryPlan();
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return plan.getParameterMetaData();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f038cda..a06e2ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -43,6 +42,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -91,7 +91,6 @@ import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
 
 public class DeleteCompiler {
@@ -121,14 +120,14 @@ public class DeleteCompiler {
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
+        MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
+        List<MultiRowMutationState> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
         if (!otherTableRefs.isEmpty()) {
             indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
             for (int i = 0; i < otherTableRefs.size(); i++) {
-                indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
+                indexMutations.add(new MultiRowMutationState(batchSize));
             }
         }
         List<PColumn> pkColumns = table.getPKColumns();
@@ -207,7 +206,7 @@ public class DeleteCompiler {
                 // row key will already have its value.
                 // Check for otherTableRefs being empty required when deleting directly from the index
                 if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) {
-                    mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                    mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 for (int i = 0; i < otherTableRefs.size(); i++) {
                     PTable otherTable = otherTableRefs.get(i).getTable();
@@ -221,7 +220,7 @@ public class DeleteCompiler {
                     } else {
                         indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
                     }
-                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                    indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -303,14 +302,16 @@ public class DeleteCompiler {
         return Collections.emptyList();
     }
     
-    private class MultiDeleteMutationPlan implements MutationPlan {
+    private class MultiRowDeleteMutationPlan implements MutationPlan {
         private final List<MutationPlan> plans;
         private final MutationPlan firstPlan;
-        
-        public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) {
+        private final QueryPlan dataPlan;
+
+        public MultiRowDeleteMutationPlan(QueryPlan dataPlan, @NotNull List<MutationPlan> plans) {
             Preconditions.checkArgument(!plans.isEmpty());
             this.plans = plans;
             this.firstPlan = plans.get(0);
+            this.dataPlan = dataPlan;
         }
         
         @Override
@@ -348,8 +349,8 @@ public class DeleteCompiler {
             return firstPlan.getSourceRefs();
         }
 
-		@Override
-		public Operation getOperation() {
+		    @Override
+		    public Operation getOperation() {
 			return operation;
 		}
 
@@ -401,6 +402,11 @@ public class DeleteCompiler {
             }
             return estInfoTimestamp;
         }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
     }
 
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
@@ -548,69 +554,9 @@ public class DeleteCompiler {
             List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
             for (final QueryPlan plan : queryPlans) {
                 final StatementContext context = plan.getContext();
-                mutationPlans.add(new MutationPlan() {
-    
-                    @Override
-                    public ParameterMetaData getParameterMetaData() {
-                        return context.getBindManager().getParameterMetaData();
-                    }
-    
-                    @Override
-                    public MutationState execute() throws SQLException {
-                        // We have a point lookup, so we know we have a simple set of fully qualified
-                        // keys for our ranges
-                        ScanRanges ranges = context.getScanRanges();
-                        Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
-                        Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
-                        while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
-                        }
-                        return new MutationState(plan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
-                    }
-    
-                    @Override
-                    public ExplainPlan getExplainPlan() throws SQLException {
-                        return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
-                    }
-    
-                    @Override
-                    public StatementContext getContext() {
-                        return context;
-                    }
-
-                    @Override
-                    public TableRef getTargetRef() {
-                        return dataPlan.getTableRef();
-                    }
-
-                    @Override
-                    public Set<TableRef> getSourceRefs() {
-                        // Don't include the target
-                        return Collections.emptySet();
-                    }
-
-            		@Override
-            		public Operation getOperation() {
-            			return operation;
-            		}
-
-                    @Override
-                    public Long getEstimatedRowsToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimatedBytesToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimateInfoTimestamp() throws SQLException {
-                        return 0l;
-                    }
-                });
+                mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
             }
-            return new MultiDeleteMutationPlan(mutationPlans);
+            return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
         } else if (runOnServer) {
             // TODO: better abstraction
             final StatementContext context = dataPlan.getContext();
@@ -629,91 +575,7 @@ public class DeleteCompiler {
             final RowProjector projector = projectorToBe;
             final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
                     OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-            return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return context.getBindManager().getParameterMetaData();
-                        }
-        
-                        @Override
-                        public StatementContext getContext() {
-                            return context;
-                        }
-        
-                        @Override
-                        public TableRef getTargetRef() {
-                            return dataPlan.getTableRef();
-                        }
-    
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return dataPlan.getSourceRefs();
-                        }
-    
-                		@Override
-                		public Operation getOperation() {
-                			return operation;
-                		}
-    
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            // TODO: share this block of code with UPSERT SELECT
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = dataPlan.getTableRef().getTable();
-                            table.getIndexMaintainers(ptr, context.getConnection());
-                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-                            ServerCache cache = null;
-                            try {
-                                if (ptr.getLength() > 0) {
-                                    byte[] uuidValue = ServerCacheClient.generateId();
-                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                    context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                                }
-                                ResultIterator iterator = aggPlan.iterator();
-                                try {
-                                    Tuple row = iterator.next();
-                                    final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                    return new MutationState(maxSize, maxSizeBytes, connection) {
-                                        @Override
-                                        public long getUpdateCount() {
-                                            return mutationCount;
-                                        }
-                                    };
-                                } finally {
-                                    iterator.close();
-                                }
-                            } finally {
-                                if (cache != null) {
-                                    cache.close();
-                                }
-                            }
-                        }
-        
-                        @Override
-                        public ExplainPlan getExplainPlan() throws SQLException {
-                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("DELETE ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-    
-                        @Override
-                        public Long getEstimatedRowsToScan() throws SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimatedBytesToScan() throws SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+            return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes);
         } else {
             final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
             List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size());
@@ -749,90 +611,322 @@ public class DeleteCompiler {
             if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
                 otherTableRefs.add(projectedTableRef);
             }
-            final StatementContext context = bestPlan.getContext();
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return context.getBindManager().getParameterMetaData();
-                }
+            return new ClientSelectDeleteMutationPlan(targetTableRef, dataPlan, bestPlan, hasPreOrPostProcessing,
+                    parallelIteratorFactory, otherTableRefs, projectedTableRef, maxSize, maxSizeBytes, connection);
+        }
+    }
 
-                @Override
-                public StatementContext getContext() {
-                    return context;
-                }
+    private class SingleRowDeleteMutationPlan implements MutationPlan {
 
-                @Override
-                public TableRef getTargetRef() {
-                    return targetTableRef;
-                }
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final int maxSize;
+        private final StatementContext context;
+        private final int maxSizeBytes;
 
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return dataPlan.getSourceRefs();
-                }
+        public SingleRowDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, int maxSize, int maxSizeBytes) {
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.maxSize = maxSize;
+            this.context = dataPlan.getContext();
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
 
-        		@Override
-        		public Operation getOperation() {
-        			return operation;
-        		}
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = bestPlan.iterator();
-                    try {
-                        if (!hasPreOrPostProcessing) {
-                            Tuple tuple;
-                            long totalRowCount = 0;
-                            if (parallelIteratorFactory != null) {
-                                parallelIteratorFactory.setQueryPlan(bestPlan);
-                                parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
-                                parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
-                            }
-                            while ((tuple=iterator.next()) != null) {// Runs query
-                                Cell kv = tuple.getValue(0);
-                                totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                            }
-                            // Return total number of rows that have been deleted from the table. In the case of auto commit being off
-                            // the mutations will all be in the mutation state of the current connection. We need to divide by the
-                            // total number of tables we updated as otherwise the client will get an unexpected result
-                            MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount / ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
-
-                            // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
-                            state.setReadMetricQueue(context.getReadMetricsQueue());
-
-                            return state;
-                        } else {
-                            return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
+        @Override
+        public MutationState execute() throws SQLException {
+            // We have a point lookup, so we know we have a simple set of fully qualified
+            // keys for our ranges
+            ScanRanges ranges = context.getScanRanges();
+            Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
+            MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount());
+            while (iterator.hasNext()) {
+                mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
+                        new RowMutationState(PRow.DELETE_MARKER, 0,
+                                statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+            }
+            return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            // Don't include the target
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l;
+        }
+    }
+
+    private class ServerSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final QueryPlan aggPlan;
+        private final RowProjector projector;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerSelectDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, QueryPlan aggPlan,
+                                              RowProjector projector, int maxSize, int maxSizeBytes) {
+            this.context = dataPlan.getContext();
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.aggPlan = aggPlan;
+            this.projector = projector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            // TODO: share this block of code with UPSERT SELECT
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = dataPlan.getTableRef().getTable();
+            table.getIndexMaintainers(ptr, context.getConnection());
+            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+            ServerCache cache = null;
+            try {
+                if (ptr.getLength() > 0) {
+                    byte[] uuidValue = ServerCacheClient.generateId();
+                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                    context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+                ResultIterator iterator = aggPlan.iterator();
+                try {
+                    Tuple row = iterator.next();
+                    final long mutationCount = (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                    return new MutationState(maxSize, maxSizeBytes, connection) {
+                        @Override
+                        public long getUpdateCount() {
+                            return mutationCount;
                         }
-                    } finally {
-                        iterator.close();
-                    }
+                    };
+                } finally {
+                    iterator.close();
                 }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  bestPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("DELETE ROWS");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
+            } finally {
+                if (cache != null) {
+                    cache.close();
                 }
+            }
+        }
 
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return bestPlan.getEstimatedRowsToScan();
-                }
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
 
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return bestPlan.getEstimatedBytesToScan();
-                }
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan();
+        }
 
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return bestPlan.getEstimateInfoTimestamp();
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan;
+        }
+    }
+
+    private class ClientSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final TableRef targetTableRef;
+        private final QueryPlan dataPlan;
+        private final QueryPlan bestPlan;
+        private final boolean hasPreOrPostProcessing;
+        private final DeletingParallelIteratorFactory parallelIteratorFactory;
+        private final List<TableRef> otherTableRefs;
+        private final TableRef projectedTableRef;
+        private final int maxSize;
+        private final int maxSizeBytes;
+        private final PhoenixConnection connection;
+
+        public ClientSelectDeleteMutationPlan(TableRef targetTableRef, QueryPlan dataPlan, QueryPlan bestPlan,
+                                              boolean hasPreOrPostProcessing,
+                                              DeletingParallelIteratorFactory parallelIteratorFactory,
+                                              List<TableRef> otherTableRefs, TableRef projectedTableRef, int maxSize,
+                                              int maxSizeBytes, PhoenixConnection connection) {
+            this.context = bestPlan.getContext();
+            this.targetTableRef = targetTableRef;
+            this.dataPlan = dataPlan;
+            this.bestPlan = bestPlan;
+            this.hasPreOrPostProcessing = hasPreOrPostProcessing;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.otherTableRefs = otherTableRefs;
+            this.projectedTableRef = projectedTableRef;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+            this.connection = connection;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return targetTableRef;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = bestPlan.iterator();
+            try {
+                if (!hasPreOrPostProcessing) {
+                    Tuple tuple;
+                    long totalRowCount = 0;
+                    if (parallelIteratorFactory != null) {
+                        parallelIteratorFactory.setQueryPlan(bestPlan);
+                        parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
+                        parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
+                    }
+                    while ((tuple=iterator.next()) != null) {// Runs query
+                        Cell kv = tuple.getValue(0);
+                        totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                    }
+                    // Return total number of rows that have been deleted from the table. In the case of auto commit being off
+                    // the mutations will all be in the mutation state of the current connection. We need to divide by the
+                    // total number of tables we updated as otherwise the client will get an unexpected result
+                    MutationState state = new MutationState(maxSize, maxSizeBytes, connection,
+                            totalRowCount /
+                                    ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
+
+                    // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+                    state.setReadMetricQueue(context.getReadMetricsQueue());
+
+                    return state;
+                } else {
+                    return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
                 }
-            };
+            } finally {
+                iterator.close();
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  bestPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return bestPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return bestPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return bestPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return bestPlan;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 887e2d2..439a79b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -110,6 +110,12 @@ import com.google.common.collect.Sets;
 
 public class JoinCompiler {
 
+    public enum Strategy {
+        HASH_BUILD_LEFT,
+        HASH_BUILD_RIGHT,
+        SORT_MERGE,
+    }
+
     public enum ColumnRefType {
         JOINLOCAL,
         GENERAL,
@@ -489,7 +495,7 @@ public class JoinCompiler {
             return dependencies;
         }
 
-        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException {
+        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, Strategy strategy) throws SQLException {
             if (onConditions.isEmpty()) {
                 return new Pair<List<Expression>, List<Expression>>(
                         Collections.<Expression> singletonList(LiteralExpression.newConstant(1)),
@@ -505,15 +511,16 @@ public class JoinCompiler {
                 rhsCompiler.reset();
                 Expression right = condition.getRHS().accept(rhsCompiler);
                 PDataType toType = getCommonType(left.getDataType(), right.getDataType());
-                if (left.getDataType() != toType || left.getSortOrder() == SortOrder.DESC) {
-                    left = CoerceExpression.create(left, toType, SortOrder.ASC, left.getMaxLength());
+                SortOrder toSortOrder = strategy == Strategy.SORT_MERGE ? SortOrder.ASC : (strategy == Strategy.HASH_BUILD_LEFT ? right.getSortOrder() : left.getSortOrder());
+                if (left.getDataType() != toType || left.getSortOrder() != toSortOrder) {
+                    left = CoerceExpression.create(left, toType, toSortOrder, left.getMaxLength());
                 }
-                if (right.getDataType() != toType || right.getSortOrder() == SortOrder.DESC) {
-                    right = CoerceExpression.create(right, toType, SortOrder.ASC, right.getMaxLength());
+                if (right.getDataType() != toType || right.getSortOrder() != toSortOrder) {
+                    right = CoerceExpression.create(right, toType, toSortOrder, right.getMaxLength());
                 }
                 compiled.add(new Pair<Expression, Expression>(left, right));
             }
-            if (sortExpressions) {
+            if (strategy != Strategy.SORT_MERGE) {
                 Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
                     @Override
                     public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2d70f55a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 839e7c9..0688b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public TableRef getTableRef() {
         return null;
     }