You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2017/11/13 21:08:55 UTC

phoenix git commit: PHOENIX-4342 - Surface QueryPlan in MutationPlan

Repository: phoenix
Updated Branches:
  refs/heads/master 2a8e1c750 -> 1d8a6bc3a


PHOENIX-4342 - Surface QueryPlan in MutationPlan


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d8a6bc3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d8a6bc3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d8a6bc3

Branch: refs/heads/master
Commit: 1d8a6bc3a6a277d9e3201066b753fa9fd7018545
Parents: 2a8e1c7
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Thu Nov 2 13:41:02 2017 -0700
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Mon Nov 13 11:47:50 2017 -0800

----------------------------------------------------------------------
 .../phoenix/compile/BaseMutationPlan.java       |   5 +
 .../phoenix/compile/DelegateMutationPlan.java   |   5 +
 .../apache/phoenix/compile/DeleteCompiler.java  | 545 ++++++++-------
 .../apache/phoenix/compile/MutationPlan.java    |   5 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 675 +++++++++++--------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   9 +-
 6 files changed, 733 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
index 0e45682..60eb59a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/BaseMutationPlan.java
@@ -79,4 +79,9 @@ public abstract class BaseMutationPlan implements MutationPlan {
         return 0l;
     }
 
+    @Override
+    public QueryPlan getQueryPlan() {
+        return null;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
index 343ec32..90eef61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -42,6 +42,11 @@ public class DelegateMutationPlan implements MutationPlan {
     }
 
     @Override
+    public QueryPlan getQueryPlan() {
+        return plan.getQueryPlan();
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return plan.getParameterMetaData();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f038cda..8d9a5b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -303,14 +303,16 @@ public class DeleteCompiler {
         return Collections.emptyList();
     }
     
-    private class MultiDeleteMutationPlan implements MutationPlan {
+    private class MultiRowDeleteMutationPlan implements MutationPlan {
         private final List<MutationPlan> plans;
         private final MutationPlan firstPlan;
-        
-        public MultiDeleteMutationPlan(@NotNull List<MutationPlan> plans) {
+        private final QueryPlan dataPlan;
+
+        public MultiRowDeleteMutationPlan(QueryPlan dataPlan, @NotNull List<MutationPlan> plans) {
             Preconditions.checkArgument(!plans.isEmpty());
             this.plans = plans;
             this.firstPlan = plans.get(0);
+            this.dataPlan = dataPlan;
         }
         
         @Override
@@ -348,8 +350,8 @@ public class DeleteCompiler {
             return firstPlan.getSourceRefs();
         }
 
-		@Override
-		public Operation getOperation() {
+		    @Override
+		    public Operation getOperation() {
 			return operation;
 		}
 
@@ -401,6 +403,11 @@ public class DeleteCompiler {
             }
             return estInfoTimestamp;
         }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
     }
 
     public MutationPlan compile(DeleteStatement delete) throws SQLException {
@@ -548,69 +555,9 @@ public class DeleteCompiler {
             List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
             for (final QueryPlan plan : queryPlans) {
                 final StatementContext context = plan.getContext();
-                mutationPlans.add(new MutationPlan() {
-    
-                    @Override
-                    public ParameterMetaData getParameterMetaData() {
-                        return context.getBindManager().getParameterMetaData();
-                    }
-    
-                    @Override
-                    public MutationState execute() throws SQLException {
-                        // We have a point lookup, so we know we have a simple set of fully qualified
-                        // keys for our ranges
-                        ScanRanges ranges = context.getScanRanges();
-                        Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
-                        Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
-                        while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
-                        }
-                        return new MutationState(plan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
-                    }
-    
-                    @Override
-                    public ExplainPlan getExplainPlan() throws SQLException {
-                        return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
-                    }
-    
-                    @Override
-                    public StatementContext getContext() {
-                        return context;
-                    }
-
-                    @Override
-                    public TableRef getTargetRef() {
-                        return dataPlan.getTableRef();
-                    }
-
-                    @Override
-                    public Set<TableRef> getSourceRefs() {
-                        // Don't include the target
-                        return Collections.emptySet();
-                    }
-
-            		@Override
-            		public Operation getOperation() {
-            			return operation;
-            		}
-
-                    @Override
-                    public Long getEstimatedRowsToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimatedBytesToScan() throws SQLException {
-                        return 0l;
-                    }
-
-                    @Override
-                    public Long getEstimateInfoTimestamp() throws SQLException {
-                        return 0l;
-                    }
-                });
+                mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
             }
-            return new MultiDeleteMutationPlan(mutationPlans);
+            return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
         } else if (runOnServer) {
             // TODO: better abstraction
             final StatementContext context = dataPlan.getContext();
@@ -629,91 +576,7 @@ public class DeleteCompiler {
             final RowProjector projector = projectorToBe;
             final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null,
                     OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-            return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return context.getBindManager().getParameterMetaData();
-                        }
-        
-                        @Override
-                        public StatementContext getContext() {
-                            return context;
-                        }
-        
-                        @Override
-                        public TableRef getTargetRef() {
-                            return dataPlan.getTableRef();
-                        }
-    
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return dataPlan.getSourceRefs();
-                        }
-    
-                		@Override
-                		public Operation getOperation() {
-                			return operation;
-                		}
-    
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            // TODO: share this block of code with UPSERT SELECT
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = dataPlan.getTableRef().getTable();
-                            table.getIndexMaintainers(ptr, context.getConnection());
-                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-                            ServerCache cache = null;
-                            try {
-                                if (ptr.getLength() > 0) {
-                                    byte[] uuidValue = ServerCacheClient.generateId();
-                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                    context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                                }
-                                ResultIterator iterator = aggPlan.iterator();
-                                try {
-                                    Tuple row = iterator.next();
-                                    final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                    return new MutationState(maxSize, maxSizeBytes, connection) {
-                                        @Override
-                                        public long getUpdateCount() {
-                                            return mutationCount;
-                                        }
-                                    };
-                                } finally {
-                                    iterator.close();
-                                }
-                            } finally {
-                                if (cache != null) {
-                                    cache.close();
-                                }
-                            }
-                        }
-        
-                        @Override
-                        public ExplainPlan getExplainPlan() throws SQLException {
-                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("DELETE ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-    
-                        @Override
-                        public Long getEstimatedRowsToScan() throws SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimatedBytesToScan() throws SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-    
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+            return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes);
         } else {
             final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe;
             List<PColumn> adjustedProjectedColumns = Lists.newArrayListWithExpectedSize(projectedColumns.size());
@@ -749,90 +612,322 @@ public class DeleteCompiler {
             if (!bestPlan.getTableRef().getTable().equals(targetTableRef.getTable())) {
                 otherTableRefs.add(projectedTableRef);
             }
-            final StatementContext context = bestPlan.getContext();
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return context.getBindManager().getParameterMetaData();
-                }
+            return new ClientSelectDeleteMutationPlan(targetTableRef, dataPlan, bestPlan, hasPreOrPostProcessing,
+                    parallelIteratorFactory, otherTableRefs, projectedTableRef, maxSize, maxSizeBytes, connection);
+        }
+    }
 
-                @Override
-                public StatementContext getContext() {
-                    return context;
-                }
+    private class SingleRowDeleteMutationPlan implements MutationPlan {
 
-                @Override
-                public TableRef getTargetRef() {
-                    return targetTableRef;
-                }
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final int maxSize;
+        private final StatementContext context;
+        private final int maxSizeBytes;
 
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return dataPlan.getSourceRefs();
-                }
+        public SingleRowDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, int maxSize, int maxSizeBytes) {
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.maxSize = maxSize;
+            this.context = dataPlan.getContext();
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
 
-        		@Override
-        		public Operation getOperation() {
-        			return operation;
-        		}
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = bestPlan.iterator();
-                    try {
-                        if (!hasPreOrPostProcessing) {
-                            Tuple tuple;
-                            long totalRowCount = 0;
-                            if (parallelIteratorFactory != null) {
-                                parallelIteratorFactory.setQueryPlan(bestPlan);
-                                parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
-                                parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
-                            }
-                            while ((tuple=iterator.next()) != null) {// Runs query
-                                Cell kv = tuple.getValue(0);
-                                totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                            }
-                            // Return total number of rows that have been deleted from the table. In the case of auto commit being off
-                            // the mutations will all be in the mutation state of the current connection. We need to divide by the
-                            // total number of tables we updated as otherwise the client will get an unexpected result
-                            MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount / ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
-
-                            // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
-                            state.setReadMetricQueue(context.getReadMetricsQueue());
-
-                            return state;
-                        } else {
-                            return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
+        @Override
+        public MutationState execute() throws SQLException {
+            // We have a point lookup, so we know we have a simple set of fully qualified
+            // keys for our ranges
+            ScanRanges ranges = context.getScanRanges();
+            Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
+            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+            while (iterator.hasNext()) {
+                mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
+                        new RowMutationState(PRow.DELETE_MARKER,
+                                statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+            }
+            return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            return new ExplainPlan(Collections.singletonList("DELETE SINGLE ROW"));
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return dataPlan;
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            // Don't include the target
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0l;
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0l;
+        }
+    }
+
+    private class ServerSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final QueryPlan dataPlan;
+        private final PhoenixConnection connection;
+        private final QueryPlan aggPlan;
+        private final RowProjector projector;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerSelectDeleteMutationPlan(QueryPlan dataPlan, PhoenixConnection connection, QueryPlan aggPlan,
+                                              RowProjector projector, int maxSize, int maxSizeBytes) {
+            this.context = dataPlan.getContext();
+            this.dataPlan = dataPlan;
+            this.connection = connection;
+            this.aggPlan = aggPlan;
+            this.projector = projector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return dataPlan.getTableRef();
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            // TODO: share this block of code with UPSERT SELECT
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = dataPlan.getTableRef().getTable();
+            table.getIndexMaintainers(ptr, context.getConnection());
+            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+            ServerCache cache = null;
+            try {
+                if (ptr.getLength() > 0) {
+                    byte[] uuidValue = ServerCacheClient.generateId();
+                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                    context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                    context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+                ResultIterator iterator = aggPlan.iterator();
+                try {
+                    Tuple row = iterator.next();
+                    final long mutationCount = (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                    return new MutationState(maxSize, maxSizeBytes, connection) {
+                        @Override
+                        public long getUpdateCount() {
+                            return mutationCount;
                         }
-                    } finally {
-                        iterator.close();
-                    }
+                    };
+                } finally {
+                    iterator.close();
                 }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  bestPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("DELETE ROWS");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
+            } finally {
+                if (cache != null) {
+                    cache.close();
                 }
+            }
+        }
 
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return bestPlan.getEstimatedRowsToScan();
-                }
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
 
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return bestPlan.getEstimatedBytesToScan();
-                }
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan();
+        }
 
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return bestPlan.getEstimateInfoTimestamp();
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan;
+        }
+    }
+
+    private class ClientSelectDeleteMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final TableRef targetTableRef;
+        private final QueryPlan dataPlan;
+        private final QueryPlan bestPlan;
+        private final boolean hasPreOrPostProcessing;
+        private final DeletingParallelIteratorFactory parallelIteratorFactory;
+        private final List<TableRef> otherTableRefs;
+        private final TableRef projectedTableRef;
+        private final int maxSize;
+        private final int maxSizeBytes;
+        private final PhoenixConnection connection;
+
+        public ClientSelectDeleteMutationPlan(TableRef targetTableRef, QueryPlan dataPlan, QueryPlan bestPlan,
+                                              boolean hasPreOrPostProcessing,
+                                              DeletingParallelIteratorFactory parallelIteratorFactory,
+                                              List<TableRef> otherTableRefs, TableRef projectedTableRef, int maxSize,
+                                              int maxSizeBytes, PhoenixConnection connection) {
+            this.context = bestPlan.getContext();
+            this.targetTableRef = targetTableRef;
+            this.dataPlan = dataPlan;
+            this.bestPlan = bestPlan;
+            this.hasPreOrPostProcessing = hasPreOrPostProcessing;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.otherTableRefs = otherTableRefs;
+            this.projectedTableRef = projectedTableRef;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+            this.connection = connection;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return targetTableRef;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return dataPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = bestPlan.iterator();
+            try {
+                if (!hasPreOrPostProcessing) {
+                    Tuple tuple;
+                    long totalRowCount = 0;
+                    if (parallelIteratorFactory != null) {
+                        parallelIteratorFactory.setQueryPlan(bestPlan);
+                        parallelIteratorFactory.setOtherTableRefs(otherTableRefs);
+                        parallelIteratorFactory.setProjectedTableRef(projectedTableRef);
+                    }
+                    while ((tuple=iterator.next()) != null) {// Runs query
+                        Cell kv = tuple.getValue(0);
+                        totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                    }
+                    // Return total number of rows that have been deleted from the table. In the case of auto commit being off
+                    // the mutations will all be in the mutation state of the current connection. We need to divide by the
+                    // total number of tables we updated as otherwise the client will get an unexpected result
+                    MutationState state = new MutationState(maxSize, maxSizeBytes, connection,
+                            totalRowCount /
+                                    ((bestPlan.getTableRef().getTable().getIndexType() == IndexType.LOCAL && !otherTableRefs.isEmpty() ? 0 : 1) + otherTableRefs.size()));
+
+                    // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+                    state.setReadMetricQueue(context.getReadMetricsQueue());
+
+                    return state;
+                } else {
+                    return deleteRows(context, iterator, bestPlan, projectedTableRef, otherTableRefs);
                 }
-            };
+            } finally {
+                iterator.close();
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  bestPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("DELETE ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return bestPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return bestPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return bestPlan.getEstimateInfoTimestamp();
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return bestPlan;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
index ddc2004..97f3f3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
@@ -24,6 +24,7 @@ import org.apache.phoenix.schema.TableRef;
 
 
 public interface MutationPlan extends StatementPlan {
-    public MutationState execute() throws SQLException;
-    public TableRef getTargetRef();
+    MutationState execute() throws SQLException;
+    TableRef getTargetRef();
+    QueryPlan getQueryPlan();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6445894..3603ce7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -694,173 +694,13 @@ public class UpsertCompiler {
                     
                     // Ignore order by - it has no impact
                     final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-                    return new MutationPlan() {
-                        @Override
-                        public ParameterMetaData getParameterMetaData() {
-                            return queryPlan.getContext().getBindManager().getParameterMetaData();
-                        }
-    
-                        @Override
-                        public StatementContext getContext() {
-                            return queryPlan.getContext();
-                        }
-
-                        @Override
-                        public TableRef getTargetRef() {
-                            return tableRef;
-                        }
-
-                        @Override
-                        public Set<TableRef> getSourceRefs() {
-                            return originalQueryPlan.getSourceRefs();
-                        }
-
-                		@Override
-                		public Operation getOperation() {
-                			return operation;
-                		}
-
-                        @Override
-                        public MutationState execute() throws SQLException {
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            PTable table = tableRef.getTable();
-                            table.getIndexMaintainers(ptr, context.getConnection());
-                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-
-                            if (ptr.getLength() > 0) {
-                                byte[] uuidValue = ServerCacheClient.generateId();
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                            }
-                            ResultIterator iterator = aggPlan.iterator();
-                            try {
-                                Tuple row = iterator.next();
-                                final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
-                                        PLong.INSTANCE, ptr);
-                                return new MutationState(maxSize, maxSizeBytes, connection) {
-                                    @Override
-                                    public long getUpdateCount() {
-                                        return mutationCount;
-                                    }
-                                };
-                            } finally {
-                                iterator.close();
-                            }
-                            
-                        }
-
-                        @Override
-                        public ExplainPlan getExplainPlan() throws SQLException {
-                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                            planSteps.add("UPSERT ROWS");
-                            planSteps.addAll(queryPlanSteps);
-                            return new ExplainPlan(planSteps);
-                        }
-
-                        @Override
-                        public Long getEstimatedRowsToScan() throws SQLException {
-                            return aggPlan.getEstimatedRowsToScan();
-                        }
-
-                        @Override
-                        public Long getEstimatedBytesToScan() throws SQLException {
-                            return aggPlan.getEstimatedBytesToScan();
-                        }
-
-                        @Override
-                        public Long getEstimateInfoTimestamp() throws SQLException {
-                            return aggPlan.getEstimateInfoTimestamp();
-                        }
-                    };
+                    return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
                 }
             }
             ////////////////////////////////////////////////////////////////////
             // UPSERT SELECT run client-side
             /////////////////////////////////////////////////////////////////////
-            return new MutationPlan() {
-                @Override
-                public ParameterMetaData getParameterMetaData() {
-                    return queryPlan.getContext().getBindManager().getParameterMetaData();
-                }
-
-                @Override
-                public StatementContext getContext() {
-                    return queryPlan.getContext();
-                }
-
-                @Override
-                public TableRef getTargetRef() {
-                    return tableRef;
-                }
-
-                @Override
-                public Set<TableRef> getSourceRefs() {
-                    return originalQueryPlan.getSourceRefs();
-                }
-
-        		@Override
-        		public Operation getOperation() {
-        			return operation;
-        		}
-
-                @Override
-                public MutationState execute() throws SQLException {
-                    ResultIterator iterator = queryPlan.iterator();
-                    if (parallelIteratorFactory == null) {
-                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
-                    }
-                    try {
-                        parallelIteratorFactory.setRowProjector(projector);
-                        parallelIteratorFactory.setColumnIndexes(columnIndexes);
-                        parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
-                        Tuple tuple;
-                        long totalRowCount = 0;
-                        StatementContext context = queryPlan.getContext();
-                        while ((tuple=iterator.next()) != null) {// Runs query
-                            Cell kv = tuple.getValue(0);
-                            totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                        }
-                        // Return total number of rows that have been updated. In the case of auto commit being off
-                        // the mutations will all be in the mutation state of the current connection.
-                        MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
-                        /*
-                         *  All the metrics collected for measuring the reads done by the parallel mutating iterators
-                         *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
-                         *  returned mutation state so they can be published on commit. 
-                         */
-                        mutationState.setReadMetricQueue(context.getReadMetricsQueue());
-                        return mutationState; 
-                    } finally {
-                        iterator.close();
-                    }
-                }
-
-                @Override
-                public ExplainPlan getExplainPlan() throws SQLException {
-                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
-                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-                    planSteps.add("UPSERT SELECT");
-                    planSteps.addAll(queryPlanSteps);
-                    return new ExplainPlan(planSteps);
-                }
-
-                @Override
-                public Long getEstimatedRowsToScan() throws SQLException {
-                    return queryPlan.getEstimatedRowsToScan();
-                }
-
-                @Override
-                public Long getEstimatedBytesToScan() throws SQLException {
-                    return queryPlan.getEstimatedBytesToScan();
-                }
-
-                @Override
-                public Long getEstimateInfoTimestamp() throws SQLException {
-                    return queryPlan.getEstimateInfoTimestamp();
-                }
-            };
+            return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
         }
 
             
@@ -986,124 +826,9 @@ public class UpsertCompiler {
         }
         final byte[] onDupKeyBytes = onDupKeyBytesToBe;
         
-        return new MutationPlan() {
-            @Override
-            public ParameterMetaData getParameterMetaData() {
-                return context.getBindManager().getParameterMetaData();
-            }
-
-            @Override
-            public StatementContext getContext() {
-                return context;
-            }
-
-            @Override
-            public TableRef getTargetRef() {
-                return tableRef;
-            }
-
-            @Override
-            public Set<TableRef> getSourceRefs() {
-                return Collections.emptySet();
-            }
-
-    		@Override
-    		public Operation getOperation() {
-    			return operation;
-    		}
-
-            @Override
-            public MutationState execute() throws SQLException {
-                ImmutableBytesWritable ptr = context.getTempPtr();
-                final SequenceManager sequenceManager = context.getSequenceManager();
-                // Next evaluate all the expressions
-                int nodeIndex = nodeIndexOffset;
-                PTable table = tableRef.getTable();
-                Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
-                    sequenceManager.newSequenceTuple(null);
-                for (Expression constantExpression : constantExpressions) {
-                    PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-                    constantExpression.evaluate(tuple, ptr);
-                    Object value = null;
-                    if (constantExpression.getDataType() != null) {
-                        value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), constantExpression.getMaxLength(), constantExpression.getScale());
-                        if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { 
-                            throw TypeMismatchException.newException(
-                                constantExpression.getDataType(), column.getDataType(), "expression: "
-                                        + constantExpression.toString() + " in column " + column);
-                        }
-                        if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
-                                constantExpression.getSortOrder(), constantExpression.getMaxLength(), 
-                                constantExpression.getScale(), column.getMaxLength(), column.getScale())) { 
-                            throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                .setMessage("value=" + constantExpression.toString()).build().buildException();
-                        }
-                    }
-                    column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), 
-                            constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
-                            column.getMaxLength(), column.getScale(),column.getSortOrder(),
-                            table.rowKeyOrderOptimizable());
-                    if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
-                        throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
-                                .setColumnName(column.getName().getString())
-                                .setMessage("value=" + constantExpression.toString()).build().buildException();
-                    }
-                    values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    nodeIndex++;
-                }
-                // Add columns based on view
-                for (PColumn column : addViewColumns) {
-                    if (IndexUtil.getViewConstantValue(column, ptr)) {
-                        values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                    } else {
-                        throw new IllegalStateException();
-                    }
-                }
-                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
-                IndexMaintainer indexMaintainer = null;
-                byte[][] viewConstants = null;
-                if (table.getIndexType() == IndexType.LOCAL) {
-                    PTable parentTable =
-                            statement
-                                    .getConnection()
-                                    .getMetaDataCache()
-                                    .getTableRef(
-                                        new PTableKey(statement.getConnection().getTenantId(),
-                                                table.getParentName().getString())).getTable();
-                    indexMaintainer = table.getIndexMaintainer(parentTable, connection);
-                    viewConstants = IndexUtil.getViewConstants(parentTable);
-                }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
-                return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
-            }
-
-            @Override
-            public ExplainPlan getExplainPlan() throws SQLException {
-                List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
-                if (context.getSequenceManager().getSequenceCount() > 0) {
-                    planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
-                }
-                planSteps.add("PUT SINGLE ROW");
-                return new ExplainPlan(planSteps);
-            }
-
-            @Override
-            public Long getEstimatedRowsToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimatedBytesToScan() throws SQLException {
-                return 0l;
-            }
-
-            @Override
-            public Long getEstimateInfoTimestamp() throws SQLException {
-                return 0l;
-            }
-        };
+        return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
+                allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
+                connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
     }
     
     private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
@@ -1214,4 +939,394 @@ public class UpsertCompiler {
             }
         }
     }
+
+    private class ServerUpsertSelectMutationPlan implements MutationPlan {
+        private final QueryPlan queryPlan;
+        private final TableRef tableRef;
+        private final QueryPlan originalQueryPlan;
+        private final StatementContext context;
+        private final PhoenixConnection connection;
+        private final Scan scan;
+        private final QueryPlan aggPlan;
+        private final RowProjector aggProjector;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan,
+                                              StatementContext context, PhoenixConnection connection,
+                                              Scan scan, QueryPlan aggPlan, RowProjector aggProjector,
+                                              int maxSize, int maxSizeBytes) {
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.context = context;
+            this.connection = connection;
+            this.scan = scan;
+            this.aggPlan = aggPlan;
+            this.aggProjector = aggProjector;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return queryPlan.getContext().getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return aggPlan; // the plan execute() iterates, rather than the queryPlan used for context
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            PTable table = tableRef.getTable();
+            table.getIndexMaintainers(ptr, context.getConnection()); // presumably serializes index metadata into ptr
+            byte[] txState = table.isTransactional() ?
+                    connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+
+            if (ptr.getLength() > 0) { // only attach index/transaction attributes when ptr holds data
+                byte[] uuidValue = ServerCacheClient.generateId();
+                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
+            ResultIterator iterator = aggPlan.iterator();
+            try {
+                Tuple row = iterator.next(); // only the first row is read; its first column is the update count
+                final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row,
+                        PLong.INSTANCE, ptr);
+                return new MutationState(maxSize, maxSizeBytes, connection) { // carries only the count
+                    @Override
+                    public long getUpdateCount() {
+                        return mutationCount;
+                    }
+                };
+            } finally {
+                iterator.close();
+            }
+
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("UPSERT ROWS");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return aggPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return aggPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return aggPlan.getEstimateInfoTimestamp();
+        }
+    }
+
+    private class UpsertValuesMutationPlan implements MutationPlan {
+        private final StatementContext context;
+        private final TableRef tableRef;
+        private final int nodeIndexOffset;
+        private final List<Expression> constantExpressions;
+        private final List<PColumn> allColumns;
+        private final int[] columnIndexes;
+        private final Set<PColumn> overlapViewColumns;
+        private final byte[][] values;
+        private final Set<PColumn> addViewColumns;
+        private final PhoenixConnection connection;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final byte[] onDupKeyBytes;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset,
+                                        List<Expression> constantExpressions, List<PColumn> allColumns,
+                                        int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
+                                        Set<PColumn> addViewColumns, PhoenixConnection connection,
+                                        int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes,
+                                        int maxSize, int maxSizeBytes) {
+            this.context = context;
+            this.tableRef = tableRef;
+            this.nodeIndexOffset = nodeIndexOffset;
+            this.constantExpressions = constantExpressions;
+            this.allColumns = allColumns;
+            this.columnIndexes = columnIndexes;
+            this.overlapViewColumns = overlapViewColumns;
+            this.values = values;
+            this.addViewColumns = addViewColumns;
+            this.connection = connection;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.onDupKeyBytes = onDupKeyBytes;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return context.getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return context;
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return null; // UPSERT VALUES evaluates constant expressions; no query plan backs it
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ImmutableBytesWritable ptr = context.getTempPtr();
+            final SequenceManager sequenceManager = context.getSequenceManager();
+            // Next evaluate all the expressions
+            int nodeIndex = nodeIndexOffset;
+            PTable table = tableRef.getTable();
+            Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
+                sequenceManager.newSequenceTuple(null);
+            for (Expression constantExpression : constantExpressions) {
+                PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+                constantExpression.evaluate(tuple, ptr);
+                Object value = null;
+                if (constantExpression.getDataType() != null) {
+                    value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(),
+                            constantExpression.getMaxLength(), constantExpression.getScale());
+                    if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
+                        throw TypeMismatchException.newException(
+                            constantExpression.getDataType(), column.getDataType(), "expression: "
+                                    + constantExpression.toString() + " in column " + column);
+                    }
+                    if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
+                            constantExpression.getSortOrder(), constantExpression.getMaxLength(),
+                            constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
+                        throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                            .setMessage("value=" + constantExpression.toString()).build().buildException();
+                    }
+                }
+                column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
+                        constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),column.getSortOrder(),
+                        table.rowKeyOrderOptimizable());
+                if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+                            .setColumnName(column.getName().getString())
+                            .setMessage("value=" + constantExpression.toString()).build().buildException();
+                }
+                values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                nodeIndex++;
+            }
+            // Add columns based on view
+            for (PColumn column : addViewColumns) {
+                if (IndexUtil.getViewConstantValue(column, ptr)) {
+                    values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                } else {
+                    throw new IllegalStateException();
+                }
+            }
+            Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+            IndexMaintainer indexMaintainer = null;
+            byte[][] viewConstants = null;
+            if (table.getIndexType() == IndexType.LOCAL) {
+                PTable parentTable =
+                        statement
+                                .getConnection()
+                                .getMetaDataCache()
+                                .getTableRef(
+                                    new PTableKey(statement.getConnection().getTenantId(),
+                                            table.getParentName().getString())).getTable();
+                indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+                viewConstants = IndexUtil.getViewConstants(parentTable);
+            }
+            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
+            return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
+            if (context.getSequenceManager().getSequenceCount() > 0) {
+                planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+            }
+            planSteps.add("PUT SINGLE ROW");
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return 0L; // a single-row put scans no rows
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return 0L; // a single-row put scans no bytes
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return 0L; // nothing is estimated for a single-row put
+        }
+    }
+
+    private class ClientUpsertSelectMutationPlan implements MutationPlan {
+        private final QueryPlan queryPlan;
+        private final TableRef tableRef;
+        private final QueryPlan originalQueryPlan;
+        private final UpsertingParallelIteratorFactory parallelIteratorFactory;
+        private final RowProjector projector;
+        private final int[] columnIndexes;
+        private final int[] pkSlotIndexes;
+        private final boolean useServerTimestamp;
+        private final int maxSize;
+        private final int maxSizeBytes;
+
+        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
+            this.queryPlan = queryPlan;
+            this.tableRef = tableRef;
+            this.originalQueryPlan = originalQueryPlan;
+            this.parallelIteratorFactory = parallelIteratorFactory;
+            this.projector = projector;
+            this.columnIndexes = columnIndexes;
+            this.pkSlotIndexes = pkSlotIndexes;
+            this.useServerTimestamp = useServerTimestamp;
+            this.maxSize = maxSize;
+            this.maxSizeBytes = maxSizeBytes;
+        }
+
+        @Override
+        public ParameterMetaData getParameterMetaData() {
+            return queryPlan.getContext().getBindManager().getParameterMetaData();
+        }
+
+        @Override
+        public StatementContext getContext() {
+            return queryPlan.getContext();
+        }
+
+        @Override
+        public TableRef getTargetRef() {
+            return tableRef;
+        }
+
+        @Override
+        public QueryPlan getQueryPlan() {
+            return queryPlan;
+        }
+
+        @Override
+        public Set<TableRef> getSourceRefs() {
+            return originalQueryPlan.getSourceRefs();
+        }
+
+        @Override
+        public Operation getOperation() {
+          return operation;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            ResultIterator iterator = queryPlan.iterator();
+            if (parallelIteratorFactory == null) {
+                return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
+            }
+            try {
+                parallelIteratorFactory.setRowProjector(projector);
+                parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                Tuple tuple;
+                long totalRowCount = 0;
+                StatementContext context = queryPlan.getContext();
+                while ((tuple=iterator.next()) != null) {// Iterating runs the query; each row carries a partial count
+                    Cell kv = tuple.getValue(0);
+                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                }
+                // Return the total number of rows that have been updated. When auto commit is off,
+                // the mutations will all be in the mutation state of the current connection.
+                MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
+                /*
+                 *  All the metrics collected for measuring the reads done by the parallel mutating iterators
+                 *  are held in the read metrics queue of the statement context. Include these metrics in the
+                 *  returned mutation state so they can be published on commit.
+                 */
+                mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                return mutationState;
+            } finally {
+                iterator.close();
+            }
+        }
+
+        @Override
+        public ExplainPlan getExplainPlan() throws SQLException {
+            List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+            planSteps.add("UPSERT SELECT");
+            planSteps.addAll(queryPlanSteps);
+            return new ExplainPlan(planSteps);
+        }
+
+        @Override
+        public Long getEstimatedRowsToScan() throws SQLException {
+            return queryPlan.getEstimatedRowsToScan();
+        }
+
+        @Override
+        public Long getEstimatedBytesToScan() throws SQLException {
+            return queryPlan.getEstimatedBytesToScan();
+        }
+
+        @Override
+        public Long getEstimateInfoTimestamp() throws SQLException {
+            return queryPlan.getEstimateInfoTimestamp();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d8a6bc3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d35cce1..174e643 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1313,11 +1313,12 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("EXECUTE UPGRADE"));
                 }
-                
+
                 @Override
-                public StatementContext getContext() {
-                    return new StatementContext(stmt);
-                }
+                public QueryPlan getQueryPlan() { return null; }
+
+                @Override
+                public StatementContext getContext() { return new StatementContext(stmt); }
                 
                 @Override
                 public TableRef getTargetRef() {