You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/12/17 00:37:07 UTC

[4/6] phoenix git commit: Revert "Sync 4.x-HBase-1.2 to master (Pedro Boado)"

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
index 97f3f3d..ddc2004 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutationPlan.java
@@ -24,7 +24,6 @@ import org.apache.phoenix.schema.TableRef;
 
 
 public interface MutationPlan extends StatementPlan {
-    MutationState execute() throws SQLException;
-    TableRef getTargetRef();
-    QueryPlan getQueryPlan();
+    public MutationState execute() throws SQLException;
+    public TableRef getTargetRef();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 287f9e0..af2254b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -307,7 +307,7 @@ public class QueryCompiler {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], JoinCompiler.Strategy.HASH_BUILD_RIGHT);
+                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
                 joinExpressions[i] = joinConditions.getFirst();
                 List<Expression> hashExpressions = joinConditions.getSecond();
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -369,7 +369,7 @@ public class QueryCompiler {
             context.setCurrentTable(rhsTableRef);
             context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes()));
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, JoinCompiler.Strategy.HASH_BUILD_LEFT);
+            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
             List<Expression> joinExpressions = joinConditions.getSecond();
             List<Expression> hashExpressions = joinConditions.getFirst();
             boolean needsMerge = lhsJoin.hasPostReference();
@@ -422,7 +422,7 @@ public class QueryCompiler {
         QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
         PTable rhsProjTable = rhsCtx.getResolver().getTables().get(0).getTable();
         
-        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, JoinCompiler.Strategy.SORT_MERGE);
+        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
         List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
         List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index ca88984..f7cdcbf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,7 +26,6 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -53,9 +52,7 @@ public interface QueryPlan extends StatementPlan {
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
 
     public long getEstimatedSize();
-
-    public Cost getCost();
-
+    
     // TODO: change once joins are supported
     TableRef getTableRef();
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2714858..62e6991 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -46,7 +46,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.metrics.MetricInfo;
-import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -195,11 +194,6 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
-    public Cost getCost() {
-        return Cost.ZERO;
-    }
-
-    @Override
     public Set<TableRef> getSourceRefs() {
         return Collections.emptySet();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index a81a427..9eaaf62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,7 +47,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
@@ -117,10 +116,9 @@ import com.google.common.collect.Sets;
 public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
-            PTable table, MultiRowMutationState mutation,
+            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
-        long columnValueSize = 0;
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -150,7 +148,6 @@ public class UpsertCompiler {
                 }
             } else {
                 columnValues.put(column, value);
-                columnValueSize += (column.getEstimatedSize() + value.length);
             }
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
@@ -169,7 +166,7 @@ public class UpsertCompiler {
                     regionPrefix.length));
             }
         } 
-        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
+        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
     public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -198,7 +195,7 @@ public class UpsertCompiler {
             }
         }
         int rowCount = 0;
-        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
+        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
         PTable table = tableRef.getTable();
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
@@ -698,13 +695,173 @@ public class UpsertCompiler {
                     
                     // Ignore order by - it has no impact
                     final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
-                    return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes);
+                    return new MutationPlan() {
+                        @Override
+                        public ParameterMetaData getParameterMetaData() {
+                            return queryPlan.getContext().getBindManager().getParameterMetaData();
+                        }
+    
+                        @Override
+                        public StatementContext getContext() {
+                            return queryPlan.getContext();
+                        }
+
+                        @Override
+                        public TableRef getTargetRef() {
+                            return tableRef;
+                        }
+
+                        @Override
+                        public Set<TableRef> getSourceRefs() {
+                            return originalQueryPlan.getSourceRefs();
+                        }
+
+                		@Override
+                		public Operation getOperation() {
+                			return operation;
+                		}
+
+                        @Override
+                        public MutationState execute() throws SQLException {
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            PTable table = tableRef.getTable();
+                            table.getIndexMaintainers(ptr, context.getConnection());
+                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
+
+                            if (ptr.getLength() > 0) {
+                                byte[] uuidValue = ServerCacheClient.generateId();
+                                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
+                                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                            }
+                            ResultIterator iterator = aggPlan.iterator();
+                            try {
+                                Tuple row = iterator.next();
+                                final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row,
+                                        PLong.INSTANCE, ptr);
+                                return new MutationState(maxSize, maxSizeBytes, connection) {
+                                    @Override
+                                    public long getUpdateCount() {
+                                        return mutationCount;
+                                    }
+                                };
+                            } finally {
+                                iterator.close();
+                            }
+                            
+                        }
+
+                        @Override
+                        public ExplainPlan getExplainPlan() throws SQLException {
+                            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
+                            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                            planSteps.add("UPSERT ROWS");
+                            planSteps.addAll(queryPlanSteps);
+                            return new ExplainPlan(planSteps);
+                        }
+
+                        @Override
+                        public Long getEstimatedRowsToScan() throws SQLException {
+                            return aggPlan.getEstimatedRowsToScan();
+                        }
+
+                        @Override
+                        public Long getEstimatedBytesToScan() throws SQLException {
+                            return aggPlan.getEstimatedBytesToScan();
+                        }
+
+                        @Override
+                        public Long getEstimateInfoTimestamp() throws SQLException {
+                            return aggPlan.getEstimateInfoTimestamp();
+                        }
+                    };
                 }
             }
             ////////////////////////////////////////////////////////////////////
             // UPSERT SELECT run client-side
             /////////////////////////////////////////////////////////////////////
-            return new ClientUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, parallelIteratorFactory, projector, columnIndexes, pkSlotIndexes, useServerTimestamp, maxSize, maxSizeBytes);
+            return new MutationPlan() {
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return queryPlan.getContext().getBindManager().getParameterMetaData();
+                }
+
+                @Override
+                public StatementContext getContext() {
+                    return queryPlan.getContext();
+                }
+
+                @Override
+                public TableRef getTargetRef() {
+                    return tableRef;
+                }
+
+                @Override
+                public Set<TableRef> getSourceRefs() {
+                    return originalQueryPlan.getSourceRefs();
+                }
+
+        		@Override
+        		public Operation getOperation() {
+        			return operation;
+        		}
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    ResultIterator iterator = queryPlan.iterator();
+                    if (parallelIteratorFactory == null) {
+                        return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
+                    }
+                    try {
+                        parallelIteratorFactory.setRowProjector(projector);
+                        parallelIteratorFactory.setColumnIndexes(columnIndexes);
+                        parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+                        Tuple tuple;
+                        long totalRowCount = 0;
+                        StatementContext context = queryPlan.getContext();
+                        while ((tuple=iterator.next()) != null) {// Runs query
+                            Cell kv = tuple.getValue(0);
+                            totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                        }
+                        // Return total number of rows that have been updated. In the case of auto commit being off
+                        // the mutations will all be in the mutation state of the current connection.
+                        MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
+                        /*
+                         *  All the metrics collected for measuring the reads done by the parallel mutating iterators
+                         *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
+                         *  returned mutation state so they can be published on commit. 
+                         */
+                        mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+                        return mutationState; 
+                    } finally {
+                        iterator.close();
+                    }
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                    planSteps.add("UPSERT SELECT");
+                    planSteps.addAll(queryPlanSteps);
+                    return new ExplainPlan(planSteps);
+                }
+
+                @Override
+                public Long getEstimatedRowsToScan() throws SQLException {
+                    return queryPlan.getEstimatedRowsToScan();
+                }
+
+                @Override
+                public Long getEstimatedBytesToScan() throws SQLException {
+                    return queryPlan.getEstimatedBytesToScan();
+                }
+
+                @Override
+                public Long getEstimateInfoTimestamp() throws SQLException {
+                    return queryPlan.getEstimateInfoTimestamp();
+                }
+            };
         }
 
             
@@ -830,9 +987,124 @@ public class UpsertCompiler {
         }
         final byte[] onDupKeyBytes = onDupKeyBytesToBe;
         
-        return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions,
-                allColumns, columnIndexes, overlapViewColumns, values, addViewColumns,
-                connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, maxSize, maxSizeBytes);
+        return new MutationPlan() {
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return context.getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public StatementContext getContext() {
+                return context;
+            }
+
+            @Override
+            public TableRef getTargetRef() {
+                return tableRef;
+            }
+
+            @Override
+            public Set<TableRef> getSourceRefs() {
+                return Collections.emptySet();
+            }
+
+    		@Override
+    		public Operation getOperation() {
+    			return operation;
+    		}
+
+            @Override
+            public MutationState execute() throws SQLException {
+                ImmutableBytesWritable ptr = context.getTempPtr();
+                final SequenceManager sequenceManager = context.getSequenceManager();
+                // Next evaluate all the expressions
+                int nodeIndex = nodeIndexOffset;
+                PTable table = tableRef.getTable();
+                Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
+                    sequenceManager.newSequenceTuple(null);
+                for (Expression constantExpression : constantExpressions) {
+                    PColumn column = allColumns.get(columnIndexes[nodeIndex]);
+                    constantExpression.evaluate(tuple, ptr);
+                    Object value = null;
+                    if (constantExpression.getDataType() != null) {
+                        value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), constantExpression.getMaxLength(), constantExpression.getScale());
+                        if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { 
+                            throw TypeMismatchException.newException(
+                                constantExpression.getDataType(), column.getDataType(), "expression: "
+                                        + constantExpression.toString() + " in column " + column);
+                        }
+                        if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
+                                constantExpression.getSortOrder(), constantExpression.getMaxLength(), 
+                                constantExpression.getScale(), column.getMaxLength(), column.getScale())) { 
+                            throw new SQLExceptionInfo.Builder(
+                                SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                                .setMessage("value=" + constantExpression.toString()).build().buildException();
+                        }
+                    }
+                    column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), 
+                            constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
+                            column.getMaxLength(), column.getScale(),column.getSortOrder(),
+                            table.rowKeyOrderOptimizable());
+                    if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
+                        throw new SQLExceptionInfo.Builder(
+                                SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
+                                .setColumnName(column.getName().getString())
+                                .setMessage("value=" + constantExpression.toString()).build().buildException();
+                    }
+                    values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    nodeIndex++;
+                }
+                // Add columns based on view
+                for (PColumn column : addViewColumns) {
+                    if (IndexUtil.getViewConstantValue(column, ptr)) {
+                        values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                    } else {
+                        throw new IllegalStateException();
+                    }
+                }
+                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+                IndexMaintainer indexMaintainer = null;
+                byte[][] viewConstants = null;
+                if (table.getIndexType() == IndexType.LOCAL) {
+                    PTable parentTable =
+                            statement
+                                    .getConnection()
+                                    .getMetaDataCache()
+                                    .getTableRef(
+                                        new PTableKey(statement.getConnection().getTenantId(),
+                                                table.getParentName().getString())).getTable();
+                    indexMaintainer = table.getIndexMaintainer(parentTable, connection);
+                    viewConstants = IndexUtil.getViewConstants(parentTable);
+                }
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
+                return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
+                if (context.getSequenceManager().getSequenceCount() > 0) {
+                    planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
+                }
+                planSteps.add("PUT SINGLE ROW");
+                return new ExplainPlan(planSteps);
+            }
+
+            @Override
+            public Long getEstimatedRowsToScan() throws SQLException {
+                return 0l;
+            }
+
+            @Override
+            public Long getEstimatedBytesToScan() throws SQLException {
+                return 0l;
+            }
+
+            @Override
+            public Long getEstimateInfoTimestamp() throws SQLException {
+                return 0l;
+            }
+        };
     }
     
     private static boolean isRowTimestampSet(int[] pkSlotIndexes, PTable table) {
@@ -943,394 +1215,4 @@ public class UpsertCompiler {
             }
         }
     }
-
-    private class ServerUpsertSelectMutationPlan implements MutationPlan {
-        private final QueryPlan queryPlan;
-        private final TableRef tableRef;
-        private final QueryPlan originalQueryPlan;
-        private final StatementContext context;
-        private final PhoenixConnection connection;
-        private final Scan scan;
-        private final QueryPlan aggPlan;
-        private final RowProjector aggProjector;
-        private final int maxSize;
-        private final int maxSizeBytes;
-
-        public ServerUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan,
-                                              StatementContext context, PhoenixConnection connection,
-                                              Scan scan, QueryPlan aggPlan, RowProjector aggProjector,
-                                              int maxSize, int maxSizeBytes) {
-            this.queryPlan = queryPlan;
-            this.tableRef = tableRef;
-            this.originalQueryPlan = originalQueryPlan;
-            this.context = context;
-            this.connection = connection;
-            this.scan = scan;
-            this.aggPlan = aggPlan;
-            this.aggProjector = aggProjector;
-            this.maxSize = maxSize;
-            this.maxSizeBytes = maxSizeBytes;
-        }
-
-        @Override
-        public ParameterMetaData getParameterMetaData() {
-            return queryPlan.getContext().getBindManager().getParameterMetaData();
-        }
-
-        @Override
-        public StatementContext getContext() {
-            return queryPlan.getContext();
-        }
-
-        @Override
-        public TableRef getTargetRef() {
-            return tableRef;
-        }
-
-        @Override
-        public QueryPlan getQueryPlan() {
-            return aggPlan;
-        }
-
-        @Override
-        public Set<TableRef> getSourceRefs() {
-            return originalQueryPlan.getSourceRefs();
-        }
-
-        @Override
-        public Operation getOperation() {
-          return operation;
-        }
-
-        @Override
-        public MutationState execute() throws SQLException {
-            ImmutableBytesWritable ptr = context.getTempPtr();
-            PTable table = tableRef.getTable();
-            table.getIndexMaintainers(ptr, context.getConnection());
-            byte[] txState = table.isTransactional() ?
-                    connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
-
-            if (ptr.getLength() > 0) {
-                byte[] uuidValue = ServerCacheClient.generateId();
-                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-            }
-            ResultIterator iterator = aggPlan.iterator();
-            try {
-                Tuple row = iterator.next();
-                final long mutationCount = (Long) aggProjector.getColumnProjector(0).getValue(row,
-                        PLong.INSTANCE, ptr);
-                return new MutationState(maxSize, maxSizeBytes, connection) {
-                    @Override
-                    public long getUpdateCount() {
-                        return mutationCount;
-                    }
-                };
-            } finally {
-                iterator.close();
-            }
-
-        }
-
-        @Override
-        public ExplainPlan getExplainPlan() throws SQLException {
-            List<String> queryPlanSteps =  aggPlan.getExplainPlan().getPlanSteps();
-            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-            planSteps.add("UPSERT ROWS");
-            planSteps.addAll(queryPlanSteps);
-            return new ExplainPlan(planSteps);
-        }
-
-        @Override
-        public Long getEstimatedRowsToScan() throws SQLException {
-            return aggPlan.getEstimatedRowsToScan();
-        }
-
-        @Override
-        public Long getEstimatedBytesToScan() throws SQLException {
-            return aggPlan.getEstimatedBytesToScan();
-        }
-
-        @Override
-        public Long getEstimateInfoTimestamp() throws SQLException {
-            return aggPlan.getEstimateInfoTimestamp();
-        }
-    }
-
-    private class UpsertValuesMutationPlan implements MutationPlan {
-        private final StatementContext context;
-        private final TableRef tableRef;
-        private final int nodeIndexOffset;
-        private final List<Expression> constantExpressions;
-        private final List<PColumn> allColumns;
-        private final int[] columnIndexes;
-        private final Set<PColumn> overlapViewColumns;
-        private final byte[][] values;
-        private final Set<PColumn> addViewColumns;
-        private final PhoenixConnection connection;
-        private final int[] pkSlotIndexes;
-        private final boolean useServerTimestamp;
-        private final byte[] onDupKeyBytes;
-        private final int maxSize;
-        private final int maxSizeBytes;
-
-        public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, int nodeIndexOffset,
-                                        List<Expression> constantExpressions, List<PColumn> allColumns,
-                                        int[] columnIndexes, Set<PColumn> overlapViewColumns, byte[][] values,
-                                        Set<PColumn> addViewColumns, PhoenixConnection connection,
-                                        int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes,
-                                        int maxSize, int maxSizeBytes) {
-            this.context = context;
-            this.tableRef = tableRef;
-            this.nodeIndexOffset = nodeIndexOffset;
-            this.constantExpressions = constantExpressions;
-            this.allColumns = allColumns;
-            this.columnIndexes = columnIndexes;
-            this.overlapViewColumns = overlapViewColumns;
-            this.values = values;
-            this.addViewColumns = addViewColumns;
-            this.connection = connection;
-            this.pkSlotIndexes = pkSlotIndexes;
-            this.useServerTimestamp = useServerTimestamp;
-            this.onDupKeyBytes = onDupKeyBytes;
-            this.maxSize = maxSize;
-            this.maxSizeBytes = maxSizeBytes;
-        }
-
-        @Override
-        public ParameterMetaData getParameterMetaData() {
-            return context.getBindManager().getParameterMetaData();
-        }
-
-        @Override
-        public StatementContext getContext() {
-            return context;
-        }
-
-        @Override
-        public TableRef getTargetRef() {
-            return tableRef;
-        }
-
-        @Override
-        public QueryPlan getQueryPlan() {
-            return null;
-        }
-
-        @Override
-        public Set<TableRef> getSourceRefs() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public Operation getOperation() {
-          return operation;
-        }
-
-        @Override
-        public MutationState execute() throws SQLException {
-            ImmutableBytesWritable ptr = context.getTempPtr();
-            final SequenceManager sequenceManager = context.getSequenceManager();
-            // Next evaluate all the expressions
-            int nodeIndex = nodeIndexOffset;
-            PTable table = tableRef.getTable();
-            Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null :
-                sequenceManager.newSequenceTuple(null);
-            for (Expression constantExpression : constantExpressions) {
-                PColumn column = allColumns.get(columnIndexes[nodeIndex]);
-                constantExpression.evaluate(tuple, ptr);
-                Object value = null;
-                if (constantExpression.getDataType() != null) {
-                    value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(),
-                            constantExpression.getMaxLength(), constantExpression.getScale());
-                    if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) {
-                        throw TypeMismatchException.newException(
-                            constantExpression.getDataType(), column.getDataType(), "expression: "
-                                    + constantExpression.toString() + " in column " + column);
-                    }
-                    if (!column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(),
-                            constantExpression.getSortOrder(), constantExpression.getMaxLength(),
-                            constantExpression.getScale(), column.getMaxLength(), column.getScale())) {
-                        throw new SQLExceptionInfo.Builder(
-                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                            .setMessage("value=" + constantExpression.toString()).build().buildException();
-                    }
-                }
-                column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(),
-                        constantExpression.getMaxLength(), constantExpression.getScale(), constantExpression.getSortOrder(),
-                        column.getMaxLength(), column.getScale(),column.getSortOrder(),
-                        table.rowKeyOrderOptimizable());
-                if (overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length-1) != 0) {
-                    throw new SQLExceptionInfo.Builder(
-                            SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN)
-                            .setColumnName(column.getName().getString())
-                            .setMessage("value=" + constantExpression.toString()).build().buildException();
-                }
-                values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                nodeIndex++;
-            }
-            // Add columns based on view
-            for (PColumn column : addViewColumns) {
-                if (IndexUtil.getViewConstantValue(column, ptr)) {
-                    values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                } else {
-                    throw new IllegalStateException();
-                }
-            }
-            MultiRowMutationState mutation = new MultiRowMutationState(1);
-            IndexMaintainer indexMaintainer = null;
-            byte[][] viewConstants = null;
-            if (table.getIndexType() == IndexType.LOCAL) {
-                PTable parentTable =
-                        statement
-                                .getConnection()
-                                .getMetaDataCache()
-                                .getTableRef(
-                                    new PTableKey(statement.getConnection().getTenantId(),
-                                            table.getParentName().getString())).getTable();
-                indexMaintainer = table.getIndexMaintainer(parentTable, connection);
-                viewConstants = IndexUtil.getViewConstants(parentTable);
-            }
-            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
-            return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
-        }
-
-        @Override
-        public ExplainPlan getExplainPlan() throws SQLException {
-            List<String> planSteps = Lists.newArrayListWithExpectedSize(2);
-            if (context.getSequenceManager().getSequenceCount() > 0) {
-                planSteps.add("CLIENT RESERVE " + context.getSequenceManager().getSequenceCount() + " SEQUENCES");
-            }
-            planSteps.add("PUT SINGLE ROW");
-            return new ExplainPlan(planSteps);
-        }
-
-        @Override
-        public Long getEstimatedRowsToScan() throws SQLException {
-            return 0l;
-        }
-
-        @Override
-        public Long getEstimatedBytesToScan() throws SQLException {
-            return 0l;
-        }
-
-        @Override
-        public Long getEstimateInfoTimestamp() throws SQLException {
-            return 0l;
-        }
-    }
-
-    private class ClientUpsertSelectMutationPlan implements MutationPlan {
-        private final QueryPlan queryPlan;
-        private final TableRef tableRef;
-        private final QueryPlan originalQueryPlan;
-        private final UpsertingParallelIteratorFactory parallelIteratorFactory;
-        private final RowProjector projector;
-        private final int[] columnIndexes;
-        private final int[] pkSlotIndexes;
-        private final boolean useServerTimestamp;
-        private final int maxSize;
-        private final int maxSizeBytes;
-
-        public ClientUpsertSelectMutationPlan(QueryPlan queryPlan, TableRef tableRef, QueryPlan originalQueryPlan, UpsertingParallelIteratorFactory parallelIteratorFactory, RowProjector projector, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, int maxSize, int maxSizeBytes) {
-            this.queryPlan = queryPlan;
-            this.tableRef = tableRef;
-            this.originalQueryPlan = originalQueryPlan;
-            this.parallelIteratorFactory = parallelIteratorFactory;
-            this.projector = projector;
-            this.columnIndexes = columnIndexes;
-            this.pkSlotIndexes = pkSlotIndexes;
-            this.useServerTimestamp = useServerTimestamp;
-            this.maxSize = maxSize;
-            this.maxSizeBytes = maxSizeBytes;
-        }
-
-        @Override
-        public ParameterMetaData getParameterMetaData() {
-            return queryPlan.getContext().getBindManager().getParameterMetaData();
-        }
-
-        @Override
-        public StatementContext getContext() {
-            return queryPlan.getContext();
-        }
-
-        @Override
-        public TableRef getTargetRef() {
-            return tableRef;
-        }
-
-        @Override
-        public QueryPlan getQueryPlan() {
-            return queryPlan;
-        }
-
-        @Override
-        public Set<TableRef> getSourceRefs() {
-            return originalQueryPlan.getSourceRefs();
-        }
-
-        @Override
-        public Operation getOperation() {
-          return operation;
-        }
-
-        @Override
-        public MutationState execute() throws SQLException {
-            ResultIterator iterator = queryPlan.iterator();
-            if (parallelIteratorFactory == null) {
-                return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, false);
-            }
-            try {
-                parallelIteratorFactory.setRowProjector(projector);
-                parallelIteratorFactory.setColumnIndexes(columnIndexes);
-                parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
-                Tuple tuple;
-                long totalRowCount = 0;
-                StatementContext context = queryPlan.getContext();
-                while ((tuple=iterator.next()) != null) {// Runs query
-                    Cell kv = tuple.getValue(0);
-                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-                }
-                // Return total number of rows that have been updated. In the case of auto commit being off
-                // the mutations will all be in the mutation state of the current connection.
-                MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
-                /*
-                 *  All the metrics collected for measuring the reads done by the parallel mutating iterators
-                 *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
-                 *  returned mutation state so they can be published on commit.
-                 */
-                mutationState.setReadMetricQueue(context.getReadMetricsQueue());
-                return mutationState;
-            } finally {
-                iterator.close();
-            }
-        }
-
-        @Override
-        public ExplainPlan getExplainPlan() throws SQLException {
-            List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
-            List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
-            planSteps.add("UPSERT SELECT");
-            planSteps.addAll(queryPlanSteps);
-            return new ExplainPlan(planSteps);
-        }
-
-        @Override
-        public Long getEstimatedRowsToScan() throws SQLException {
-            return queryPlan.getEstimatedRowsToScan();
-        }
-
-        @Override
-        public Long getEstimatedBytesToScan() throws SQLException {
-            return queryPlan.getEstimatedBytesToScan();
-        }
-
-        @Override
-        public Long getEstimateInfoTimestamp() throws SQLException {
-            return queryPlan.getEstimateInfoTimestamp();
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
index 7bf8259..5e7b996 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereOptimizer.java
@@ -326,6 +326,11 @@ public class WhereOptimizer {
         PTable table = context.getCurrentTable().getTable();
         for (int i = 0; i < expressions.size(); i++) {
             Expression expression = expressions.get(i);
+            // TODO this is a temporary fix for PHOENIX-3029.
+            if (expression instanceof CoerceExpression
+                    && expression.getSortOrder() != expression.getChildren().get(0).getSortOrder()) {
+                continue;
+            }
             KeyExpressionVisitor visitor = new KeyExpressionVisitor(context, table);
             KeyExpressionVisitor.KeySlots keySlots = expression.accept(visitor);
             int minPkPos = Integer.MAX_VALUE; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
deleted file mode 100644
index 8decc8c..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseMetaDataEndpointObserver.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.coprocessor;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
-
-public class BaseMetaDataEndpointObserver implements MetaDataEndpointObserver{
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-
-    }
-
-    @Override
-    public void preGetTable(
-            org.apache.hadoop.hbase.coprocessor.ObserverContext<PhoenixMetaDataControllerEnvironment> ctx,
-            String tenantId, String tableName, TableName physicalTableName) throws IOException {
-
-    }
-
-    
-    @Override
-    public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
-            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
-            Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
-
-    }
-
-    @Override
-    public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
-            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
-            List<PTable> indexes) throws IOException {
-
-    }
-
-    @Override
-    public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
-            String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType type) throws IOException {
-
-    }
-
-    @Override
-    public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
-            throws IOException {
-
-    }
-
-    @Override
-    public void preCreateSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
-            throws IOException {
-
-    }
-
-    @Override
-    public void preDropSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName) throws IOException {
-
-    }
-
-    @Override
-    public void preCreateFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
-            String functionName) throws IOException {
-
-    }
-
-    @Override
-    public void preDropFunction(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
-            throws IOException {}
-
-    @Override
-    public void preGetFunctions(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, String functionName)
-            throws IOException {
-
-    }
-
-    @Override
-    public void preIndexUpdate(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
-            String indexName, TableName physicalTableName, TableName parentPhysicalTableName, PIndexState newState)
-            throws IOException {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b1f2274/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index afbd63f..d05ab79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -84,7 +84,6 @@ import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -92,12 +91,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -108,7 +105,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -125,12 +121,9 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
-import org.apache.hadoop.hbase.ipc.RpcUtil;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
@@ -459,7 +452,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
     private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
     private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
-
+    
     private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
         if (keyLength <= 0) {
             return null;
@@ -470,9 +463,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
     private RegionCoprocessorEnvironment env;
 
-    private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
-    private boolean accessCheckEnabled;
-
     /**
      * Stores a reference to the coprocessor environment provided by the
      * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
@@ -490,10 +480,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         } else {
             throw new CoprocessorException("Must be loaded on a table region!");
         }
-        
-        phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);
-        this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
-                QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
         logger.info("Starting Tracing-Metrics Systems");
         // Start the phoenix trace collection
         Tracing.addTraceMetricsSource();
@@ -537,9 +523,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             }
-            getCoprocessorHost().preGetTable(Bytes.toString(tenantId), SchemaUtil.getTableName(schemaName, tableName),
-                    TableName.valueOf(table.getPhysicalName().getBytes()));
-
             builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
             long disableIndexTimestamp = table.getIndexDisableTimestamp();
             long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
@@ -571,10 +554,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
-    private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
-        return phoenixAccessCoprocessorHost;
-    }
-
     private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
             long clientTimeStamp, int clientVersion) throws IOException, SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
@@ -1338,14 +1317,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
      * @return null if the physical table row information is not present.
      * 
      */
-    private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) {
+    private static Mutation getPhysicalTableForView(List<Mutation> tableMetadata, byte[][] parentSchemaTableNames) {
         int size = tableMetadata.size();
         byte[][] rowKeyMetaData = new byte[3][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         Mutation physicalTableRow = null;
-        Mutation parentTableRow = null;
         boolean physicalTableLinkFound = false;
-        boolean parentTableLinkFound = false;
         if (size >= 2) {
             int i = size - 1;
             while (i >= 1) {
@@ -1355,51 +1332,28 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (linkType == LinkType.PHYSICAL_TABLE) {
                         physicalTableRow = m;
                         physicalTableLinkFound = true;
+                        break;
                     }
-                    if (linkType == LinkType.PARENT_TABLE) {
-                        parentTableRow=m;
-                        parentTableLinkFound = true;
-                    }
-                }
-                if(physicalTableLinkFound && parentTableLinkFound){
-                    break;
                 }
                 i--;
             }
         }
-        if (!parentTableLinkFound) {
-            parentTenantSchemaTableNames[0] = null;
-            parentTenantSchemaTableNames[1] = null;
-            parentTenantSchemaTableNames[2] = null;
-            
-        }
         if (!physicalTableLinkFound) {
-            physicalSchemaTableNames[0] = null;
-            physicalSchemaTableNames[1] = null;
-            physicalSchemaTableNames[2] = null;
-        }
-        if (physicalTableLinkFound) {
-            getSchemaTableNames(physicalTableRow,physicalSchemaTableNames);
-        }
-        if (parentTableLinkFound) {
-            getSchemaTableNames(parentTableRow,parentTenantSchemaTableNames);   
+            parentSchemaTableNames[0] = null;
+            parentSchemaTableNames[1] = null;
+            return null;
         }
-        return physicalTableRow;
-    }
-    
-    private static void getSchemaTableNames(Mutation row, byte[][] schemaTableNames) {
-        byte[][] rowKeyMetaData = new byte[5][];
-        getVarChars(row.getRow(), 5, rowKeyMetaData);
-        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+        rowKeyMetaData = new byte[5][];
+        getVarChars(physicalTableRow.getRow(), 5, rowKeyMetaData);
         byte[] colBytes = rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
         byte[] famBytes = rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
         if ((colBytes == null || colBytes.length == 0) && (famBytes != null && famBytes.length > 0)) {
             byte[] sName = SchemaUtil.getSchemaNameFromFullName(famBytes).getBytes();
             byte[] tName = SchemaUtil.getTableNameFromFullName(famBytes).getBytes();
-            schemaTableNames[0]= tenantId;
-            schemaTableNames[1] = sName;
-            schemaTableNames[2] = tName;
+            parentSchemaTableNames[0] = sName;
+            parentSchemaTableNames[1] = tName;
         }
+        return physicalTableRow;
     }
     
     @Override
@@ -1416,76 +1370,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
             schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
             tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-            boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
-                    new ImmutableBytesWritable());
-            final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
-                    new ImmutableBytesWritable());
+
             byte[] parentSchemaName = null;
             byte[] parentTableName = null;
             PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
             byte[] parentTableKey = null;
             Mutation viewPhysicalTableRow = null;
-            Set<TableName> indexes = new HashSet<TableName>();;
-            byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped)
-                    .getBytes();
-            byte[] cParentPhysicalName=null;
             if (tableType == PTableType.VIEW) {
-                byte[][] parentSchemaTableNames = new byte[3][];
-                byte[][] parentPhysicalSchemaTableNames = new byte[3][];
+                byte[][] parentSchemaTableNames = new byte[2][];
                 /*
                  * For a view, we lock the base physical table row. For a mapped view, there is 
                  * no link present to the physical table. So the viewPhysicalTableRow is null
                  * in that case.
                  */
-                
-                viewPhysicalTableRow = getPhysicalTableRowForView(tableMetadata, parentSchemaTableNames,parentPhysicalSchemaTableNames);
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                if (parentPhysicalSchemaTableNames[2] != null) {
-                    
-                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
-                            parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]);
-                    PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
-                            clientTimeStamp, clientTimeStamp, clientVersion);
-                    if (parentTable == null) {
-                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
-                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                        done.run(builder.build());
-                        return;
-                    }
-                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
-                    if (parentSchemaTableNames[2] != null
-                            && Bytes.compareTo(parentSchemaTableNames[2], parentPhysicalSchemaTableNames[2]) != 0) {
-                        // if view is created on view
-                        byte[] parentKey = SchemaUtil.getTableKey(
-                                parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentSchemaTableNames[0],
-                                parentSchemaTableNames[1], parentSchemaTableNames[2]);
-                        parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey),
-                                clientTimeStamp, clientTimeStamp, clientVersion);
-                        if (parentTable == null) {
-                            // it could be a global view
-                            parentKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
-                                    parentSchemaTableNames[1], parentSchemaTableNames[2]);
-                            parentTable = loadTable(env, parentKey, new ImmutableBytesPtr(parentKey),
-                                    clientTimeStamp, clientTimeStamp, clientVersion);
-                        }
-                    }
-                    if (parentTable == null) {
-                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
-                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                        done.run(builder.build());
-                        return;
-                    }
-                    for (PTable index : parentTable.getIndexes()) {
-                        indexes.add(TableName.valueOf(index.getPhysicalName().getBytes()));
-                    }
-
-                } else {
-                    // Mapped View
-                    cParentPhysicalName = SchemaUtil.getTableNameAsBytes(schemaName, tableName);
+                viewPhysicalTableRow = getPhysicalTableForView(tableMetadata, parentSchemaTableNames);
+                parentSchemaName = parentSchemaTableNames[0];
+                parentTableName = parentSchemaTableNames[1];
+                if (parentTableName != null) {
+                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY, parentSchemaName, parentTableName);
                 }
-                parentSchemaName = parentPhysicalSchemaTableNames[1];
-                parentTableName = parentPhysicalSchemaTableNames[2];
-                    
             } else if (tableType == PTableType.INDEX) {
                 parentSchemaName = schemaName;
                 /* 
@@ -1495,27 +1398,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                  */ 
                 parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                 parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
-                        clientTimeStamp, clientTimeStamp, clientVersion);
-                if (IndexType.LOCAL == indexType) {
-                    cPhysicalName = parentTable.getPhysicalName().getBytes();
-                    cParentPhysicalName=parentTable.getPhysicalName().getBytes();
-                } else if (parentTable.getType() == PTableType.VIEW) {
-                    cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
-                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
-                }else{
-                    cParentPhysicalName = SchemaUtil
-                            .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes();
-                }
             }
-            
-            getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
-                    SchemaUtil.getTableName(schemaName, tableName),
-                    (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
-                    cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
-                    /* TODO: During inital create we may not need the family map */
-                    Collections.<byte[]> emptySet(), indexes);
 
             Region region = env.getRegion();
             List<RowLock> locks = Lists.newArrayList();
@@ -1730,7 +1613,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // primary and then index table locks are held, in that order). For now, we just don't support
                 // indexing on the system table. This is an issue because of the way we manage batch mutation
                 // in the Indexer.
-                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
@@ -1749,7 +1632,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
             logger.error("createTable failed", t);
@@ -1765,6 +1648,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE);
     }
 
+    private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks)
+        throws IOException {
+        RowLock rowLock = region.getRowLock(key, false);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+        }
+        locks.add(rowLock);
+        return rowLock;
+    }
+
     private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()};
 
     
@@ -1953,23 +1846,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
                         schemaName, tableName);
 
-            
-            PTableType ptableType=PTableType.fromSerializedValue(tableType);
-            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-            byte[] cKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
-            PTable loadedTable = loadTable(env, cKey, new ImmutableBytesPtr(cKey), clientTimeStamp, clientTimeStamp,
-                    request.getClientVersion());
-            if (loadedTable == null) {
-                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
-                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                done.run(builder.build());
-                return;
-            }
-            getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes),
-                    SchemaUtil.getTableName(schemaName, tableName),
-                    TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
-                    getParentPhysicalTableName(loadedTable), ptableType,loadedTable.getIndexes());
-
             Region region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(key, region);
             if (result != null) {
@@ -1994,7 +1870,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
                 Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 // Commit the list of deletion.
-                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                     HConstants.NO_NONCE);
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 for (ImmutableBytesPtr ckey : invalidateList) {
@@ -2007,7 +1883,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
           logger.error("dropTable failed", t);
@@ -2015,24 +1891,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
-    
-    protected void releaseRowLocks(Region region, List<RowLock> locks) {
-        if (locks != null) {
-            region.releaseRowLocks(locks);
-        }
-    }
-
-    private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> locks) throws IOException {
-        //LockManager.RowLock rowLock = lockManager.lockRow(lockKey, rowLockWaitDuration);
-        RowLock rowLock = region.getRowLock(lockKey, false);
-        if (rowLock == null) {
-            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(lockKey));
-        }
-        if (locks != null) {
-            locks.add(rowLock);
-        }
-        return rowLock;
-    }
 
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
         byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
@@ -2235,15 +2093,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
                 if (table.getTimeStamp() >= clientTimeStamp) {
-                    logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of "
-                            + clientTimeStamp);
+                    logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
                     return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                             EnvironmentEdgeManager.currentTimeMillis(), table);
-                } else if (isTableDeleted(table)) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
-                        EnvironmentEdgeManager.currentTimeMillis(), null); }
-                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in
-                                                                                         // tableMetaData
+                } else if (isTableDeleted(table)) {
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
 
+                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup
+                                                                                         // TABLE_SEQ_NUM
+                                                                                         // in
+                                                                                         // tableMetaData
                 if (logger.isDebugEnabled()) {
                     logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
                             + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
@@ -2278,7 +2139,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) {
                     return result;
                 }
-                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
@@ -2294,7 +2155,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
                 }
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@ -3110,11 +2971,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     PTableType type = table.getType();
                     byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
                             schemaName, tableName);
-                    byte[] cPhysicalTableName=table.getPhysicalName().getBytes();
-                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
-                            SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(cPhysicalTableName),
-                            getParentPhysicalTableName(table),type);
-
                     // Size for worst case - all new columns are PK column
                     List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
@@ -3268,7 +3124,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
                 QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         if (!wasLocked) {
-            rowLock = acquireLock(region, key, null);
+            rowLock = region.getRowLock(key, false);
+            if (rowLock == null) {
+                throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+            }
         }
         try {
             PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
@@ -3325,10 +3184,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
          * Lock directly on key, though it may be an index table. This will just prevent a table
          * from getting rebuilt too often.
          */
-        List<RowLock> rowLocks = new ArrayList<RowLock>(keys.size());;
+        List<RowLock> rowLocks = new ArrayList<Region.RowLock>(keys.size());;
         try {
+            rowLocks = new ArrayList<Region.RowLock>(keys.size());
             for (int i = 0; i < keys.size(); i++) {
-                acquireLock(region, keys.get(i), rowLocks);
+                Region.RowLock rowLock = region.getRowLock(keys.get(i), false);
+                if (rowLock == null) {
+                    throw new IOException("Failed to acquire lock on "
+                            + Bytes.toStringBinary(keys.get(i)));
+                }
+                rowLocks.add(rowLock);
             }
 
             List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size());
@@ -3358,7 +3223,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             if(functionsAvailable.size() == numFunctions) return functionsAvailable;
             return null;
         } finally {
-            releaseRowLocks(region,rowLocks);
+            for (Region.RowLock lock : rowLocks) {
+                lock.release();
+            }
+            rowLocks.clear();
         }
     }
 
@@ -3380,11 +3248,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                     byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                     boolean deletePKColumn = false;
-                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
-                            SchemaUtil.getTableName(schemaName, tableName),
-                            TableName.valueOf(table.getPhysicalName().getBytes()),
-                            getParentPhysicalTableName(table),table.getType());
-
                     List<Mutation> additionalTableMetaData = Lists.newArrayList();
                     
                     PTableType type = table.getType();
@@ -3617,7 +3480,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
             PIndexState newState =
                     PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
-            RowLock rowLock = acquireLock(region, key, null);
+            RowLock rowLock = region.getRowLock(key, false);
             if (rowLock == null) {
                 throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
             }
@@ -3639,22 +3502,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
                 boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null;
 
-                //check permission on data table
-                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                PTable loadedTable = loadTable(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp,
-                        request.getClientVersion());
-                if (loadedTable == null) {
-                    builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
-                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                    done.run(builder.build());
-                    return;
-                }
-                getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId),
-                        SchemaUtil.getTableName(schemaName, tableName),
-                        TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
-                        getParentPhysicalTableName(loadedTable),
-                        newState);
-
                 PIndexState currentState =
                         PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                                 .getValueOffset()]);
@@ -3764,7 +3611,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if (setRowKeyOrderOptimizableCell) {
                         UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp);
                     }
-                    mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                    region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
                     // Invalidate from cache
                     Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -3925,7 +3772,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         long clientTimeStamp = request.getClientTimestamp();
         List<RowLock> locks = Lists.newArrayList();
         try {
-            getCoprocessorHost().preGetSchema(schemaName);
             acquireLock(region, lockKey, locks);
             // Get as of latest timestamp so we can detect if we have a
             // newer schema that already
@@ -3956,7 +3802,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             done.run(builder.build());
             return;
         } finally {
-            releaseRowLocks(region,locks);
+            region.releaseRowLocks(locks);
         }
     }
 
@@ -4059,7 +3905,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 }
                 // Don't store function info for temporary functions.
                 if(!temporaryFunction) {
-                    mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                 }
 
                 // Invalidate the cache - the next getFunction call will add it
@@ -4073,7 +3919,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
           logger.error("createFunction failed", t);
@@ -4112,7 +3958,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+                region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
 
                 Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                 long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData);
@@ -4125,7 +3971,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
           logger.error("dropFunction failed", t);
@@ -4222,7 +4068,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         return;
                     }
                 }
-                mutateRowsWithLocks(region, schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                region.mutateRowsWithLocks(schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getSchema call will add it
@@ -4240,7 +4086,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(builder.build());
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
             logger.error("Creating the schema" + schemaName + "failed", t);
@@ -4254,7 +4100,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         try {
             List<Mutation> schemaMetaData = ProtobufUtil.getMutations(request);
             schemaName = request.getSchemaName();
-            getCoprocessorHost().preDropSchema(schemaName);
             byte[] lockKey = SchemaUtil.getSchemaKey(schemaName);
             Region region = env.getRegion();
             MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
@@ -4272,7 +4117,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                mutateRowsWithLocks(region, schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
+                region.mutateRowsWithLocks(schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                         HConstants.NO_NONCE);
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
                         .getMetaDataCache();
@@ -4284,7 +4129,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             } finally {
-                releaseRowLocks(region,locks);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
             logger.error("drop schema failed:", t);
@@ -4330,48 +4175,4 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 null);
 
     }
-    
-    private void mutateRowsWithLocks(final Region region, final List<Mutation> mutations, final Set<byte[]> rowsToLock,
-            final long nonceGroup, final long nonce) throws IOException {
-        // we need to mutate SYSTEM.CATALOG with HBase/login user if access is enabled.
-        if (this.accessCheckEnabled) {
-            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                    final Call rpcContext = RpcUtil.getRpcContext();
-                    // Setting RPC context as null so that user can be resetted
-                    try {
-                        RpcUtil.setRpcContext(null);
-                        region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
-                    } catch (Throwable e) {
-                        throw new IOException(e);
-                    } finally {
-                        // Setting RPC context back to original context of the RPC
-                        RpcUtil.setRpcContext(rpcContext);
-                    }
-                    return null;
-                }
-            });
-        } else {
-            region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
-        }
-    }
-    
-    private TableName getParentPhysicalTableName(PTable table) {
-        return table
-                .getType() == PTableType.VIEW
-                        ? TableName.valueOf(table.getPhysicalName().getBytes())
-                        : table.getType() == PTableType.INDEX
-                                ? TableName
-                                        .valueOf(SchemaUtil
-                                                .getPhysicalHBaseTableName(table.getParentSchemaName(),
-                                                        table.getParentTableName(), table.isNamespaceMapped())
-                                                .getBytes())
-                                : TableName
-                                        .valueOf(
-                                                SchemaUtil
-                                                        .getPhysicalHBaseTableName(table.getSchemaName(),
-                                                                table.getTableName(), table.isNamespaceMapped())
-                                                        .getBytes());
-    }
 }