You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:48:14 UTC

[19/37] phoenix git commit: PHOENIX-4290 Full table scan performed for DELETE with table having immutable indexes

PHOENIX-4290 Full table scan performed for DELETE with table having immutable indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e0df4b2e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e0df4b2e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e0df4b2e

Branch: refs/heads/4.x-HBase-1.1
Commit: e0df4b2e6c2f386336f660301a093e295f489ec4
Parents: 969b79c
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Oct 30 19:25:53 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:46:39 2017 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/DeleteIT.java    | 134 ++-
 .../phoenix/end2end/index/ImmutableIndexIT.java |  22 +-
 .../end2end/index/IndexMaintenanceIT.java       |  18 +-
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |  18 +-
 .../apache/phoenix/compile/DeleteCompiler.java  | 849 ++++++++++---------
 .../apache/phoenix/compile/FromCompiler.java    |  49 +-
 .../compile/TupleProjectionCompiler.java        |   2 +-
 .../phoenix/exception/SQLExceptionCode.java     |   1 -
 .../apache/phoenix/execute/MutationState.java   |   4 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  35 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  10 +
 .../java/org/apache/phoenix/util/IndexUtil.java |  18 +-
 .../phoenix/compile/QueryCompilerTest.java      |  27 -
 14 files changed, 643 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0df4b2e/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 09e1021..aa4d36e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -33,7 +32,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 
@@ -136,18 +138,25 @@ public class DeleteIT extends ParallelStatsDisabledIT {
         rs.close();
     }
     
-    private static void assertIndexUsed (Connection conn, String query, String indexName, boolean expectedToBeUsed) throws SQLException {
-        assertIndexUsed(conn, query, Collections.emptyList(), indexName, expectedToBeUsed);
+    private static void assertIndexUsed (Connection conn, String query, String indexName, boolean expectedToBeUsed, boolean local) throws SQLException {
+        assertIndexUsed(conn, query, Collections.emptyList(), indexName, expectedToBeUsed, local);
     }
 
-    private static void assertIndexUsed (Connection conn, String query, List<Object> binds, String indexName, boolean expectedToBeUsed) throws SQLException {
+    private static void assertIndexUsed (Connection conn, String query, List<Object> binds, String indexName, boolean expectedToBeUsed, boolean local) throws SQLException {
             PreparedStatement stmt = conn.prepareStatement("EXPLAIN " + query);
             for (int i = 0; i < binds.size(); i++) {
                 stmt.setObject(i+1, binds.get(i));
             }
             ResultSet rs = stmt.executeQuery();
             String explainPlan = QueryUtil.getExplainPlan(rs);
-            assertEquals(expectedToBeUsed, explainPlan.contains(" SCAN OVER " + indexName));
+            // It's currently very difficult to check whether a local index is being used.
+            // This check is brittle: it only verifies that the index ID appears in the range scan.
+            // TODO: surface QueryPlan from MutationPlan
+            if (local) {
+                assertEquals(expectedToBeUsed, explainPlan.contains(indexName + " [1]") || explainPlan.contains(indexName + " [1,"));
+            } else {
+                assertEquals(expectedToBeUsed, explainPlan.contains(" SCAN OVER " + indexName));
+            }
    }
 
     private void testDeleteRange(boolean autoCommit, boolean createIndex) throws Exception {
@@ -190,9 +199,7 @@ public class DeleteIT extends ParallelStatsDisabledIT {
         PreparedStatement stmt;
         conn.setAutoCommit(autoCommit);
         deleteStmt = "DELETE FROM " + tableName + " WHERE i >= ? and i < ?";
-        if(!local) {
-            assertIndexUsed(conn, deleteStmt, Arrays.<Object>asList(5,10), indexInUse, false);
-        }
+        assertIndexUsed(conn, deleteStmt, Arrays.<Object>asList(5,10), indexInUse, false, local);
         stmt = conn.prepareStatement(deleteStmt);
         stmt.setInt(1, 5);
         stmt.setInt(2, 10);
@@ -202,7 +209,7 @@ public class DeleteIT extends ParallelStatsDisabledIT {
         }
         
         String query = "SELECT count(*) FROM " + tableName;
-        assertIndexUsed(conn, query, indexInUse, createIndex);
+        assertIndexUsed(conn, query, indexInUse, createIndex, local);
         query = "SELECT count(*) FROM " + tableName;
         rs = conn.createStatement().executeQuery(query);
         assertTrue(rs.next());
@@ -210,9 +217,7 @@ public class DeleteIT extends ParallelStatsDisabledIT {
         
         deleteStmt = "DELETE FROM " + tableName + " WHERE j IS NULL";
         stmt = conn.prepareStatement(deleteStmt);
-        if(!local) {
-            assertIndexUsed(conn, deleteStmt, indexInUse, createIndex);
-        }
+        assertIndexUsed(conn, deleteStmt, indexInUse, createIndex, local);
         int deleteCount = stmt.executeUpdate();
         assertEquals(3, deleteCount);
         if (!autoCommit) {
@@ -254,40 +259,40 @@ public class DeleteIT extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testDeleteAllFromTableWithIndexAutoCommitSalting() throws SQLException {
+    public void testDeleteAllFromTableWithIndexAutoCommitSalting() throws Exception {
         testDeleteAllFromTableWithIndex(true, true, false);
     }
 
     @Test
-    public void testDeleteAllFromTableWithLocalIndexAutoCommitSalting() throws SQLException {
+    public void testDeleteAllFromTableWithLocalIndexAutoCommitSalting() throws Exception {
         testDeleteAllFromTableWithIndex(true, true, true);
     }
     
     @Test
-    public void testDeleteAllFromTableWithIndexAutoCommitNoSalting() throws SQLException {
+    public void testDeleteAllFromTableWithIndexAutoCommitNoSalting() throws Exception {
         testDeleteAllFromTableWithIndex(true, false);
     }
     
     @Test
-    public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws SQLException {
+    public void testDeleteAllFromTableWithIndexNoAutoCommitNoSalting() throws Exception {
         testDeleteAllFromTableWithIndex(false,false);
     }
     
     @Test
-    public void testDeleteAllFromTableWithIndexNoAutoCommitSalted() throws SQLException {
+    public void testDeleteAllFromTableWithIndexNoAutoCommitSalted() throws Exception {
         testDeleteAllFromTableWithIndex(false, true, false);
     }
     
     @Test
-    public void testDeleteAllFromTableWithLocalIndexNoAutoCommitSalted() throws SQLException {
+    public void testDeleteAllFromTableWithLocalIndexNoAutoCommitSalted() throws Exception {
         testDeleteAllFromTableWithIndex(false, true, true);
     }
 
-    private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted) throws SQLException {
+    private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted) throws Exception {
         testDeleteAllFromTableWithIndex(autoCommit, isSalted, false);
     }
 
-    private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted, boolean localIndex) throws SQLException {
+    private void testDeleteAllFromTableWithIndex(boolean autoCommit, boolean isSalted, boolean localIndex) throws Exception {
         Connection con = null;
         try {
             con = DriverManager.getConnection(getUrl());
@@ -334,6 +339,8 @@ public class DeleteIT extends ParallelStatsDisabledIT {
                 con.commit();
             }
             
+            TestUtil.dumpTable(con.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+            
             ResultSet rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
             assertTrue(rs.next());
             assertEquals(0, rs.getLong(1));
@@ -354,16 +361,16 @@ public class DeleteIT extends ParallelStatsDisabledIT {
     }
     
     @Test
-    public void testDeleteRowFromTableWithImmutableIndex() throws SQLException {
-        testDeleteRowFromTableWithImmutableIndex(false);
+    public void testDeleteRowFromTableWithImmutableIndex() throws Exception {
+        testDeleteRowFromTableWithImmutableIndex(false, true);
     }
     
     @Test
-    public void testDeleteRowFromTableWithImmutableLocalIndex() throws SQLException {
-        testDeleteRowFromTableWithImmutableIndex(true);
+    public void testDeleteRowFromTableWithImmutableLocalIndex() throws Exception {
+        testDeleteRowFromTableWithImmutableIndex(true, false);
     }
     
-    public void testDeleteRowFromTableWithImmutableIndex(boolean localIndex) throws SQLException {
+    public void testDeleteRowFromTableWithImmutableIndex(boolean localIndex, boolean useCoveredIndex) throws Exception {
         Connection con = null;
         try {
             boolean autoCommit = false;
@@ -375,6 +382,7 @@ public class DeleteIT extends ParallelStatsDisabledIT {
             String tableName = generateUniqueName();
             String indexName1 = generateUniqueName();
             String indexName2 = generateUniqueName();
+            String indexName3 = useCoveredIndex? generateUniqueName() : null;
 
             stm.execute("CREATE TABLE IF NOT EXISTS " + tableName + " (" +
                     "HOST CHAR(2) NOT NULL," +
@@ -387,6 +395,9 @@ public class DeleteIT extends ParallelStatsDisabledIT {
                     "CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, \"DATE\")) IMMUTABLE_ROWS=true");
             stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName1 + " ON " + tableName + " (\"DATE\", FEATURE)");
             stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName2 + " ON " + tableName + " (\"DATE\", FEATURE, USAGE.DB)");
+            if (useCoveredIndex) {
+                stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName3 + " ON " + tableName + " (STATS.ACTIVE_VISITOR) INCLUDE (USAGE.CORE, USAGE.DB)");
+            }
             stm.close();
 
             Date date = new Date(0);
@@ -400,39 +411,48 @@ public class DeleteIT extends ParallelStatsDisabledIT {
             psInsert.setLong(6, 2L);
             psInsert.setLong(7, 3);
             psInsert.execute();
-            psInsert.close();
             if (!autoCommit) {
                 con.commit();
             }
             
-            psInsert = con.prepareStatement("DELETE FROM " + tableName + " WHERE (HOST, DOMAIN, FEATURE, \"DATE\") = (?,?,?,?)");
-            psInsert.setString(1, "AA");
-            psInsert.setString(2, "BB");
-            psInsert.setString(3, "CC");
-            psInsert.setDate(4, date);
-            psInsert.execute();
+            PreparedStatement psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE (HOST, DOMAIN, FEATURE, \"DATE\") = (?,?,?,?)");
+            psDelete.setString(1, "AA");
+            psDelete.setString(2, "BB");
+            psDelete.setString(3, "CC");
+            psDelete.setDate(4, date);
+            psDelete.execute();
             if (!autoCommit) {
                 con.commit();
             }
             
-            ResultSet rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
-            assertTrue(rs.next());
-            assertEquals(0, rs.getLong(1));
+            assertDeleted(con, tableName, indexName1, indexName2, indexName3);
 
-            rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName1);
-            assertTrue(rs.next());
-            assertEquals(0, rs.getLong(1));
+            psInsert.execute();
+            if (!autoCommit) {
+                con.commit();
+            }
 
-            stm.execute("DROP INDEX " + indexName1 + " ON " + tableName);
-            stm.execute("DROP INDEX " + indexName2 + " ON " + tableName);
+            psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE  USAGE.DB=2");
+            psDelete.execute();
+            if (!autoCommit) {
+                con.commit();
+            }
+
+            assertDeleted(con, tableName, indexName1, indexName2, indexName3);
+
+            psInsert.execute();
+            if (!autoCommit) {
+                con.commit();
+            }
 
-            stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName1 + " ON " + tableName + " (USAGE.DB)");
-            stm.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName2 + " ON " + tableName + " (USAGE.DB, \"DATE\")");
-            try{
-                psInsert = con.prepareStatement("DELETE FROM " + tableName + " WHERE  USAGE.DB=2");
-            } catch(Exception e) {
-                fail("There should not be any exception while deleting row");
+            psDelete = con.prepareStatement("DELETE FROM " + tableName + " WHERE  ACTIVE_VISITOR=3");
+            psDelete.execute();
+            if (!autoCommit) {
+                con.commit();
             }
+
+            assertDeleted(con, tableName, indexName1, indexName2, indexName3);
+
         } finally {
             try {
                 con.close();
@@ -440,6 +460,28 @@ public class DeleteIT extends ParallelStatsDisabledIT {
             }
         }
     }
+
+    private static void assertDeleted(Connection con, String tableName, String indexName1, String indexName2, String indexName3)
+            throws SQLException {
+        ResultSet rs;
+        rs = con.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
+        assertTrue(rs.next());
+        assertEquals(0, rs.getLong(1));
+
+        rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName1);
+        assertTrue(rs.next());
+        assertEquals(0, rs.getLong(1));
+
+        rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName2);
+        assertTrue(rs.next());
+        assertEquals(0, rs.getLong(1));
+
+        if (indexName3 != null) {
+            rs = con.createStatement().executeQuery("SELECT count(*) FROM " + indexName3);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getLong(1));
+        }
+    }
     
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0df4b2e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 9eb5440..e0398c7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -149,18 +147,14 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
 
             conn.setAutoCommit(true);
             String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
-            try {
-                conn.createStatement().execute(dml);
-                if (!localIndex) {
-                    fail();
-                }
-            } catch (SQLException e) {
-                if (localIndex) {
-                    throw e;
-                }
-                assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(),
-                    e.getErrorCode());
-            }
+            conn.createStatement().execute(dml);
+
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
 
             conn.createStatement().execute("DROP TABLE " + fullTableName);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0df4b2e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
index d5895ae..9ff5a35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
@@ -23,7 +23,6 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -36,7 +35,6 @@ import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -341,22 +339,10 @@ public class IndexMaintenanceIT extends ParallelStatsDisabledIT {
             assertEquals(2, rs.getInt(1));
 
             conn.setAutoCommit(true);
-            String dml = "DELETE from " + fullDataTableName + " WHERE long_col2 = 2";
-            try {
-                conn.createStatement().execute(dml);
-                if (!mutable && !localIndex) {
-                    fail();
-                }
-            } catch (SQLException e) {
-                if (mutable || localIndex) {
-                    throw e;
-                }
-                assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
-            }
+            conn.createStatement().execute("DELETE from " + fullDataTableName + " WHERE long_col2 = 2");
 
             if (!mutable) {
-                dml = "DELETE from " + fullDataTableName + " WHERE 2*long_col2 = 4";
-                conn.createStatement().execute(dml);
+                conn.createStatement().execute("DELETE from " + fullDataTableName + " WHERE 2*long_col2 = 4");
             }
 
             rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullDataTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0df4b2e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 989a97e..bf39dfe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -311,7 +311,6 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
         String tableName = "TBL_" + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(tableName, tableName);
-		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 		ResultSet rs;
 		try (Connection conn = getConnection()) {
 			conn.setAutoCommit(false);
@@ -400,6 +399,23 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals(1,rs.getLong(1));
             assertFalse(rs.next());
+            
+            conn.createStatement().execute("drop index " + indexName + " on " + fullTableName + "1");
+            conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
+            conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
+            assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+            assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+    
+            rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(1,rs.getLong(1));
+            assertFalse(rs.next());
+    
+            rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(1,rs.getLong(1));
+            assertFalse(rs.next());
+    
 		}
     }  
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0df4b2e/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index eb252d3..73689d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -19,12 +19,11 @@ package org.apache.phoenix.compile;
 import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
 import static org.apache.phoenix.util.NumberUtil.add;
 
+import java.io.IOException;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,19 +33,20 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -64,20 +64,20 @@ import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
+import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
@@ -105,9 +105,11 @@ public class DeleteCompiler {
         this.operation = operation;
     }
     
-    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, List<TableRef> indexTableRefs, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
-        PTable table = targetTableRef.getTable();
-        PhoenixStatement statement = childContext.getStatement();
+    private static MutationState deleteRows(StatementContext context, ResultIterator iterator, QueryPlan bestPlan, TableRef projectedTableRef, List<TableRef> otherTableRefs) throws SQLException {
+        RowProjector projector = bestPlan.getProjector();
+        TableRef tableRef = bestPlan.getTableRef();
+        PTable table = tableRef.getTable();
+        PhoenixStatement statement = context.getStatement();
         PhoenixConnection connection = statement.getConnection();
         PName tenantId = connection.getTenantId();
         byte[] tenantIdBytes = null;
@@ -123,9 +125,9 @@ public class DeleteCompiler {
         List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
-        if (!indexTableRefs.isEmpty()) {
-            indexMutations = Lists.newArrayListWithExpectedSize(indexTableRefs.size());
-            for (int i = 0; i < indexTableRefs.size(); i++) {
+        if (!otherTableRefs.isEmpty()) {
+            indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
+            for (int i = 0; i < otherTableRefs.size(); i++) {
                 indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
             }
         }
@@ -140,38 +142,84 @@ public class DeleteCompiler {
         if (isMultiTenant) {
             values[offset++] = tenantIdBytes;
         }
-        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
-            int rowCount = 0;
-            while (rs.next()) {
-                ImmutableBytesPtr ptr = new ImmutableBytesPtr();  // allocate new as this is a key in a Map
-                // Use tuple directly, as projector would not have all the PK columns from
-                // our index table inside of our projection. Since the tables are equal,
-                // there's no transation required.
-                if (sourceTableRef.equals(targetTableRef)) {
-                    rs.getCurrentRow().getKey(ptr);
-                } else {
-                    for (int i = offset; i < values.length; i++) {
-                        byte[] byteValue = rs.getBytes(i+1-offset);
-                        // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
-                        // TODO: consider going under the hood and just getting the bytes
-                        if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) {
-                            byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
-                            byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length);
+        try (final PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, context)) {
+            ValueGetter getter = null;
+            if (!otherTableRefs.isEmpty()) {
+                getter = new ValueGetter() {
+                    final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+                    final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable();
+    
+                    @Override
+                    public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                        Cell cell = rs.getCurrentRow().getValue(ref.getFamily(), ref.getQualifier());
+                        if (cell == null) {
+                            return null;
                         }
-                        values[i] = byteValue;
+                        valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                        return valuePtr;
+                    }
+    
+                    @Override
+                    public byte[] getRowKey() {
+                        rs.getCurrentRow().getKey(rowKeyPtr);
+                        return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
+                    }
+                };
+            }
+            IndexMaintainer scannedIndexMaintainer = null;
+            IndexMaintainer[] maintainers = null;
+            PTable dataTable = table;
+            if (table.getType() == PTableType.INDEX) {
+                if (!otherTableRefs.isEmpty()) {
+                    // The data table is always the last one in the list if it's
+                    // not chosen as the best of the possible plans.
+                    dataTable = otherTableRefs.get(otherTableRefs.size()-1).getTable();
+                    scannedIndexMaintainer = IndexMaintainer.create(dataTable, table, connection);
+                }
+                maintainers = new IndexMaintainer[otherTableRefs.size()];
+                for (int i = 0; i < otherTableRefs.size(); i++) {
+                    // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side
+                    // expressions are used instead of server-side ones.
+                    PTable otherTable = otherTableRefs.get(i).getTable();
+                    if (otherTable.getType() == PTableType.INDEX) {
+                        // In this case, we'll convert from index row -> data row -> other index row
+                        maintainers[i] = IndexMaintainer.create(dataTable, otherTable, connection);
+                    } else {
+                        maintainers[i] = scannedIndexMaintainer;
                     }
-                    table.newKey(ptr, values);
                 }
+            } else if (!otherTableRefs.isEmpty()) {
+                dataTable = table;
+                maintainers = new IndexMaintainer[otherTableRefs.size()];
+                for (int i = 0; i < otherTableRefs.size(); i++) {
+                    // Create IndexMaintainer based on projected table (i.e. SELECT expressions) so that client-side
+                    // expressions are used instead of server-side ones.
+                    maintainers[i] = IndexMaintainer.create(projectedTableRef.getTable(), otherTableRefs.get(i).getTable(), connection);
+                }
+
+            }
+            byte[][] viewConstants = IndexUtil.getViewConstants(dataTable);
+            int rowCount = 0;
+            while (rs.next()) {
+                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();  // allocate new as this is a key in a Map
+                rs.getCurrentRow().getKey(rowKeyPtr);
                 // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
-                // row key will already have its value. 
-                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
-                for (int i = 0; i < indexTableRefs.size(); i++) {
+                // row key will already have its value.
+                // The otherTableRefs.isEmpty() check is required when deleting directly from the index
+                if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) {
+                    mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                }
+                for (int i = 0; i < otherTableRefs.size(); i++) {
+                    PTable otherTable = otherTableRefs.get(i).getTable();
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
-                    rs.getCurrentRow().getKey(indexPtr);
                     // Translate the data table row to the index table row
-                    if (sourceTableRef.getTable().getType() != PTableType.INDEX) {
-                        IndexMaintainer maintainer = indexTableRefs.get(i).getTable().getIndexMaintainer(table, connection);
-                        indexPtr.set(maintainer.buildRowKey(null, indexPtr, null, null, HConstants.LATEST_TIMESTAMP));
+                    if (table.getType() == PTableType.INDEX) {
+                        indexPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants));
+                        if (otherTable.getType() == PTableType.INDEX) {
+                            indexPtr.set(maintainers[i].buildRowKey(getter, indexPtr, null, null, HConstants.LATEST_TIMESTAMP));                        
+                        }
+                    } else {
+                        indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
                     }
                     indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
@@ -181,10 +229,10 @@ public class DeleteCompiler {
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
-                    MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, maxSizeBytes, connection);
+                    MutationState state = new MutationState(tableRef, mutations, 0, maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
-                    for (int i = 0; i < indexTableRefs.size(); i++) {
-                        MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
+                    for (int i = 0; i < otherTableRefs.size(); i++) {
+                        MutationState indexState = new MutationState(otherTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
                         connection.getMutationState().join(indexState);
                     }
                     connection.getMutationState().send();
@@ -197,10 +245,9 @@ public class DeleteCompiler {
 
             // If auto commit is true, this last batch will be committed upon return
             int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
-            MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
-            for (int i = 0; i < indexTableRefs.size(); i++) {
-                // To prevent the counting of these index rows, we have a negative for remainingRows.
-                MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
+            MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
+            for (int i = 0; i < otherTableRefs.size(); i++) {
+                MutationState indexState = new MutationState(otherTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
                 state.join(indexState);
             }
             return state;
@@ -208,10 +255,9 @@ public class DeleteCompiler {
     }
     
     private static class DeletingParallelIteratorFactory extends MutatingParallelIteratorFactory {
-        private RowProjector projector;
-        private TableRef targetTableRef;
-        private List<TableRef> indexTableRefs;
-        private TableRef sourceTableRef;
+        private QueryPlan queryPlan;
+        private List<TableRef> otherTableRefs;
+        private TableRef projectedTableRef;
         
         private DeletingParallelIteratorFactory(PhoenixConnection connection) {
             super(connection);
@@ -225,41 +271,36 @@ public class DeleteCompiler {
              * need to be captured are already getting collected in the parent statement context enclosed in the result
              * iterator being used for reading rows out.
              */
-            StatementContext ctx = new StatementContext(statement, false);
-            MutationState state = deleteRows(ctx, targetTableRef, indexTableRefs, iterator, projector, sourceTableRef);
+            StatementContext context = new StatementContext(statement, false);
+            MutationState state = deleteRows(context, iterator, queryPlan, projectedTableRef, otherTableRefs);
             return state;
         }
         
-        public void setTargetTableRef(TableRef tableRef) {
-            this.targetTableRef = tableRef;
+        public void setQueryPlan(QueryPlan queryPlan) {
+            this.queryPlan = queryPlan;
         }
         
-        public void setSourceTableRef(TableRef tableRef) {
-            this.sourceTableRef = tableRef;
+        public void setOtherTableRefs(List<TableRef> otherTableRefs) {
+            this.otherTableRefs = otherTableRefs;
         }
         
-        public void setRowProjector(RowProjector projector) {
-            this.projector = projector;
-        }
-
-        public void setIndexTargetTableRefs(List<TableRef> indexTableRefs) {
-            this.indexTableRefs = indexTableRefs;
+        public void setProjectedTableRef(TableRef projectedTableRef) {
+            this.projectedTableRef = projectedTableRef;
         }
-        
     }
     
-    private Map<PTableKey, PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
+    private List<PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
         PTable table = tableRef.getTable();
         if (table.isImmutableRows() && !table.getIndexes().isEmpty()) {
-            Map<PTableKey, PTable> nonDisabledIndexes = new HashMap<PTableKey, PTable>(table.getIndexes().size());
+            List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
             for (PTable index : table.getIndexes()) {
                 if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) {
-                    nonDisabledIndexes.put(index.getKey(), index);
+                    nonDisabledIndexes.add(index);
                 }
             }
             return nonDisabledIndexes;
         }
-        return Collections.emptyMap();
+        return Collections.emptyList();
     }
     
     private class MultiDeleteMutationPlan implements MutationPlan {
@@ -361,189 +402,151 @@ public class DeleteCompiler {
         }
     }
 
-    private static boolean hasNonPKIndexedColumns(Collection<PTable> immutableIndexes) {
-        for (PTable index : immutableIndexes) {
-            for (PColumn column : index.getPKColumns()) {
-                if (!IndexUtil.isDataPKColumn(column)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-    
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
         final PhoenixConnection connection = statement.getConnection();
         final boolean isAutoCommit = connection.getAutoCommit();
-        final boolean hasLimit = delete.getLimit() != null;
+        final boolean hasPostProcessing = delete.getLimit() != null;
         final ConnectionQueryServices services = connection.getQueryServices();
         List<QueryPlan> queryPlans;
         NamedTableNode tableNode = delete.getTable();
         String tableName = tableNode.getName().getTableName();
         String schemaName = tableNode.getName().getSchemaName();
-        boolean retryOnce = !isAutoCommit;
-        TableRef tableRefToBe;
-        boolean noQueryReqd = false;
-        boolean runOnServer = false;
         SelectStatement select = null;
         ColumnResolver resolverToBe = null;
-        Map<PTableKey, PTable> immutableIndex = Collections.emptyMap();
-        DeletingParallelIteratorFactory parallelIteratorFactory;
-        QueryPlan dataPlanToBe = null;
-        while (true) {
-            try {
-                resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
-                tableRefToBe = resolverToBe.getTables().get(0);
-                PTable table = tableRefToBe.getTable();
-                // Cannot update:
-                // - read-only VIEW 
-                // - transactional table with a connection having an SCN
-                // TODO: SchemaUtil.isReadOnly(PTable, connection)?
-                if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
-                    throw new ReadOnlyTableException(schemaName,tableName);
-                }
-                else if (table.isTransactional() && connection.getSCN() != null) {
-                   throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
-                   .setTableName(tableName).build().buildException();
-                }
-                
-                immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
-                boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
-                noQueryReqd = !hasLimit;
-                // Can't run on same server for transactional data, as we need the row keys for the data
-                // that is being upserted for conflict detection purposes.
-                runOnServer = isAutoCommit && noQueryReqd && !table.isTransactional();
-                HintNode hint = delete.getHint();
-                if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
-                    hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
-                }
+        DeletingParallelIteratorFactory parallelIteratorFactoryToBe;
+        resolverToBe = FromCompiler.getResolverForMutation(delete, connection);
+        final TableRef targetTableRef = resolverToBe.getTables().get(0);
+        PTable table = targetTableRef.getTable();
+        // Cannot update:
+        // - read-only VIEW 
+        // - transactional table with a connection having an SCN
+        // TODO: SchemaUtil.isReadOnly(PTable, connection)?
+        if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
+            throw new ReadOnlyTableException(schemaName,tableName);
+        }
+        else if (table.isTransactional() && connection.getSCN() != null) {
+           throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
+           .setTableName(tableName).build().buildException();
+        }
         
-                List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(table.getPKColumns().size());
-                boolean isSalted = table.getBucketNum() != null;
-                boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
-                boolean isSharedViewIndex = table.getViewIndexId() != null;
-                for (int i = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0); i < table.getPKColumns().size(); i++) {
-                    PColumn column = table.getPKColumns().get(i);
-                    aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
-                }
-                select = FACTORY.select(delete.getTable(), hint, false, aliasedNodes, delete.getWhere(),
-                        Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null,
-                        delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(),
-                        delete.getUdfParseNodes());
-                select = StatementNormalizer.normalize(select, resolverToBe);
-                SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
-                if (transformedSelect != select) {
-                    resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName());
-                    select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
-                }
-                parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection);
-                QueryOptimizer optimizer = new QueryOptimizer(services);
-                QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory, new SequenceManager(statement));
-                dataPlanToBe = compiler.compile();
-                queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
-                        ? optimizer.getApplicablePlans(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
-                        : optimizer.getBestPlan(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
-                if (mayHaveImmutableIndexes) { // FIXME: this is ugly
-                    // Lookup the table being deleted from in the cache, as it's possible that the
-                    // optimizer updated the cache if it found indexes that were out of date.
-                    // If the index was marked as disabled, it should not be in the list
-                    // of immutable indexes.
-                    table = connection.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
-                    tableRefToBe.setTable(table);
-                    immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
-                }
-            } catch (MetaDataEntityNotFoundException e) {
-                // Catch column/column family not found exception, as our meta data may
-                // be out of sync. Update the cache once and retry if we were out of sync.
-                // Otherwise throw, as we'll just get the same error next time.
-                if (retryOnce) {
-                    retryOnce = false;
-                    MetaDataMutationResult result = new MetaDataClient(connection).updateCache(schemaName, tableName);
-                    if (result.wasUpdated()) {
-                        continue;
-                    }
-                }
-                throw e;
-            }
-            break;
-        }
-        boolean isBuildingImmutable = false;
-        final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
-        if (hasImmutableIndexes) {
-            for (PTable index : immutableIndex.values()){
-                if (index.getIndexState() == PIndexState.BUILDING) {
-                    isBuildingImmutable = true;
-                    break;
-                }
-            }
+        List<PTable> immutableIndexes = getNonDisabledGlobalImmutableIndexes(targetTableRef);
+        final boolean hasImmutableIndexes = !immutableIndexes.isEmpty();
+
+        boolean isSalted = table.getBucketNum() != null;
+        boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
+        boolean isSharedViewIndex = table.getViewIndexId() != null;
+        int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+        final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset;
+        int selectColumnCount = pkColumnCount;
+        for (PTable index : immutableIndexes) {
+            selectColumnCount += index.getPKColumns().size() - pkColumnCount;
         }
-        final QueryPlan dataPlan = dataPlanToBe;
-        // tableRefs is parallel with queryPlans
-        TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1];
-        if (hasImmutableIndexes) {
-            int i = 0;
-            Iterator<QueryPlan> plans = queryPlans.iterator();
-            while (plans.hasNext()) {
-                QueryPlan plan = plans.next();
-                PTable table = plan.getTableRef().getTable();
-                if (table.getType() == PTableType.INDEX) { // index plans
-                    tableRefs[i++] = plan.getTableRef();
-                    immutableIndex.remove(table.getKey());
-                } else if (!isBuildingImmutable) { // data plan
-                    /*
-                     * If we have immutable indexes that we need to maintain, don't execute the data plan
-                     * as we can save a query by piggy-backing on any of the other index queries, since the
-                     * PK columns that we need are always in each index row.
-                     */
-                    plans.remove();
+        List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(selectColumnCount + pkColumnOffset);
+        List<AliasedNode> aliasedNodes = Lists.newArrayListWithExpectedSize(selectColumnCount);
+        for (int i = isSalted ? 1 : 0; i < pkColumnOffset; i++) {
+            PColumn column = table.getPKColumns().get(i);
+            projectedColumns.add(column);
+        }
+        for (int i = pkColumnOffset; i < table.getPKColumns().size(); i++) {
+            PColumn column = table.getPKColumns().get(i);
+            projectedColumns.add(column);
+            aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
+        }
+        // Project all non PK indexed columns so that we can do the proper index maintenance
+        for (PTable index : table.getIndexes()) {
+            IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+            // Go through maintainer as it handles functional indexes correctly
+            for (Pair<String,String> columnInfo : maintainer.getIndexedColumnInfo()) {
+                String familyName = columnInfo.getFirst();
+                if (familyName != null) {
+                    String columnName = columnInfo.getSecond();
+                    boolean hasNoColumnFamilies = table.getColumnFamilies().isEmpty();
+                    PColumn column = hasNoColumnFamilies ? table.getColumnForColumnName(columnName) : table.getColumnFamily(familyName).getPColumnForColumnName(columnName);
+                    projectedColumns.add(column);
+                    aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(hasNoColumnFamilies ? null : TableName.create(null, familyName), '"' + columnName + '"', null)));
                 }
             }
-            /*
-             * If we have any immutable indexes remaining, then that means that the plan for that index got filtered out
-             * because it could not be executed. This would occur if a column in the where clause is not found in the
-             * immutable index.
-             */
-            if (!immutableIndex.isEmpty()) {
-                Collection<PTable> immutableIndexes = immutableIndex.values();
-                if (!isBuildingImmutable || hasNonPKIndexedColumns(immutableIndexes)) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS).setSchemaName(tableRefToBe.getTable().getSchemaName().getString())
-                    .setTableName(tableRefToBe.getTable().getTableName().getString()).build().buildException();
+        }
+        select = FACTORY.select(delete.getTable(), delete.getHint(), false, aliasedNodes, delete.getWhere(),
+                Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null,
+                delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(),
+                delete.getUdfParseNodes());
+        select = StatementNormalizer.normalize(select, resolverToBe);
+        
+        SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
+        boolean hasPreProcessing = transformedSelect != select;
+        if (transformedSelect != select) {
+            resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName());
+            select = StatementNormalizer.normalize(transformedSelect, resolverToBe);
+        }
+        final boolean hasPreOrPostProcessing = hasPreProcessing || hasPostProcessing;
+        boolean noQueryReqd = !hasPreOrPostProcessing;
+        // No limit and no sub queries, joins, etc in where clause
+        // Can't run on same server for transactional data, as we need the row keys for the data
+        // that is being upserted for conflict detection purposes.
+        // If we have immutable indexes, we'd increase the number of bytes scanned by executing
+        // separate queries against each index, so better to drive from a single table in that case.
+        boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasImmutableIndexes;
+        HintNode hint = delete.getHint();
+        if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
+            select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE));
+        }
+        
+        parallelIteratorFactoryToBe = hasPreOrPostProcessing ? null : new DeletingParallelIteratorFactory(connection);
+        QueryOptimizer optimizer = new QueryOptimizer(services);
+        QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement));
+        final QueryPlan dataPlan = compiler.compile();
+        // TODO: the select clause should know that there's a sub query, but doesn't seem to currently
+        queryPlans = Lists.newArrayList(!immutableIndexes.isEmpty()
+                ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)
+                : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe));
+        // Filter out any local indexes that don't contain all indexed columns.
+        // We have to do this manually because local indexes are still used
+        // when referenced columns aren't in the index, so they won't be
+        // filtered by the optimizer.
+        queryPlans = new ArrayList<>(queryPlans);
+        Iterator<QueryPlan> iterator = queryPlans.iterator();
+        while (iterator.hasNext()) {
+            QueryPlan plan = iterator.next();
+            if (plan.getTableRef().getTable().getIndexType() == IndexType.LOCAL) {
+                if (!plan.getContext().getDataColumns().isEmpty()) {
+                    iterator.remove();
                 }
-                runOnServer = false;
-            }
+            }            
         }
-        List<TableRef> buildingImmutableIndexes = Lists.newArrayListWithExpectedSize(immutableIndex.values().size());
-        for (PTable index : immutableIndex.values()) {
-            buildingImmutableIndexes.add(new TableRef(index, dataPlan.getTableRef().getTimeStamp(), dataPlan.getTableRef().getLowerBoundTimeStamp()));
+        if (queryPlans.isEmpty()) {
+            queryPlans = Collections.singletonList(dataPlan);
         }
         
-        // Make sure the first plan is targeting deletion from the data table
-        // In the case of an immutable index, we'll also delete from the index.
-        final TableRef dataTableRef = tableRefs[0] = tableRefToBe;
-        /*
-         * Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows
-         * from the data table, while the others will be for deleting rows from immutable indexes.
-         */
-        List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(tableRefs.length);
-        for (int i = 0; i < tableRefs.length; i++) {
-            final TableRef tableRef = tableRefs[i];
-            final QueryPlan plan = queryPlans.get(i);
-            if (!plan.getTableRef().equals(tableRef) || !(plan instanceof BaseQueryPlan)) {
-                runOnServer = false;
-                noQueryReqd = false; // FIXME: why set this to false in this case?
-            }
-            
-            final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-            final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
-     
-            final StatementContext context = plan.getContext();
-            // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all.
-            // A simple check of the none existence of a where clause in the parse node is not sufficient, as the where clause
-            // may have been optimized out. Instead, we check that there's a single SkipScanFilter
-            if (noQueryReqd
-                    && (!context.getScan().hasFilter()
-                        || context.getScan().getFilter() instanceof SkipScanFilter)
-                    && context.getScanRanges().isPointLookup()) {
+        runOnServer &= queryPlans.get(0).getTableRef().getTable().getType() != PTableType.INDEX;
+        
+        // We need to have all indexed columns available in all immutable indexes in order
+        // to generate the delete markers from the query. We also cannot have any filters
+        // except for our SkipScanFilter for point lookups.
+        // A simple check of the non-existence of a where clause in the parse node is not sufficient, as the where clause
+        // may have been optimized out. Instead, we check that there's a single SkipScanFilter.
+        // If we can generate a plan for every index, that means all the required columns are available in every index,
+        // hence we can drive the delete from any of the plans.
+        noQueryReqd &= queryPlans.size() == 1 + immutableIndexes.size();
+        int queryPlanIndex = 0;
+        while (noQueryReqd && queryPlanIndex < queryPlans.size()) {
+            QueryPlan plan = queryPlans.get(queryPlanIndex++);
+            StatementContext context = plan.getContext();
+            noQueryReqd &= (!context.getScan().hasFilter()
+                    || context.getScan().getFilter() instanceof SkipScanFilter)
+                && context.getScanRanges().isPointLookup();
+        }
+
+        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
+ 
+        // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all.
+        if (noQueryReqd) {
+            // Create a mutationPlan for each queryPlan. One plan will be for the deletion of the rows
+            // from the data table, while the others will be for deleting rows from immutable indexes.
+            List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
+            for (final QueryPlan plan : queryPlans) {
+                final StatementContext context = plan.getContext();
                 mutationPlans.add(new MutationPlan() {
     
                     @Override
@@ -561,7 +564,7 @@ public class DeleteCompiler {
                         while (iterator.hasNext()) {
                             mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                         }
-                        return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+                        return new MutationState(context.getCurrentTable(), mutation, 0, maxSize, maxSizeBytes, connection);
                     }
     
                     @Override
@@ -576,7 +579,7 @@ public class DeleteCompiler {
 
                     @Override
                     public TableRef getTargetRef() {
-                        return dataTableRef;
+                        return dataPlan.getTableRef();
                     }
 
                     @Override
@@ -605,202 +608,230 @@ public class DeleteCompiler {
                         return 0l;
                     }
                 });
-            } else if (runOnServer) {
-                // TODO: better abstraction
-                Scan scan = context.getScan();
-                scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+            }
+            return new MultiDeleteMutationPlan(mutationPlans);
+        } else if (runOnServer) {
+            // TODO: better abstraction
+            final StatementContext context = dataPlan.getContext();
+            Scan scan = context.getScan();
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+
+            // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
+            // The coprocessor will delete each row returned from the scan
+            // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
+            SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
+            RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
+            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
+            if (dataPlan.getProjector().projectEveryRow()) {
+                projectorToBe = new RowProjector(projectorToBe,true);
+            }
+            final RowProjector projector = projectorToBe;
+            final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
+                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+            return new MutationPlan() {
+                        @Override
+                        public ParameterMetaData getParameterMetaData() {
+                            return context.getBindManager().getParameterMetaData();
+                        }
+        
+                        @Override
+                        public StatementContext getContext() {
+                            return context;
+                        }
+        
+                        @Override
+                        public TableRef getTargetRef() {
+                            return dataPlan.getTableRef();
+                        }
     
-                // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>
-                // The coprocessor will delete each row returned from the scan
-                // Ignoring ORDER BY, since with auto commit on and no limit makes no difference
-                SelectStatement aggSelect = SelectStatement.create(SelectStatement.COUNT_ONE, delete.getHint());
-                RowProjector projectorToBe = ProjectionCompiler.compile(context, aggSelect, GroupBy.EMPTY_GROUP_BY);
-                context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
-                if (plan.getProjector().projectEveryRow()) {
-                    projectorToBe = new RowProjector(projectorToBe,true);
-                }
-                final RowProjector projector = projectorToBe;
-                final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, null,
-                        OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-                mutationPlans.add(new MutationPlan() {
-                    @Override
-                    public ParameterMetaData getParameterMetaData() {
-                        return context.getBindManager().getParameterMetaData();
-                    }
+                        @Override
+                        public Set<TableRef> getSourceRefs() {
+                            return dataPlan.getSourceRefs();
+                        }
     
-                    @Override
-                    public StatementContext getContext() {
-                        return context;
-                    }
+                		@Override
+                		public Operation getOperation() {
+                			return operation;
+                		}
     
-                    @Override
-                    public TableRef getTargetRef() {
-                        return dataTableRef;
-                    }
-
-                    @Override
-                    public Set<TableRef> getSourceRefs() {
-                        return dataPlan.getSourceRefs();
-                    }
-
-            		@Override
-            		public Operation getOperation() {
-            			return operation;
-            		}
-
-                    @Override
-                    public MutationState execute() throws SQLException {
-                        // TODO: share this block of code with UPSERT SELECT
-                        ImmutableBytesWritable ptr = context.getTempPtr();
-                        PTable table = tableRef.getTable();
-                        table.getIndexMaintainers(ptr, context.getConnection());
-                        byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-                        ServerCache cache = null;
-                        try {
-                            if (ptr.getLength() > 0) {
-                                byte[] uuidValue = ServerCacheClient.generateId();
-                                context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                            }
-                            ResultIterator iterator = aggPlan.iterator();
+                        @Override
+                        public MutationState execute() throws SQLException {
+                            // TODO: share this block of code with UPSERT SELECT
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            PTable table = dataPlan.getTableRef().getTable();
+                            table.getIndexMaintainers(ptr, context.getConnection());
+                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+                            ServerCache cache = null;
                             try {
-                                Tuple row = iterator.next();
-                                final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                return new MutationState(maxSize, maxSizeBytes, connection) {
-                                    @Override
-                                    public long getUpdateCount() {
-                                        return mutationCount;
-                                    }
-                                };
+                                if (ptr.getLength() > 0) {
+                                    byte[] uuidValue = ServerCacheClient.generateId();
+                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                                    context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                                }
+                                ResultIterator iterator = aggPlan.iterator();
+                                try {
+                                    Tuple row = iterator.next();
+                                    final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                                    return new MutationState(maxSize, maxSizeBytes, connection) {
+                                        @Override
+                                        public long getUpdateCount() {
+                                            return mutationCount;
+                                        }
+                                    };
+                                } finally {
+                                    iterator.close();
+                                }
                             } finally {
-                                iterator.close();
-                            }
-                        } finally {
-                            if (cache != null) {
-                                cache.close();
+                                if (cache != null) {
+                                    cache.close();
+                                }
                             }
                         }
-                    }
+        
+                        @Override
+                        public ExplainPlan getExplainPlan() throws SQLException {
+                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                            planSteps.add("DELETE ROWS");
+                            planSteps.addAll(queryPlanSteps);
+                            return new ExplainPlan(planSteps);
+                        }
     
-                    @Override
-                    public ExplainPlan getExplainPlan() throws SQLException {
-                        List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-                        List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                        planSteps.add("DELETE ROWS");
-                        planSteps.addAll(queryPlanSteps);
-                        return new ExplainPlan(planSteps);
-                    }
-
-                    @Override
-                    public Long getEstimatedRowsToScan() throws SQLException {
-                        return aggPlan.getEstimatedRowsToScan();
-                    }
-
-                    @Override
-                    public Long getEstimatedBytesToScan() throws SQLException {
-                        return aggPlan.getEstimatedBytesToScan();
-                    }
-
-                    @Override
-                    public Long getEstimateInfoTimestamp() throws SQLException {
-                        return aggPlan.getEstimateInfoTimestamp();
-                    }
-                });
-            } else {
-                List<TableRef> immutableIndexRefsToBe = Lists.newArrayListWithExpectedSize(dataPlan.getTableRef().getTable().getIndexes().size());
-                if (!buildingImmutableIndexes.isEmpty()) {
-                    immutableIndexRefsToBe = buildingImmutableIndexes;
-                } else if (hasImmutableIndexes && !plan.getTableRef().equals(tableRef)) {
-                    immutableIndexRefsToBe = Collections.singletonList(plan.getTableRef());
-                }
-                final List<TableRef> immutableIndexRefs = immutableIndexRefsToBe;
-                final DeletingParallelIteratorFactory parallelIteratorFactory2 = parallelIteratorFactory;
-                mutationPlans.add( new MutationPlan() {
-                    @Override
-                    public ParameterMetaData getParameterMetaData() {
-                        return context.getBindManager().getParameterMetaData();
-                    }
+                        @Override
+                        public Long getEstimatedRowsToScan() throws SQLException {
+                            return aggPlan.getEstimatedRowsToScan();
+                        }
     
-                    @Override
-                    public StatementContext getContext() {
-                        return context;
-                    }
+                        @Override
+                        public Long getEstimatedBytesToScan() throws SQLException {
+                            return aggPlan.getEstimatedBytesToScan();
+                        }
     
+                        @Override
+                        public Long getEstimateInfoTimestamp() throws SQLException {
+                            return aggPlan.getEstimateInfoTimestamp();
+                        }
+                    };
+        } else {
+            final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
+            List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size());
+            final int offset = table.getBucketNum() == null ? 0 : 1;
+            for (int i = 0; i < projectedColumns.size(); i++) {
+                final int position = i;
+                adjustedProjectedColumns.add(new DelegateColumn(projectedColumns.get(i)) {
                     @Override
-                    public TableRef getTargetRef() {
-                        return dataTableRef;
-                    }
-
-                    @Override
-                    public Set<TableRef> getSourceRefs() {
-                        return dataPlan.getSourceRefs();
+                    public int getPosition() {
+                        return position + offset;
                     }
+                });
+            }
+            PTable projectedTable = PTableImpl.makePTable(table, PTableType.PROJECTED, adjustedProjectedColumns);
+            final TableRef projectedTableRef = new TableRef(projectedTable, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp());
+
+            QueryPlan bestPlanToBe = dataPlan;
+            for (QueryPlan plan : queryPlans) {
+                PTable planTable = plan.getTableRef().getTable();
+                if (planTable.getIndexState() != PIndexState.BUILDING) {
+                    bestPlanToBe = plan;
+                    break;
+                }
+            }
+            final QueryPlan bestPlan = bestPlanToBe;
+            final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(immutableIndexes.size());
+            for (PTable index : immutableIndexes) {
+                if (!bestPlan.getTableRef().getTable().equals(index)) {
+                    otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()));
+                }
+            }
+            
+            if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
+                otherTableRefs.add(projectedTableRef);
+            }
+            final StatementContext context = bestPlan.getContext();
+            return new MutationPlan() {
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return context.getBindManager().getParameterMetaData();
+                }
 
-            		@Override
-            		public Operation getOperation() {
-            			return operation;
-            		}
+                @Override
+                public StatementContext getContext() {
+                    return context;
+                }
 
-                    @Override
-                    public MutationState execute() throws SQLException {
-                        ResultIterator iterator = plan.iterator();
-                        try {
-                            if (!hasLimit) {
-                                Tuple tuple;
-                                long totalRowCount = 0;
-                                if (parallelIteratorFactory2 != null) {
-                                    parallelIteratorFactory2.setRowProjector(plan.getProjector());
-                                    parallelIteratorFactory2.setTargetTableRef(tableRef);
-                                    parallelIteratorFactory2.setSourceTableRef(plan.getTableRef());
-                                    parallelIteratorFactory2.setIndexTargetTableRefs(immutableIndexRefs);
-                                }
-                                while ((tuple=iterator.next()) != null) {// Runs query
-                                    Cell kv = tuple.getValue(0);
-                                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                                }
-                                // Return total number of rows that have been delete. In the case of auto commit being off
-                                // the mutations will all be in the mutation state of the current connection.
-                                MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount);
+                @Override
+                public TableRef getTargetRef() {
+                    return targetTableRef;
+                }
 
-                                // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
-                                state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+                @Override
+                public Set<TableRef> getSourceRefs() {
+                    return dataPlan.getSourceRefs();
+                }
 
-                                return state;
-                            } else {
-                                return deleteRows(plan.getContext(), tableRef, immutableIndexRefs, iterator, plan.getProjector(), plan.getTableRef());
+        		@Override
+        		public Operation getOperation() {
+        			return operation;
+        		}
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    ResultIterator iterator = bestPlan.iterator();
+                    try {
+                        if (!hasPreOrPostProcessing) {
+                            Tuple tuple;
+                            long totalRowCount = 0;
+                            if (parallelIteratorFactory != null) {
+                                parallelIteratorFactory.setQueryPlan(bestPlan);
+                                parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
+                                parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
                             }
-                        } finally {
-                            iterator.close();
+                            while ((tuple=iterator.next()) != null) {// Runs query
+                                Cell kv = tuple.getValue(0);
+                                totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                            }
+                            // Return total number of rows that have been deleted from the table. In the case of auto commit being off
+                            // the mutations will all be in the mutation state of the current connection. We need to divide by the
+                            // total number of tables we updated as otherwise the client will get an unexpected result
+                            MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount / ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
+
+                            // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+                            state.setReadMetricQueue(context.getReadMetricsQueue());
+
+                            return state;
+                        } else {
+                            return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
                         }
+                    } finally {
+                        iterator.close();
                     }
-    
-                    @Override
-                    public ExplainPlan getExplainPlan() throws SQLException {
-                        List<String> queryPlanSteps =  plan.getExplainPlan().getPlanSteps();
-                        List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                        planSteps.add("DELETE ROWS");
-                        planSteps.addAll(queryPlanSteps);
-                        return new ExplainPlan(planSteps);
-                    }
+                }
 
-                    @Override
-                    public Long getEstimatedRowsToScan() throws SQLException {
-                        return plan.getEstimatedRowsToScan();
-                    }
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  bestPlan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("DELETE ROWS");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }
 
-                    @Override
-                    public Long getEstimatedBytesToScan() throws SQLException {
-                        return plan.getEstimatedBytesToScan();
-                    }
+                @Override
+                public Long getEstimatedRowsToScan() throws SQLException {
+                    return bestPlan.getEstimatedRowsToScan();
+                }
 
-                    @Override
-                    public Long getEstimateInfoTimestamp() throws SQLException {
-                        return plan.getEstimateInfoTimestamp();
-                    }
-                });
-            }
+                @Override
+                public Long getEstimatedBytesToScan() throws SQLException {
+                    return bestPlan.getEstimatedBytesToScan();
+                }
+
+                @Override
+                public Long getEstimateInfoTimestamp() throws SQLException {
+                    return bestPlan.getEstimateInfoTimestamp();
+                }
+            };
         }
-        return mutationPlans.size() == 1 ? mutationPlans.get(0) : new MultiDeleteMutationPlan(mutationPlans);
     }
 }
\ No newline at end of file