You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by je...@apache.org on 2014/02/08 08:42:58 UTC

[09/11] Port Phoenix to HBase 0.98

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index bc357b1..cc0dc05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -52,8 +52,6 @@ import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -63,14 +61,19 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
@@ -80,11 +83,25 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -113,6 +130,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 
 /**
  * 
@@ -129,7 +149,7 @@ import com.google.common.collect.Lists;
  * 
  * @since 0.1
  */
-public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements MetaDataProtocol {
+public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
 
     // KeyValues for Table
@@ -203,7 +223,8 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
     private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
     private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
     private static final int SQL_DATA_TYPE_INDEX = COLUMN_KV_COLUMNS.indexOf(DATA_TYPE_KV);
-    private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS.indexOf(ORDINAL_POSITION_KV);
+    private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS
+            .indexOf(ORDINAL_POSITION_KV);
     private static final int COLUMN_MODIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_MODIFIER_KV);
     private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
     
@@ -219,8 +240,9 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         System.arraycopy(keyBuffer, keyOffset, pnameBuf, 0, length);
         return PNameFactory.newName(pnameBuf);
     }
-    
-    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) throws IOException {
+
+    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
+            throws IOException {
         Scan scan = new Scan();
         scan.setTimeRange(startTimeStamp, stopTimeStamp);
         scan.setStartRow(key);
@@ -230,18 +252,159 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         return scan;
     }
 
+    private RegionCoprocessorEnvironment env;
+
+    private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) {
+        byte[] startKey = region.getStartKey();
+        byte[] endKey = region.getEndKey();
+        if (Bytes.compareTo(startKey, key) <= 0
+                && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key,
+                    endKey) < 0)) {
+            return null; // normal case;
+        }
+        return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
+
+    /**
+     * Stores a reference to the coprocessor environment provided by the
+     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+     * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
+     * on a table region, so always expects this to be an instance of
+     * {@link RegionCoprocessorEnvironment}.
+     * @param env the environment provided by the coprocessor host
+     * @throws IOException if the provided environment is not an instance of
+     *             {@code RegionCoprocessorEnvironment}
+     */
     @Override
-    public RegionCoprocessorEnvironment getEnvironment() {
-        return (RegionCoprocessorEnvironment)super.getEnvironment();
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            this.env = (RegionCoprocessorEnvironment) env;
+        } else {
+            throw new CoprocessorException("Must be loaded on a table region!");
+        }
     }
-    
-    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException, SQLException {
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        // nothing to do
+    }
+
+    @Override
+    public Service getService() {
+        return this;
+    }
+
+    @Override
+    public void getTable(RpcController controller, GetTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[] tenantId = request.getTenantId().toByteArray();
+        byte[] schemaName = request.getSchemaName().toByteArray();
+        byte[] tableName = request.getTableName().toByteArray();
+        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+        long tableTimeStamp = request.getTableTimestamp();
+
+        try {
+            // TODO: check that key is within region.getStartKey() and region.getEndKey()
+            // and return special code to force client to lookup region from meta.
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+
+            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+            PTable table = doGetTable(key, request.getClientTimestamp());
+            if (table == null) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                builder.setMutationTime(currentTime);
+                done.run(builder.build());
+                return;
+            }
+            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+            builder.setMutationTime(currentTime);
+            if (table.getTimeStamp() != tableTimeStamp) {
+                builder.setTable(PTableImpl.toProto(table));
+            }
+            done.run(builder.build());
+            return;
+        } catch (Throwable t) {
+        	logger.error("getTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+        }
+    }
+
+    private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+        Map<ImmutableBytesPtr, PTable> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = metaDataCache.get(cacheKey);
+        // We only cache the latest, so we'll end up building the table with every call if the
+        // client connection has specified an SCN.
+        // TODO: If we indicate to the client that we're returning an older version, but there's a
+        // newer version available, the client
+        // can safely not call this, since we only allow modifications to the latest.
+        if (table != null && table.getTimeStamp() < clientTimeStamp) {
+            // Table on client is up-to-date with table on server, so just return
+            if (isTableDeleted(table)) {
+                return null;
+            }
+            return table;
+        }
+        // Ask Lars about the expense of this call - if we don't take the lock, we still won't get
+        // partial results
+        // get the co-processor environment
+        // TODO: check that key is within region.getStartKey() and region.getEndKey()
+        // and return special code to force client to lookup region from meta.
+        HRegion region = env.getRegion();
+        /*
+         * Lock directly on key, though it may be an index table. This will just prevent a table
+         * from getting rebuilt too often.
+         */
+        RowLock rowLock = region.getRowLock(key);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
+        }
+        try {
+            // Try cache again in case we were waiting on a lock
+            table = metaDataCache.get(cacheKey);
+            // We only cache the latest, so we'll end up building the table with every call if the
+            // client connection has specified an SCN.
+            // TODO: If we indicate to the client that we're returning an older version, but there's
+            // a newer version available, the client
+            // can safely not call this, since we only allow modifications to the latest.
+            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                // Table on client is up-to-date with table on server, so just return
+                if (isTableDeleted(table)) {
+                    return null;
+                }
+                return table;
+            }
+            // Query for the latest table first, since it's not cached
+            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                return table;
+            }
+            // Otherwise, query for an older version of the table - it won't be cached
+            return buildTable(key, cacheKey, region, clientTimeStamp);
+        } finally {
+            rowLock.release();
+        }
+    }
+
+    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+            long clientTimeStamp) throws IOException, SQLException {
         Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         RegionScanner scanner = region.getScanner(scan);
-        Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+        Map<ImmutableBytesPtr, PTable> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
         try {
             PTable oldTable = metaDataCache.get(cacheKey);
-            long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
+            long tableTimeStamp =
+                    oldTable == null ? MIN_TABLE_TIMESTAMP - 1 : oldTable.getTimeStamp();
             PTable newTable;
             newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
             if (newTable == null) {
@@ -249,14 +412,24 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
             }
             if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
                 if (logger.isDebugEnabled()) {
-                    logger.debug("Caching table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " at seqNum " + newTable.getSequenceNumber() + " with newer timestamp " + newTable.getTimeStamp() + " versus " + tableTimeStamp);
+                    logger.debug("Caching table "
+                            + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                cacheKey.getLength()) + " at seqNum "
+                            + newTable.getSequenceNumber() + " with newer timestamp "
+                            + newTable.getTimeStamp() + " versus " + tableTimeStamp);
                 }
                 oldTable = metaDataCache.put(cacheKey, newTable);
                 if (logger.isDebugEnabled()) {
                     if (oldTable == null) {
-                        logger.debug("No previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()));
+                        logger.debug("No previously cached table "
+                                + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                    cacheKey.getLength()));
                     } else {
-                        logger.debug("Previously cached table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), cacheKey.getLength()) + " was at seqNum " + oldTable.getSequenceNumber() + " with timestamp " + oldTable.getTimeStamp());
+                        logger.debug("Previously cached table "
+                                + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                    cacheKey.getLength()) + " was at seqNum "
+                                + oldTable.getSequenceNumber() + " with timestamp "
+                                + oldTable.getTimeStamp());
                     }
                 }
             }
@@ -276,14 +449,17 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         indexes.add(indexTable);
     }
 
-    private void addColumnToTable(List<KeyValue> results, PName colName, PName famName, KeyValue[] colKeyValues, List<PColumn> columns) {
+    private void addColumnToTable(List<Cell> results, PName colName, PName famName,
+        Cell[] colKeyValues, List<PColumn> columns) {
         int i = 0;
         int j = 0;
         while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
-            KeyValue kv = results.get(i);
-            KeyValue searchKv = COLUMN_KV_COLUMNS.get(j);
-            int cmp = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), 
-                    searchKv.getBuffer(), searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            Cell kv = results.get(i);
+            Cell searchKv = COLUMN_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
             if (cmp == 0) {
                 colKeyValues[j++] = kv;
                 i++;
@@ -293,53 +469,77 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
                 i++; // shouldn't happen - means unexpected KV in system table column row
             }
         }
-        // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV are required.
+        // COLUMN_SIZE and DECIMAL_DIGIT are optional. NULLABLE, DATA_TYPE and ORDINAL_POSITION_KV
+        // are required.
         if (colKeyValues[SQL_DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
                 || colKeyValues[ORDINAL_POSITION_INDEX] == null) {
-            throw new IllegalStateException("Didn't find all required key values in '" + colName.getString() + "' column metadata row");
+            throw new IllegalStateException("Didn't find all required key values in '"
+                    + colName.getString() + "' column metadata row");
         }
-        KeyValue columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
-        Integer maxLength = columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(columnSizeKv.getBuffer(), columnSizeKv.getValueOffset(), null);
-        KeyValue decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
-        Integer scale = decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(decimalDigitKv.getBuffer(), decimalDigitKv.getValueOffset(), null);
-        KeyValue ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
-        int position = PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getBuffer(), ordinalPositionKv.getValueOffset(), null);
-        KeyValue nullableKv = colKeyValues[NULLABLE_INDEX];
-        boolean isNullable = PDataType.INTEGER.getCodec().decodeInt(nullableKv.getBuffer(), nullableKv.getValueOffset(), null) != ResultSetMetaData.columnNoNulls;
-        KeyValue sqlDataTypeKv = colKeyValues[SQL_DATA_TYPE_INDEX];
-        PDataType dataType = PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(sqlDataTypeKv.getBuffer(), sqlDataTypeKv.getValueOffset(), null));
-        if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY; // For backward compatibility.
-        KeyValue columnModifierKv = colKeyValues[COLUMN_MODIFIER_INDEX];
-        ColumnModifier sortOrder = columnModifierKv == null ? null : ColumnModifier.fromSystemValue(PDataType.INTEGER.getCodec().decodeInt(columnModifierKv.getBuffer(), columnModifierKv.getValueOffset(), null));
-        KeyValue arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
-        Integer arraySize = arraySizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getBuffer(), arraySizeKv.getValueOffset(), null);
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize);
+        Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
+        Integer maxLength =
+                columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+                    columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), null);
+        Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
+        Integer scale =
+                decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+                    decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), null);
+        Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
+        int position =
+                PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
+                    ordinalPositionKv.getValueOffset(), null);
+        Cell nullableKv = colKeyValues[NULLABLE_INDEX];
+        boolean isNullable =
+                PDataType.INTEGER.getCodec().decodeInt(nullableKv.getValueArray(),
+                    nullableKv.getValueOffset(), null) != ResultSetMetaData.columnNoNulls;
+        Cell sqlDataTypeKv = colKeyValues[SQL_DATA_TYPE_INDEX];
+        PDataType dataType =
+                PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(
+                    sqlDataTypeKv.getValueArray(), sqlDataTypeKv.getValueOffset(), null));
+        if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY; // For
+                                                                                               // backward
+                                                                                               // compatibility.
+        Cell columnModifierKv = colKeyValues[COLUMN_MODIFIER_INDEX];
+        ColumnModifier sortOrder =
+                columnModifierKv == null ? null : ColumnModifier.fromSystemValue(PDataType.INTEGER
+                        .getCodec().decodeInt(columnModifierKv.getValueArray(),
+                            columnModifierKv.getValueOffset(), null));
+        
+        Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
+        Integer arraySize = arraySizeKv == null ? null : 
+          PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), null);
+    
+        PColumn column =
+                new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable,
+                        position - 1, sortOrder, arraySize);
         columns.add(column);
     }
-
-    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) throws IOException, SQLException {
-        List<KeyValue> results = Lists.newArrayList();
+    
+    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
+        throws IOException, SQLException {
+        List<Cell> results = Lists.newArrayList();
         scanner.next(results);
         if (results.isEmpty()) {
             return null;
         }
-        KeyValue[] tableKeyValues = new KeyValue[TABLE_KV_COLUMNS.size()];
-        KeyValue[] colKeyValues = new KeyValue[COLUMN_KV_COLUMNS.size()];
-        
+        Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
+        Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
+    
         // Create PTable based on KeyValues from scan
-        KeyValue keyValue = results.get(0);
-        byte[] keyBuffer = keyValue.getBuffer();
+        Cell keyValue = results.get(0);
+        byte[] keyBuffer = keyValue.getRowArray();
         int keyLength = keyValue.getRowLength();
         int keyOffset = keyValue.getRowOffset();
         PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
         int tenantIdLength = tenantId.getBytes().length;
-        PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
+        PName schemaName = newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength);
         int schemaNameLength = schemaName.getBytes().length;
-        int tableNameLength = keyLength-schemaNameLength-1-tenantIdLength-1;
+        int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
         byte[] tableNameBytes = new byte[tableNameLength];
-        System.arraycopy(keyBuffer, keyOffset+schemaNameLength+1+tenantIdLength+1, tableNameBytes, 0, tableNameLength);
+        System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
+            tableNameBytes, 0, tableNameLength);
         PName tableName = PNameFactory.newName(tableNameBytes);
-        
+    
         int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
         // This will prevent the client from continually looking for the current
         // table when we know that there will never be one since we disallow updates
@@ -349,20 +549,23 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         // bump up the timeStamp to right before the client time stamp, since
         // we know it can't possibly change.
         long timeStamp = keyValue.getTimestamp();
-//        long timeStamp = tableTimeStamp > keyValue.getTimestamp() && 
-//                         clientTimeStamp < tableTimeStamp
-//                         ? clientTimeStamp-1 
-//                         : keyValue.getTimestamp();
-
+        // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
+        // clientTimeStamp < tableTimeStamp
+        // ? clientTimeStamp-1
+        // : keyValue.getTimestamp();
+    
         int i = 0;
         int j = 0;
         while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
-            KeyValue kv = results.get(i);
-            KeyValue searchKv = TABLE_KV_COLUMNS.get(j);
-            int cmp = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), 
-                    searchKv.getBuffer(), searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            Cell kv = results.get(i);
+            Cell searchKv = TABLE_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
             if (cmp == 0) {
-                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table header row
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
+                                                                    // header row
                 tableKeyValues[j++] = kv;
                 i++;
             } else if (cmp > 0) {
@@ -374,83 +577,105 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         // TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
         if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
                 || tableKeyValues[COLUMN_COUNT_INDEX] == null) {
-            throw new IllegalStateException("Didn't find expected key values for table row in metadata row");
+            throw new IllegalStateException(
+                    "Didn't find expected key values for table row in metadata row");
         }
-        KeyValue tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
-        PTableType tableType = PTableType.fromSerializedValue(tableTypeKv.getBuffer()[tableTypeKv.getValueOffset()]);
-        KeyValue tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
-        long tableSeqNum = PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getBuffer(), tableSeqNumKv.getValueOffset(), null);
-        KeyValue columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
-        int columnCount = PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getBuffer(), columnCountKv.getValueOffset(), null);
-        KeyValue pkNameKv = tableKeyValues[PK_NAME_INDEX];
-        PName pkName = pkNameKv != null ? newPName(pkNameKv.getBuffer(), pkNameKv.getValueOffset(), pkNameKv.getValueLength()) : null;
-        KeyValue saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
-        Integer saltBucketNum = saltBucketNumKv != null ? (Integer)PDataType.INTEGER.getCodec().decodeInt(saltBucketNumKv.getBuffer(), saltBucketNumKv.getValueOffset(), null) : null;
-        KeyValue dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
-        PName dataTableName = dataTableNameKv != null ? newPName(dataTableNameKv.getBuffer(), dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
-        KeyValue indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
-        PIndexState indexState = indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv.getBuffer()[indexStateKv.getValueOffset()]);
-        KeyValue immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
-        boolean isImmutableRows = immutableRowsKv == null ? false : (Boolean)PDataType.BOOLEAN.toObject(immutableRowsKv.getBuffer(), immutableRowsKv.getValueOffset(), immutableRowsKv.getValueLength());
-        KeyValue defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
-        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getBuffer(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
-        KeyValue viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
-        String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getBuffer(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
-        KeyValue disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
-        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getBuffer(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
-        KeyValue multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
-        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getBuffer(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
-        KeyValue viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
-        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getBuffer()[viewTypeKv.getValueOffset()]);
+        Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
+        PTableType tableType =
+                PTableType
+                        .fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
+        Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
+        long tableSeqNum =
+                PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
+                    tableSeqNumKv.getValueOffset(), null);
+        Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
+        int columnCount =
+                PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getValueArray(),
+                    columnCountKv.getValueOffset(), null);
+        Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
+        PName pkName =
+                pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
+                    pkNameKv.getValueLength()) : null;
+        Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
+        Integer saltBucketNum =
+                saltBucketNumKv != null ? (Integer) PDataType.INTEGER.getCodec().decodeInt(
+                    saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), null) : null;
+        Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
+        PName dataTableName =
+                dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
+                    dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
+        Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
+        PIndexState indexState =
+                indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
+                        .getValueArray()[indexStateKv.getValueOffset()]);
+        Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
+        boolean isImmutableRows =
+                immutableRowsKv == null ? false : (Boolean) PDataType.BOOLEAN.toObject(
+                    immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
+                    immutableRowsKv.getValueLength());
+        Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
+        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
+        Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
+        String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
+        Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
+        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
+        Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
+        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+        Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
+        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
         List<PName> physicalTables = new ArrayList<PName>();
         while (true) {
-            results.clear();
-            scanner.next(results);
-            if (results.isEmpty()) {
-                break;
-            }
-            KeyValue colKv = results.get(LINK_TYPE_INDEX);
-            int colKeyLength = colKv.getRowLength();
-            PName colName = newPName(colKv.getBuffer(), colKv.getRowOffset() + offset, colKeyLength-offset);
-            int colKeyOffset = offset + colName.getBytes().length + 1;
-            PName famName = newPName(colKv.getBuffer(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
-            if (colName.getString().isEmpty() && famName != null) {
-                LinkType linkType = LinkType.fromSerializedValue(colKv.getBuffer()[colKv.getValueOffset()]);
-                if (linkType == LinkType.INDEX_TABLE) {
-                    addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
-                } else if (linkType == LinkType.PHYSICAL_TABLE) {
-                    physicalTables.add(famName);
-                } else {
-                    logger.warn("Unknown link type: " + colKv.getBuffer()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
-                }
-            } else {
-                addColumnToTable(results, colName, famName, colKeyValues, columns);
-            }
+          results.clear();
+          scanner.next(results);
+          if (results.isEmpty()) {
+              break;
+          }
+          Cell colKv = results.get(LINK_TYPE_INDEX);
+          int colKeyLength = colKv.getRowLength();
+          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+          int colKeyOffset = offset + colName.getBytes().length + 1;
+          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+          if (colName.getString().isEmpty() && famName != null) {
+              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+              if (linkType == LinkType.INDEX_TABLE) {
+                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+              } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                  physicalTables.add(famName);
+              } else {
+                  logger.warn("Unknown link type: " + colKv.getValueArray()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
+              }
+          } else {
+              addColumnToTable(results, colName, famName, colKeyValues, columns);
+          }
         }
         
-        return PTableImpl.makePTable(schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, 
-                indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType);
+        return PTableImpl.makePTable(schemaName, tableName, tableType, indexState, timeStamp, 
+            tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, 
+            indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, 
+            multiTenant, viewType);
     }
 
-    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
+    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+        long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
             return null;
         }
-        
+    
         Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
         scan.setFilter(new FirstKeyOnlyFilter());
         scan.setRaw(true);
         RegionScanner scanner = region.getScanner(scan);
-        List<KeyValue> results = Lists.<KeyValue>newArrayList();
+        List<Cell> results = Lists.<Cell> newArrayList();
         scanner.next(results);
         // HBase ignores the time range on a raw scan (HBASE-7362)
         if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
-            KeyValue kv = results.get(0);
-            if (kv.isDelete()) {
-                Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+            Cell kv = results.get(0);
+            if (kv.getTypeByte() == Type.Delete.getCode()) {
+                Map<ImmutableBytesPtr, PTable> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
                 metaDataCache.put(cacheKey, table);
                 return table;
@@ -467,16 +692,21 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         return table.getName() == null;
     }
 
-    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
+    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
+        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+        throws IOException, SQLException {
         HRegion region = env.getRegion();
-        Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+        Map<ImmutableBytesPtr, PTable> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable table = metaDataCache.get(cacheKey);
         // We always cache the latest version - fault in if not in cache
         if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
             return table;
         }
-        // if not found then check if newer table already exists and add delete marker for timestamp found
-        if (table == null && (table=buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
+        // if not found then check if newer table already exists and add delete marker for timestamp
+        // found
+        if (table == null
+                && (table = buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
             return table;
         }
         return null;
@@ -484,102 +714,136 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
     
     
     @Override
-    public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException {
+    public void createTable(RpcController controller, CreateTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
         byte[][] rowKeyMetaData = new byte[3][];
-        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
-        byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-        
+        byte[] schemaName = null;
+        byte[] tableName = null;
+
         try {
+            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
             byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
             byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
             byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
-            byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+            byte[] key =
+                    parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
+                        schemaName, tableName);
             byte[] parentKey = parentTableName == null ? null : lockKey;
-            
-            RegionCoprocessorEnvironment env = getEnvironment();
+
             HRegion region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
             if (result != null) {
-                return result; 
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
             }
-            List<Integer> lids = Lists.newArrayList(5);
+            List<RowLock> locks = Lists.newArrayList();
             long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
             try {
-                acquireLock(region, lockKey, lids);
+                acquireLock(region, lockKey, locks);
                 if (key != lockKey) {
-                    acquireLock(region, key, lids);
+                    acquireLock(region, key, locks);
                 }
                 // Load parent table first
                 PTable parentTable = null;
                 ImmutableBytesPtr parentCacheKey = null;
                 if (parentKey != null) {
                     parentCacheKey = new ImmutableBytesPtr(parentKey);
-                    parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp, clientTimeStamp);
+                    parentTable =
+                            loadTable(env, parentKey, parentCacheKey, clientTimeStamp,
+                                clientTimeStamp);
                     if (parentTable == null || isTableDeleted(parentTable)) {
-                        return new MetaDataMutationResult(MutationCode.PARENT_TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
+                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(parentTable));
+                        done.run(builder.build());
+                        return;
                     }
                     // If parent table isn't at the expected sequence number, then return
-                    if (parentTable.getSequenceNumber() != MetaDataUtil.getParentSequenceNumber(tableMetadata)) {
-                        return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), parentTable);
+                    if (parentTable.getSequenceNumber() != MetaDataUtil
+                            .getParentSequenceNumber(tableMetadata)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(parentTable));
+                        done.run(builder.build());
+                        return;
                     }
                 }
                 // Load child table next
                 ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-                // Get as of latest timestamp so we can detect if we have a newer table that already exists
+                // Get as of latest timestamp so we can detect if we have a newer table that already
+                // exists
                 // without making an additional query
-                PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+                PTable table =
+                        loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
                 if (table != null) {
                     if (table.getTimeStamp() < clientTimeStamp) {
-                        // If the table is older than the client time stamp and it's deleted, continue
+                        // If the table is older than the client time stamp and it's deleted,
+                        // continue
                         if (!isTableDeleted(table)) {
-                            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
+                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            builder.setTable(PTableImpl.toProto(table));
+                            done.run(builder.build());
+                            return;
                         }
                     } else {
-                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(table));
+                        done.run(builder.build());
+                        return;
                     }
                 }
-                
-                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the system
+                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
+                // system
                 // table. Basically, we get all the locks that we don't already hold for all the
-                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring primary and
-                // then index table locks are held, in that order). For now, we just don't support indexing
-                // on the system table. This is an issue because of the way we manage batch mutation in the
+                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
+                // primary and
+                // then index table locks are held, in that order). For now, we just don't support
+                // indexing
+                // on the system table. This is an issue because of the way we manage batch mutation
+                // in the
                 // Indexer.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
-                
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+
                 // Invalidate the cache - the next getTable call will add it
-                // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
-                Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+                // TODO: consider loading the table that was just created here, patching up the
+                // parent table, and updating the cache
+                Map<ImmutableBytesPtr, PTable> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
                 if (parentCacheKey != null) {
                     metaDataCache.remove(parentCacheKey);
                 }
                 metaDataCache.remove(cacheKey);
                 // Get timeStamp from mutations - the above method sets it if it's unset
                 long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, currentTimeStamp, null);
+                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                builder.setMutationTime(currentTimeStamp);
+                done.run(builder.build());
+                return;
             } finally {
-                releaseLocks(region, lids);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
-            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
-            return null; // impossible
+          logger.error("createTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
 
-    private static void acquireLock(HRegion region, byte[] key, List<Integer> lids) throws IOException {
-        Integer lid = region.getLock(null, key, true);
-        if (lid == null) {
+
+    private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks)
+        throws IOException {
+        RowLock rowLock = region.getRowLock(key);
+        if (rowLock == null) {
             throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
         }
-        lids.add(lid);
-    }
-    
-    private static void releaseLocks(HRegion region, List<Integer> lids) {
-        for (Integer lid : lids) {
-            region.releaseRowLock(lid);
-        }
+        locks.add(rowLock);
     }
     
     private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
@@ -610,7 +874,7 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
         RegionScanner scanner = region.getScanner(scan);
         try {
-            List<KeyValue> results = newArrayList();
+            List<Cell> results = newArrayList();
             scanner.next(results);
             return results.size() > 0;
         }
@@ -620,108 +884,141 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
     }
     
     @Override
-    public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, String tableType) throws IOException {
+    public void dropTable(RpcController controller, DropTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
         byte[][] rowKeyMetaData = new byte[3][];
-        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
-        byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
-        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
-        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
-        // Disallow deletion of a system table
-        if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
-            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-        }
-        List<byte[]> tableNamesToDelete = Lists.newArrayList();
+        String tableType = request.getTableType();
+        byte[] schemaName = null;
+        byte[] tableName = null;
+
         try {
+            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            // Disallow deletion of a system table
+            if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                done.run(builder.build());
+                return;
+            }
+            List<byte[]> tableNamesToDelete = Lists.newArrayList();
             byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
             byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
             byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
-            byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
-            
-            RegionCoprocessorEnvironment env = getEnvironment();
+            byte[] key =
+                    parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
+                        schemaName, tableName);
+
             HRegion region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(key, region);
             if (result != null) {
-                return result; 
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
             }
-            List<Integer> lids = Lists.newArrayList(5);
+            List<RowLock> locks = Lists.newArrayList();
             try {
-                acquireLock(region, lockKey, lids);
+                acquireLock(region, lockKey, locks);
                 if (key != lockKey) {
-                    acquireLock(region, key, lids);
+                    acquireLock(region, key, locks);
                 }
                 List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
-                result = doDropTable(key, tenantIdBytes, schemaName, tableName, PTableType.fromSerializedValue(tableType), tableMetadata, invalidateList, lids, tableNamesToDelete);
-                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
-                    return result;
+                result =
+                        doDropTable(key, tenantIdBytes, schemaName, tableName,
+                            PTableType.fromSerializedValue(tableType), tableMetadata,
+                            invalidateList, locks, tableNamesToDelete);
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS
+                        || result.getTable() == null) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
                 }
-                Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+                Map<ImmutableBytesPtr, PTable> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
                 // Commit the list of deletion.
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                for (ImmutableBytesPtr ckey: invalidateList) {
+                for (ImmutableBytesPtr ckey : invalidateList) {
                     metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
                 }
                 if (parentTableName != null) {
                     ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
                     metaDataCache.remove(parentCacheKey);
                 }
-                return result;
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
             } finally {
-                releaseLocks(region, lids);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
-            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
-            return null; // impossible
+          logger.error("dropTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
         }
     }
 
-    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName, PTableType tableType, 
-            List<Mutation> rowsToDelete, List<ImmutableBytesPtr> invalidateList, List<Integer> lids, List<byte[]> tableNamesToDelete) throws IOException, SQLException {
+    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
+        byte[] tableName, PTableType tableType, List<Mutation> rowsToDelete,
+        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
+        List<byte[]> tableNamesToDelete) throws IOException, SQLException {
         long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
-
-        RegionCoprocessorEnvironment env = getEnvironment();
+    
         HRegion region = env.getRegion();
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-        
-        Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+    
+        Map<ImmutableBytesPtr, PTable> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable table = metaDataCache.get(cacheKey);
-        
+    
         // We always cache the latest version - fault in if not in cache
-        if (table != null || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
+        if (table != null
+                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
             if (table.getTimeStamp() < clientTimeStamp) {
                 // If the table is older than the client time stamp and its deleted, continue
                 if (isTableDeleted(table)) {
-                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
-                if ( tableType != table.getType())  {
+                if (tableType != table.getType()) {
                     // We said to drop a table, but found a view or visa versa
-                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
             } else {
-                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                        EnvironmentEdgeManager.currentTimeMillis(), table);
             }
         }
         if (table == null && buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
-            return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+            return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                    EnvironmentEdgeManager.currentTimeMillis(), null);
         }
         // Get mutations for main table.
         Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         RegionScanner scanner = region.getScanner(scan);
-        List<KeyValue> results = Lists.newArrayList();
+        List<Cell> results = Lists.newArrayList();
         scanner.next(results);
         if (results.isEmpty()) {
-            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                    EnvironmentEdgeManager.currentTimeMillis(), null);
         }
-        KeyValue typeKeyValue = KeyValueUtil.getColumnLatest(results, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_TYPE_BYTES);
-        assert(typeKeyValue != null && typeKeyValue.getValueLength() == 1);
-        if ( tableType != PTableType.fromSerializedValue(typeKeyValue.getBuffer()[typeKeyValue.getValueOffset()]))  {
+        Cell typeKeyValue =
+                KeyValueUtil.getColumnLatest(results, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.TABLE_TYPE_BYTES);
+        assert (typeKeyValue != null && typeKeyValue.getValueLength() == 1);
+        if (tableType != PTableType.fromSerializedValue(typeKeyValue.getValueArray()[typeKeyValue
+                .getValueOffset()])) {
             // We said to drop a table, but found a view or visa versa
-            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                    EnvironmentEdgeManager.currentTimeMillis(), null);
         }
         // Don't allow a table with views to be deleted
         // TODO: support CASCADE with DROP
         if (tableType == PTableType.TABLE && hasViews(region, tenantId, table)) {
-            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, 
+              EnvironmentEdgeManager.currentTimeMillis(), null);
         }
         if (table.getType() != PTableType.VIEW) { // Add to list of HTables to delete, unless it's a view
             tableNamesToDelete.add(table.getName().getBytes());
@@ -729,141 +1026,178 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         List<byte[]> indexNames = Lists.newArrayList();
         invalidateList.add(cacheKey);
         byte[][] rowKeyMetaData = new byte[5][];
-        byte[] rowKey;
         do {
-            KeyValue kv = results.get(LINK_TYPE_INDEX);
-            rowKey = kv.getRow();
-            int nColumns = getVarChars(rowKey, rowKeyMetaData);
-            if (nColumns == 5 && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
-                    && Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
-                    && LinkType.fromSerializedValue(kv.getBuffer()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
+            Cell kv = results.get(LINK_TYPE_INDEX);
+            int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
+            if (nColumns == 5
+                    && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0
+                    && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
+                    && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
+                    && LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
                 indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX]);
             }
-            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+            @SuppressWarnings("deprecation")
+            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
             // FIXME: the version of the Delete constructor without the lock args was introduced
             // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
             // of the client.
-            Delete delete = new Delete(rowKey, clientTimeStamp, null);
+            Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
             rowsToDelete.add(delete);
             results.clear();
             scanner.next(results);
         } while (!results.isEmpty());
-        
+    
         // Recursively delete indexes
         for (byte[] indexName : indexNames) {
             byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
-            @SuppressWarnings("deprecation") // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+            @SuppressWarnings("deprecation")
+            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
             // FIXME: the version of the Delete constructor without the lock args was introduced
             // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
             // of the client.
-            Delete delete = new Delete(indexKey, clientTimeStamp, null);
+            Delete delete = new Delete(indexKey, clientTimeStamp);
             rowsToDelete.add(delete);
-            acquireLock(region, indexKey, lids);
-            MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName, PTableType.INDEX, rowsToDelete, invalidateList, lids, tableNamesToDelete);
-            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS || result.getTable() == null) {
+            acquireLock(region, indexKey, locks);
+            MetaDataMutationResult result =
+                    doDropTable(indexKey, tenantId, schemaName, indexName, PTableType.INDEX,
+                        rowsToDelete, invalidateList, locks, tableNamesToDelete);
+            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS
+                    || result.getTable() == null) {
                 return result;
             }
         }
-        
-        return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
+    
+        return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
     }
 
     private static interface ColumnMutator {
-        MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetadata, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) throws IOException, SQLException;
+      MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+          List<Mutation> tableMetadata, HRegion region,
+          List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException,
+          SQLException;
     }
 
-    private MetaDataMutationResult mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
+    private MetaDataMutationResult
+    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
-        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata,rowKeyMetaData);
+        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
         byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
         byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
         try {
-            RegionCoprocessorEnvironment env = getEnvironment();
             byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
             HRegion region = env.getRegion();
             MetaDataMutationResult result = checkTableKeyInRegion(key, region);
             if (result != null) {
-                return result; 
+                return result;
             }
-            List<Integer> lids = Lists.newArrayList(5);
+            List<RowLock> locks = Lists.newArrayList();
             try {
-                acquireLock(region, key, lids);
+                acquireLock(region, key, locks);
                 ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                 List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
                 invalidateList.add(cacheKey);
-                Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
+                Map<ImmutableBytesPtr, PTable> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = metaDataCache.get(cacheKey);
                 if (logger.isDebugEnabled()) {
                     if (table == null) {
-                        logger.debug("Table " + Bytes.toStringBinary(key) + " not found in cache. Will build through scan");
+                        logger.debug("Table " + Bytes.toStringBinary(key)
+                                + " not found in cache. Will build through scan");
                     } else {
-                        logger.debug("Table " + Bytes.toStringBinary(key) + " found in cache with timestamp " + table.getTimeStamp() + " seqNum " + table.getSequenceNumber());
+                        logger.debug("Table " + Bytes.toStringBinary(key)
+                                + " found in cache with timestamp " + table.getTimeStamp()
+                                + " seqNum " + table.getSequenceNumber());
                     }
                 }
                 // Get client timeStamp from mutations
                 long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                if (table == null && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
-                    // if not found then call newerTableExists and add delete marker for timestamp found
+                if (table == null
+                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
+                    // if not found then call newerTableExists and add delete marker for timestamp
+                    // found
                     if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
-                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                                EnvironmentEdgeManager.currentTimeMillis(), null);
                     }
-                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
                 if (table.getTimeStamp() >= clientTimeStamp) {
-                    return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), table);
+                    return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), table);
                 } else if (isTableDeleted(table)) {
-                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
-                    
-                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in tableMetaData
+        
+                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup
+                                                                                         // TABLE_SEQ_NUM
+                                                                                         // in
+                                                                                         // tableMetaData
                 if (logger.isDebugEnabled()) {
-                    logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum " + expectedSeqNum + " and found seqNum " + table.getSequenceNumber() + " with " + table.getColumns().size() + " columns: " + table.getColumns());
+                    logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
+                            + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
+                            + " with " + table.getColumns().size() + " columns: "
+                            + table.getColumns());
                 }
                 if (expectedSeqNum != table.getSequenceNumber()) {
                     if (logger.isDebugEnabled()) {
-                        logger.debug("For table " + Bytes.toStringBinary(key) + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
+                        logger.debug("For table " + Bytes.toStringBinary(key)
+                                + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
                     }
-                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table);
+                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
+                            EnvironmentEdgeManager.currentTimeMillis(), table);
                 }
-                
+        
                 PTableType type = table.getType();
-                if (type == PTableType.INDEX) { 
+                if (type == PTableType.INDEX) {
                     // Disallow mutation of an index table
-                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
                 } else {
-                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
-                    // We said to drop a table, but found a view or visa versa
-                    if (type != expectedType) {
-                        return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
-                    }
-                    if (hasViews(region, tenantId, table)) {
-                        // Disallow any column mutations for parents of tenant tables
-                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
-                    }
+                  PTableType expectedType = MetaDataUtil.getTableType(tableMetadata);
+                  // We said to drop a table, but found a view or vice versa
+                  if (type != expectedType) {
+                      return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                  }
+                  if (hasViews(region, tenantId, table)) {
+                      // Disallow any column mutations for parents of tenant tables
+                      return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                  }
                 }
-                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region, invalidateList, lids);
+                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
+                            invalidateList, locks);
                 if (result != null) {
                     return result;
                 }
-                
-                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]>emptySet());
+        
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     PTable invalidatedTable = metaDataCache.remove(invalidateKey);
                     if (logger.isDebugEnabled()) {
                         if (invalidatedTable == null) {
-                            logger.debug("Attempted to invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " but found no cached table");
+                            logger.debug("Attempted to invalidated table key "
+                                    + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                        cacheKey.getLength()) + " but found no cached table");
                         } else {
-                            logger.debug("Invalidated table key " + Bytes.toStringBinary(cacheKey.get(),cacheKey.getOffset(),cacheKey.getLength()) + " with timestamp " + invalidatedTable.getTimeStamp() + " and seqNum " + invalidatedTable.getSequenceNumber());
+                            logger.debug("Invalidated table key "
+                                    + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                        cacheKey.getLength()) + " with timestamp "
+                                    + invalidatedTable.getTimeStamp() + " and seqNum "
+                                    + invalidatedTable.getSequenceNumber());
                         }
                     }
                 }
-                // Get client timeStamp from mutations, since it may get updated by the mutateRowsWithLocks call
+                // Get client timeStamp from mutations, since it may get updated by the
+                // mutateRowsWithLocks call
                 long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
-                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null);
+                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime,
+                        null);
             } finally {
-                releaseLocks(region,lids);
+                region.releaseRowLocks(locks);
             }
         } catch (Throwable t) {
             ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
@@ -871,261 +1205,251 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
         }
     }
 
+
     @Override
-    public MetaDataMutationResult addColumn(List<Mutation> tableMetaData) throws IOException {
-        return mutateColumn(tableMetaData, new ColumnMutator() {
-            @Override
-            public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) {
-                byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
-                byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
-                byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
-                for (Mutation m : tableMetaData) {
-                    byte[] key = m.getRow();
-                    boolean addingPKColumn = false;
-                    int pkCount = getVarChars(key, rowKeyMetaData);
-                    if (pkCount > COLUMN_NAME_INDEX 
-                            && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 
-                            && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) {
-                        try {
-                            if (pkCount > FAMILY_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
-                                PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
-                            } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
-                                addingPKColumn = true;
-                                table.getPKColumn(new String(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
-                            } else {
-                                continue;
-                            }
-                            return new MetaDataMutationResult(MutationCode.COLUMN_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), table);
-                        } catch (ColumnFamilyNotFoundException e) {
-                            continue;
-                        } catch (ColumnNotFoundException e) {
-                            if (addingPKColumn) {
-                                // Add all indexes to invalidate list, as they will all be adding the same PK column
-                                // No need to lock them, as we have the parent table lock at this point
-                                for (PTable index : table.getIndexes()) {
-                                    invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(),index.getTableName().getBytes())));
-                                }
-                            }
-                            continue;
-                        }
-                    }
-                }
-                return null;
-            }
-        });
-    }
-    
-    @Override
-    public MetaDataMutationResult dropColumn(List<Mutation> tableMetaData) throws IOException {
-        final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
-        final List<byte[]> tableNamesToDelete = Lists.newArrayList();
-        return mutateColumn(tableMetaData, new ColumnMutator() {
-            @SuppressWarnings("deprecation")
-            @Override
-            public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetaData, HRegion region, List<ImmutableBytesPtr> invalidateList, List<Integer> lids) throws IOException, SQLException {
-                byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
-                byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
-                byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
-                boolean deletePKColumn = false;
-                List<Mutation> additionalTableMetaData = Lists.newArrayList();
-                for (Mutation m : tableMetaData) {
-                    if (m instanceof Delete) {
+    public void addColumn(RpcController controller, AddColumnRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        try {
+            List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
+            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
+                @Override
+                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+                        List<Mutation> tableMetaData, HRegion region,
+                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
+                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
+                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
+                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+                    for (Mutation m : tableMetaData) {
                         byte[] key = m.getRow();
+                        boolean addingPKColumn = false;
                         int pkCount = getVarChars(key, rowKeyMetaData);
-                        if (pkCount > COLUMN_NAME_INDEX 
-                                && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 
-                                && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) {
-                            PColumn columnToDelete = null;
+                        if (pkCount > COLUMN_NAME_INDEX
+                                && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
+                                && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                             try {
-                                if (pkCount > FAMILY_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
-                                    PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    columnToDelete = family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
-                                } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
-                                    deletePKColumn = true;
-                                    columnToDelete = table.getPKColumn(new String(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
+                                if (pkCount > FAMILY_NAME_INDEX
+                                        && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
+                                    PColumnFamily family =
+                                            table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+                                    family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                } else if (pkCount > COLUMN_NAME_INDEX
+                                        && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
+                                    addingPKColumn = true;
+                                    table.getPKColumn(new String(
+                                            rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
                                 } else {
                                     continue;
                                 }
-                                // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the index. If found as covered column, delete from index (do this client side?).
-                                // In either case, invalidate index if the column is in it
-     

<TRUNCATED>