You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:36:47 UTC

svn commit: r1459013 [1/8] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org...

Author: stack
Date: Wed Mar 20 19:36:46 2013
New Revision: 1459013

URL: http://svn.apache.org/r1459013
Log:
HBASE-7905 Add passing of optional cell blocks over rpc

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutation.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
    hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
    hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto
    hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Mar 20 19:36:46 2013
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.util.Pair
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.zookeeper.*;
-import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
@@ -523,7 +522,6 @@ public class HConnectionManager {
     // package protected for the tests
     ClusterStatusListener clusterStatusListener;
 
-    private final Object metaRegionLock = new Object();
     private final Object userRegionLock = new Object();
 
     // We have a single lock for master & zk to prevent deadlocks. Having
@@ -645,7 +643,7 @@ public class HConnectionManager {
      * @return
      */
     public String toString(){
-      return "hconnection 0x" + Integer.toHexString( hashCode() );
+      return "hconnection-0x" + Integer.toHexString(hashCode());
     }
 
     private String clusterId = null;
@@ -882,7 +880,7 @@ public class HConnectionManager {
       MetaScanner.metaScan(conf, visitor, tableName);
       return available.get() && (regionCount.get() > 0);
     }
-    
+
     @Override
     public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
         throws IOException {
@@ -1011,13 +1009,16 @@ public class HConnectionManager {
       if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
         ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
         try {
-          LOG.debug("Looking up meta region location in ZK," +
-            " connection=" + this);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
+          }
           ServerName servername =
             MetaRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout);
 
-          LOG.debug("Looked up meta region location, connection=" + this +
-            "; serverName=" + ((servername == null) ? "null" : servername));
+          if (LOG.isTraceEnabled()) {
+            LOG.debug("Looked up meta region location, connection=" + this +
+              "; serverName=" + ((servername == null) ? "null" : servername));
+          }
           if (servername == null) return null;
           return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
         } catch (InterruptedException e) {
@@ -1821,26 +1822,17 @@ public class HConnectionManager {
     }
 
     @Deprecated
-    private <R> Callable<MultiResponse> createCallable(
-      final HRegionLocation loc, final MultiAction<R> multi,
-      final byte [] tableName) {
-      // TODO: This does not belong in here!!! St.Ack  HConnections should
+    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
+        final MultiAction<R> multi, final byte[] tableName) {
+      // TODO: This does not belong in here!!! St.Ack HConnections should
       // not be dealing in Callables; Callables have HConnections, not other
       // way around.
       final HConnection connection = this;
       return new Callable<MultiResponse>() {
-        public MultiResponse call() throws IOException {
+        @Override
+        public MultiResponse call() throws Exception {
           ServerCallable<MultiResponse> callable =
-            new ServerCallable<MultiResponse>(connection, tableName, null) {
-              public MultiResponse call() throws IOException {
-                return ProtobufUtil.multi(server, multi);
-              }
-
-              @Override
-              public void connect(boolean reload) throws IOException {
-                server = connection.getClient(loc.getServerName());
-              }
-            };
+            new MultiServerCallable<R>(connection, tableName, loc, multi);
           return callable.withoutRetries();
         }
       };
@@ -2162,8 +2154,7 @@ public class HConnectionManager {
                 } else // success
                   if (callback != null) {
                     this.callback.update(resultsForRS.getKey(),
-                      this.rows.get(regionResult.getFirst()).getRow(),
-                      (R) result);
+                      this.rows.get(regionResult.getFirst()).getRow(), (R) result);
                 }
               }
             }
@@ -2222,8 +2213,6 @@ public class HConnectionManager {
         }
       }
 
-
-
       /**
        * Put the action that has to be retried in the Replay list.
        * @return true if we're out of numRetries and it's the last retry.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 20 19:36:46 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.HC
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -728,9 +729,11 @@ public class HTable implements HTableInt
             try {
               MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), append);
-              MutateResponse response = server.mutate(null, request);
+              PayloadCarryingRpcController rpcController =
+                new PayloadCarryingRpcController();
+              MutateResponse response = server.mutate(rpcController, request);
               if (!response.hasResult()) return null;
-              return ProtobufUtil.toResult(response.getResult());
+              return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
             } catch (ServiceException se) {
               throw ProtobufUtil.getRemoteException(se);
             }
@@ -752,8 +755,9 @@ public class HTable implements HTableInt
             try {
               MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), increment);
-              MutateResponse response = server.mutate(null, request);
-              return ProtobufUtil.toResult(response.getResult());
+              PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
+              MutateResponse response = server.mutate(rpcContoller, request);
+              return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
             } catch (ServiceException se) {
               throw ProtobufUtil.getRemoteException(se);
             }
@@ -796,8 +800,10 @@ public class HTable implements HTableInt
               MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), row, family,
                 qualifier, amount, writeToWAL);
-              MutateResponse response = server.mutate(null, request);
-              Result result = ProtobufUtil.toResult(response.getResult());
+              PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+              MutateResponse response = server.mutate(rpcController, request);
+              Result result =
+                ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
               return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
             } catch (ServiceException se) {
               throw ProtobufUtil.getRemoteException(se);

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Callable that handles the <code>multi</code> method call going against a single
+ * regionserver; i.e. A {@link ServerCallable} for the multi call (It is not a
+ * {@link Callable} that goes against multiple regions.
+ * @param <R>
+ */
+class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
+  private final MultiAction<R> multi;
+  private final HRegionLocation loc;
+
+  MultiServerCallable(final HConnection connection, final byte [] tableName,
+      final HRegionLocation loc, final MultiAction<R> multi) {
+    super(connection, tableName, null);
+    this.multi = multi;
+    this.loc = loc;
+  }
+
+  @Override
+  public MultiResponse call() throws IOException {
+    MultiResponse response = new MultiResponse();
+    // The multi object is a list of Actions by region.
+    for (Map.Entry<byte[], List<Action<R>>> e: this.multi.actions.entrySet()) {
+      byte[] regionName = e.getKey();
+      int rowMutations = 0;
+      List<Action<R>> actions = e.getValue();
+      for (Action<R> action : actions) {
+        Row row = action.getAction();
+        // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
+        // on the one row.  We do these a row at a time.
+        if (row instanceof RowMutations) {
+          try {
+            RowMutations rms = (RowMutations)row;
+            // Stick all Cells for all RowMutations in here into 'cells'.  Populated when we call
+            // buildNoDataMultiRequest in the below.
+            List<CellScannable> cells = new ArrayList<CellScannable>(rms.getMutations().size());
+            // Build a multi request absent its Cell payload (this is the 'nodata' in the below).
+            MultiRequest multiRequest =
+                RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
+            // Carry the cells over the proxy/pb Service interface using the payload carrying
+            // rpc controller.
+            server.multi(new PayloadCarryingRpcController(cells), multiRequest);
+            // This multi call does not return results.
+            response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
+          } catch (ServiceException se) {
+            response.add(regionName, action.getOriginalIndex(),
+              ProtobufUtil.getRemoteException(se));
+          }
+          rowMutations++;
+        }
+      }
+      // Are there any non-RowMutation actions to send for this region?
+      if (actions.size() > rowMutations) {
+        Exception ex = null;
+        List<Object> results = null;
+        // Stick all Cells for the multiRequest in here into 'cells'.  Gets filled in when we
+        // call buildNoDataMultiRequest
+        List<CellScannable> cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
+        try {
+          // The call to buildNoDataMultiRequest will skip RowMutations.  They have
+          // already been handled above.
+          MultiRequest multiRequest =
+              RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
+          // Controller optionally carries cell data over the proxy/service boundary and also
+          // optionally ferries cell response data back out again.
+          PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+          ClientProtos.MultiResponse responseProto = server.multi(controller, multiRequest);
+          results = ResponseConverter.getResults(responseProto, controller.cellScanner());
+        } catch (ServiceException se) {
+          ex = ProtobufUtil.getRemoteException(se);
+        }
+        for (int i = 0, n = actions.size(); i < n; i++) {
+          int originalIndex = actions.get(i).getOriginalIndex();
+          response.add(regionName, originalIndex, results == null ? ex : results.get(i));
+        }
+      }
+    }
+    return response;
+  }
+
+  @Override
+  public void connect(boolean reload) throws IOException {
+    server = connection.getClient(loc.getServerName());
+  }
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java Wed Mar 20 19:36:46 2013
@@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -52,10 +50,8 @@ public class RowMutations implements Row
    * @param row row key
    */
   public RowMutations(byte [] row) {
-    if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
-      throw new IllegalArgumentException("Row key is invalid");
-    }
-    this.row = Arrays.copyOf(row, row.length);
+    Mutation.checkRow(row);
+    this.row = Bytes.copy(row);
   }
 
   /**
@@ -78,10 +74,10 @@ public class RowMutations implements Row
 
   private void internalAdd(Mutation m) throws IOException {
     int res = Bytes.compareTo(this.row, m.getRow());
-    if(res != 0) {
-      throw new IOException("The row in the recently added Put/Delete " +
-          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
-          Bytes.toStringBinary(this.row));
+    if (res != 0) {
+      throw new WrongRowIOException("The row in the recently added Put/Delete <" +
+          Bytes.toStringBinary(m.getRow()) + "> doesn't match the original one <" +
+          Bytes.toStringBinary(this.row) + ">");
     }
     mutations.add(m);
   }

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+
+/**
+ * Thrown when server finds fatal issue w/ connection setup: e.g. bad rpc version
+ * or unsupported auth method.
+ * Closes connection after throwing this exception with message on why the failure.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+public class FatalConnectionException extends DoNotRetryIOException {
+  public FatalConnectionException() {
+    super();
+  }
+
+  public FatalConnectionException(String msg) {
+    super(msg);
+  }
+
+  public FatalConnectionException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Mar 20 19:36:46 2013
@@ -19,24 +19,50 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.IpcProtocol;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo;
 import org.apache.hadoop.hbase.security.AuthMethod;
@@ -52,6 +78,7 @@ import org.apache.hadoop.hbase.util.Pool
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -62,54 +89,23 @@ import org.apache.hadoop.security.token.
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
 
 
-/** A client for an IPC service.  IPC calls take a single Protobuf message as a
- * parameter, and return a single Protobuf message as their value.  A service runs on
+/**
+ * A client for an IPC service.  IPC calls take a single Protobuf message as a
+ * request and returns a single Protobuf message as result.  A service runs on
  * a port and is defined by a parameter class and a value class.
  *
- * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
- * moved into this package so can access package-private methods.
- *
- * See HBaseServer
+ * <p>See HBaseServer
  */
 @InterfaceAudience.Private
 public class HBaseClient {
-
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
   protected final PoolMap<ConnectionId, Connection> connections;
-  private static final Map<String, Method> methodInstances =
-      new ConcurrentHashMap<String, Method>();
+  private ReflectionCache reflectionCache = new ReflectionCache();
 
   protected int counter;                            // counter for call ids
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -123,6 +119,9 @@ public class HBaseClient {
   protected int pingInterval; // how often sends ping to the server in msecs
   protected int socketTimeout; // socket timeout
   protected FailedServers failedServers;
+  private final Codec codec;
+  private final CompressionCodec compressor;
+  private final IPCUtil ipcUtil;
 
   protected final SocketFactory socketFactory;           // how to create sockets
   protected String clusterId;
@@ -187,9 +186,8 @@ public class HBaseClient {
     }
   }
 
+  @SuppressWarnings("serial")
   public static class FailedServerException extends IOException {
-    private static final long serialVersionUID = -4744376109431464127L;
-
     public FailedServerException(String s) {
       super(s);
     }
@@ -201,6 +199,8 @@ public class HBaseClient {
    * @param conf Configuration
    * @param pingInterval the ping interval
    */
+  // Any reason we couldn't just do tcp keepalive instead of this pingery?
+  // St.Ack 20130121
   public static void setPingInterval(Configuration conf, int pingInterval) {
     conf.setInt(PING_INTERVAL_NAME, pingInterval);
   }
@@ -235,20 +235,34 @@ public class HBaseClient {
   /** A call waiting for a value. */
   protected class Call {
     final int id;                                 // call id
-    final RpcRequestBody param;                   // rpc request object
-    Message value;                                // value, null if error
+    final Message param;                          // rpc request method param object
+    /**
+     * Optionally has cells when making call.  Optionally has cells set on response.  Used
+     * passing cells to the rpc and receiving the response.
+     */
+    CellScanner cells;
+    Message response;                             // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
     long startTime;
+    final Method method;
 
-    protected Call(RpcRequestBody param) {
+    protected Call(final Method method, Message param, final CellScanner cells) {
       this.param = param;
+      this.method = method;
+      this.cells = cells;
       this.startTime = System.currentTimeMillis();
       synchronized (HBaseClient.this) {
         this.id = counter++;
       }
     }
 
+    @Override
+    public String toString() {
+      return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" +
+        (this.param != null? TextFormat.shortDebugString(this.param): "") + "}";
+    }
+
     /** Indicate when the call is complete and the
      * value or error are available.  Notifies by default.  */
     protected synchronized void callComplete() {
@@ -269,10 +283,12 @@ public class HBaseClient {
     /** Set the return value when there is no error.
      * Notify the caller the call is done.
      *
-     * @param value return value of the call.
+     * @param response return value of the call.
+     * @param cells Can be null
      */
-    public synchronized void setValue(Message value) {
-      this.value = value;
+    public synchronized void setResponse(Message response, final CellScanner cells) {
+      this.response = response;
+      this.cells = cells;
       callComplete();
     }
 
@@ -281,7 +297,7 @@ public class HBaseClient {
     }
   }
 
-  protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
+  protected final static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
       new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
   static {
     tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
@@ -292,8 +308,10 @@ public class HBaseClient {
    * Creates a connection. Can be overridden by a subclass for testing.
    * @param remoteId - the ConnectionId to use for the connection creation.
    */
-  protected Connection createConnection(ConnectionId remoteId) throws IOException {
-    return new Connection(remoteId);
+  protected Connection createConnection(ConnectionId remoteId, final Codec codec,
+      final CompressionCodec compressor)
+  throws IOException {
+    return new Connection(remoteId, codec, compressor);
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -312,6 +330,8 @@ public class HBaseClient {
     private Token<? extends TokenIdentifier> token;
     private HBaseSaslRpcClient saslRpcClient;
     private int reloginMaxBackoff; // max pause before relogin on sasl failure
+    private final Codec codec;
+    private final CompressionCodec compressor;
 
     // currently active calls
     protected final ConcurrentSkipListMap<Integer, Call> calls =
@@ -322,12 +342,14 @@ public class HBaseClient {
       new AtomicBoolean();  // indicate if the connection is closed
     protected IOException closeException; // close reason
 
-    Connection(ConnectionId remoteId) throws IOException {
+    Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
+    throws IOException {
       if (remoteId.getAddress().isUnresolved()) {
-        throw new UnknownHostException("unknown host: " +
-                                       remoteId.getAddress().getHostName());
+        throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
       }
       this.server = remoteId.getAddress();
+      this.codec = codec;
+      this.compressor = compressor;
 
       UserGroupInformation ticket = remoteId.getTicket().getUGI();
       Class<?> protocol = remoteId.getProtocol();
@@ -368,29 +390,33 @@ public class HBaseClient {
         authMethod = AuthMethod.KERBEROS;
       }
 
-      if (LOG.isDebugEnabled())
+      if (LOG.isDebugEnabled()) {
         LOG.debug("Use " + authMethod + " authentication for protocol "
             + protocol.getSimpleName());
-
+      }
       reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
       this.remoteId = remoteId;
 
       ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
       builder.setProtocol(protocol == null ? "" : protocol.getName());
       UserInformation userInfoPB;
-      if ((userInfoPB = getUserInfoPB(ticket)) != null) {
+      if ((userInfoPB = getUserInfo(ticket)) != null) {
         builder.setUserInfo(userInfoPB);
       }
+      builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
+      if (this.compressor != null) {
+        builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
+      }
       this.header = builder.build();
 
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
         remoteId.getAddress().toString() +
-        ((ticket==null)?" from an unknown user": (" from " 
+        ((ticket==null)?" from an unknown user": (" from "
         + ticket.getUserName())));
       this.setDaemon(true);
     }
 
-    private UserInformation getUserInfoPB(UserGroupInformation ugi) {
+    private UserInformation getUserInfo(UserGroupInformation ugi) {
       if (ugi == null || authMethod == AuthMethod.DIGEST) {
         // Don't send user for token auth
         return null;
@@ -582,8 +608,7 @@ public class HBaseClient {
      */
     protected synchronized boolean waitForWork() {
       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
-        long timeout = maxIdleTime-
-              (System.currentTimeMillis()-lastActivity.get());
+        long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
         if (timeout>0) {
           try {
             wait(timeout);
@@ -613,6 +638,7 @@ public class HBaseClient {
      * since last I/O activity is equal to or greater than the ping interval
      */
     protected synchronized void sendPing() throws IOException {
+      // Can we do tcp keepalive instead of this pinging?
       long curTime = System.currentTimeMillis();
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
@@ -626,24 +652,23 @@ public class HBaseClient {
 
     @Override
     public void run() {
-      if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": starting, having connections "
-            + connections.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": starting, connections " + connections.size());
+      }
 
       try {
         while (waitForWork()) {//wait here for work - read or close connection
-          receiveResponse();
+          readResponse();
         }
       } catch (Throwable t) {
-        LOG.warn("Unexpected exception receiving call responses", t);
+        LOG.warn(getName() + ": unexpected exception receiving call responses", t);
         markClosed(new IOException("Unexpected exception receiving call responses", t));
       }
 
       close();
 
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": stopped, remaining connections "
-            + connections.size());
+        LOG.debug(getName() + ": stopped, connections " + connections.size());
     }
 
     private synchronized void disposeSasl() {
@@ -691,7 +716,7 @@ public class HBaseClient {
      * method. In case when the user doesn't have valid credentials, we don't
      * need to retry (from cache or ticket). In such cases, it is prudent to
      * throw a runtime exception when we receive a SaslException from the
-     * underlying authentication implementation, so there is no retry from 
+     * underlying authentication implementation, so there is no retry from
      * other high level (for eg, HCM or HBaseAdmin).
      * </p>
      */
@@ -766,7 +791,7 @@ public class HBaseClient {
 
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Connecting to "+server);
+          LOG.debug("Connecting to " + server);
         }
         short numRetries = 0;
         final short MAX_RETRIES = 5;
@@ -775,7 +800,8 @@ public class HBaseClient {
           setupConnection();
           InputStream inStream = NetUtils.getInputStream(socket);
           OutputStream outStream = NetUtils.getOutputStream(socket);
-          writeRpcHeader(outStream);
+          // Write out the preamble -- MAGIC, version, and auth to use.
+          writeConnectionHeaderPreamble(outStream);
           if (useSasl) {
             final InputStream in2 = inStream;
             final OutputStream out2 = outStream;
@@ -787,19 +813,22 @@ public class HBaseClient {
             }
             boolean continueSasl = false;
             try {
-              continueSasl =
-                ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
-                  @Override
-                  public Boolean run() throws IOException {
-                    return setupSaslConnection(in2, out2);
-                  }
-                });
+              if (ticket == null) {
+                throw new NullPointerException("ticket is null");
+              } else {
+                continueSasl =
+                  ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+                    @Override
+                    public Boolean run() throws IOException {
+                      return setupSaslConnection(in2, out2);
+                    }
+                  });
+              }
             } catch (Exception ex) {
               if (rand == null) {
                 rand = new Random();
               }
-              handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
-                   ticket);
+              handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
               continue;
             }
             if (continueSasl) {
@@ -812,11 +841,10 @@ public class HBaseClient {
               useSasl = false;
             }
           }
-          this.in = new DataInputStream(new BufferedInputStream
-              (new PingInputStream(inStream)));
-          this.out = new DataOutputStream
-          (new BufferedOutputStream(outStream));
-          writeHeader();
+          this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+          this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+          // Now write out the connection header
+          writeConnectionHeader();
 
           // update last activity time
           touch();
@@ -840,30 +868,38 @@ public class HBaseClient {
       }
     }
 
-    /* Write the RPC header */
-    private void writeRpcHeader(OutputStream outStream) throws IOException {
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
-      // Write out the header, version and authentication method
-      out.write(HConstants.RPC_HEADER.array());
-      out.write(HConstants.CURRENT_VERSION);
-      authMethod.write(out);
-      out.flush();
+    /**
+     * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
+     */
+    private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
+      // Assemble the preamble up in a buffer first and then send it.  Writing individual elements,
+      // they are getting sent across piecemeal according to wireshark and then server is messing
+      // up the reading on occasion (the passed in stream is not buffered yet).
+
+      // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
+      int rpcHeaderLen = HConstants.RPC_HEADER.array().length;
+      byte [] preamble = new byte [rpcHeaderLen + 2];
+      System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen);
+      preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
+      preamble[rpcHeaderLen + 1] = authMethod.code;
+      outStream.write(preamble);
+      outStream.flush();
     }
 
     /**
-     * Write the protocol header for each connection
+     * Write the connection header.
      * Out is not synchronized because only the first thread does this.
      */
-    private void writeHeader() throws IOException {
-      // Write out the ConnectionHeader
-      out.writeInt(header.getSerializedSize());
-      header.writeTo(out);
+    private void writeConnectionHeader() throws IOException {
+      this.out.writeInt(this.header.getSerializedSize());
+      this.header.writeTo(this.out);
+      this.out.flush();
     }
 
     /** Close the connection. */
     protected synchronized void close() {
       if (!shouldCloseConnection.get()) {
-        LOG.error("The connection is not in the closed state");
+        LOG.error(getName() + ": the connection is not in the closed state");
         return;
       }
 
@@ -883,8 +919,7 @@ public class HBaseClient {
       // clean up all calls
       if (closeException == null) {
         if (!calls.isEmpty()) {
-          LOG.warn(
-              "A connection is closed for no cause and calls are not empty. " +
+          LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
               "#Calls: " + calls.size());
 
           // clean up calls anyway
@@ -894,7 +929,7 @@ public class HBaseClient {
       } else {
         // log the info
         if (LOG.isDebugEnabled()) {
-          LOG.debug("closing ipc connection to " + server + ": " +
+          LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
               closeException.getMessage(),closeException);
         }
 
@@ -905,126 +940,100 @@ public class HBaseClient {
         LOG.debug(getName() + ": closed");
     }
 
-    /* Initiates a call by sending the parameter to the remote server.
+    /**
+     * Initiates a call by sending the parameter to the remote server.
      * Note: this is not called from the Connection thread, but by other
      * threads.
+     * @param call
+     * @see #readResponse()
      */
-    protected void sendParam(Call call) {
-      if (shouldCloseConnection.get()) {
-        return;
-      }
+    protected void writeRequest(Call call) {
+      if (shouldCloseConnection.get()) return;
       try {
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " sending #" + call.id);
-
-        RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
-        headerBuilder.setCallId(call.id);
-
+        RequestHeader.Builder builder = RequestHeader.newBuilder();
+        builder.setCallId(call.id);
         if (Trace.isTracing()) {
           Span s = Trace.currentTrace();
-          headerBuilder.setTinfo(RPCTInfo.newBuilder()
-              .setParentId(s.getSpanId())
-              .setTraceId(s.getTraceId()));
+          builder.setTraceInfo(RPCTInfo.newBuilder().
+            setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+        }
+        builder.setMethodName(call.method.getName());
+        builder.setRequestParam(call.param != null);
+        ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+        if (cellBlock != null) {
+          CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+          cellBlockBuilder.setLength(cellBlock.limit());
+          builder.setCellBlockMeta(cellBlockBuilder.build());
         }
-
         //noinspection SynchronizeOnNonFinalField
+        RequestHeader header = builder.build();
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          RpcRequestHeader header = headerBuilder.build();
-          int serializedHeaderSize = header.getSerializedSize();
-          int requestSerializedSize = call.param.getSerializedSize();
-          this.out.writeInt(serializedHeaderSize +
-              CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
-              requestSerializedSize +
-              CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
-          header.writeDelimitedTo(this.out);
-          call.param.writeDelimitedTo(this.out);
-          this.out.flush();
+          IPCUtil.write(this.out, header, call.param, cellBlock);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
         }
       } catch(IOException e) {
         markClosed(e);
       }
     }
 
-
-    private Method getMethod(Class<? extends IpcProtocol> protocol,
-        String methodName) {
-      Method method = methodInstances.get(methodName);
-      if (method != null) {
-        return method;
-      }
-      Method[] methods = protocol.getMethods();
-      for (Method m : methods) {
-        if (m.getName().equals(methodName)) {
-          m.setAccessible(true);
-          methodInstances.put(methodName, m);
-          return m;
-        }
-      }
-      return null;
-    }
-
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
      */
-    protected void receiveResponse() {
-      if (shouldCloseConnection.get()) {
-        return;
-      }
+    protected void readResponse() {
+      if (shouldCloseConnection.get()) return;
       touch();
-
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
-        // It writes the call.id (int), a boolean signifying any error (and if 
-        // so the exception name/trace), and the response bytes
 
-        // Read the call id.
-        RpcResponseHeader response = RpcResponseHeader.parseDelimitedFrom(in);
-        if (response == null) {
-          // When the stream is closed, protobuf doesn't raise an EOFException,
-          // instead, it returns a null message object. 
-          throw new EOFException();
-        }
-        int id = response.getCallId();
+        // Total size of the response.  Unused.  But have to read it in anyways.
+        /*int totalSize =*/ in.readInt();
 
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + " got value #" + id);
+        // Read the header
+        ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+        int id = responseHeader.getCallId();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": got response header " +
+            TextFormat.shortDebugString(responseHeader));
+        }
         Call call = calls.get(id);
-
-        Status status = response.getStatus();
-        if (status == Status.SUCCESS) {
+        if (responseHeader.hasException()) {
+          ExceptionResponse exceptionResponse = responseHeader.getException();
+          RemoteException re = createRemoteException(exceptionResponse);
+          if (isFatalConnectionException(exceptionResponse)) {
+            markClosed(re);
+          } else {
+            if (call != null) call.setException(re);
+          }
+        } else {
           Message rpcResponseType;
           try {
-            rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
-                getMethod(remoteId.getProtocol(),
-                          call.param.getMethodName()));
+            // TODO: Why pb engine pollution in here in this class?  FIX.
+            rpcResponseType =
+              ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+                reflectionCache.getMethod(remoteId.getProtocol(), call.method.getName()));
           } catch (Exception e) {
             throw new RuntimeException(e); //local exception
           }
-          Builder builder = rpcResponseType.newBuilderForType();
-          builder.mergeDelimitedFrom(in);
-          Message value = builder.build();
-          // it's possible that this call may have been cleaned up due to a RPC
-          // timeout, so check if it still exists before setting the value.
-          if (call != null) {
-            call.setValue(value);
+          Message value = null;
+          if (rpcResponseType != null) {
+            Builder builder = rpcResponseType.newBuilderForType();
+            builder.mergeDelimitedFrom(in);
+            value = builder.build();
           }
-          calls.remove(id);
-        } else if (status == Status.ERROR) {
-          RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
-          if (call != null) {
-            //noinspection ThrowableInstanceNeverThrown
-            call.setException(new RemoteException(
-                exceptionResponse.getExceptionName(),
-                exceptionResponse.getStackTrace()));
-            calls.remove(id);
+          CellScanner cellBlockScanner = null;
+          if (responseHeader.hasCellBlockMeta()) {
+            int size = responseHeader.getCellBlockMeta().getLength();
+            byte [] cellBlock = new byte[size];
+            IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+            cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
           }
-        } else if (status == Status.FATAL) {
-          RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
-          // Close the connection
-          markClosed(new RemoteException(
-              exceptionResponse.getExceptionName(),
-              exceptionResponse.getStackTrace()));
+          // it's possible that this call may have been cleaned up due to a RPC
+          // timeout, so check if it still exists before setting the value.
+          if (call != null) call.setResponse(value, cellBlockScanner);
         }
+        if (call != null) calls.remove(id);
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
           // Clean up open calls but don't treat this as a fatal condition,
@@ -1043,6 +1052,30 @@ public class HBaseClient {
       }
     }
 
+    /**
+     * @param e
+     * @return True if the exception is a fatal connection exception.
+     */
+    private boolean isFatalConnectionException(final ExceptionResponse e) {
+      return e.getExceptionClassName().
+        equals(FatalConnectionException.class.getName());
+    }
+
+    /**
+     * @param e
+     * @return RemoteException made from passed <code>e</code>
+     */
+    private RemoteException createRemoteException(final ExceptionResponse e) {
+      String innerExceptionClassName = e.getExceptionClassName();
+      boolean doNotRetry = e.getDoNotRetry();
+      return e.hasHostname()?
+        // If a hostname then add it to the RemoteWithExtrasException
+        new RemoteWithExtrasException(innerExceptionClassName,
+          e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
+        new RemoteWithExtrasException(innerExceptionClassName,
+          e.getStackTrace(), doNotRetry);
+    }
+
     protected synchronized void markClosed(IOException e) {
       if (shouldCloseConnection.compareAndSet(false, true)) {
         closeException = e;
@@ -1103,53 +1136,13 @@ public class HBaseClient {
   /**
    * Client-side call timeout
    */
+  @SuppressWarnings("serial")
   public static class CallTimeoutException extends IOException {
     public CallTimeoutException(final String msg) {
       super(msg);
     }
   }
 
-  /** Call implementation used for parallel calls. */
-  protected class ParallelCall extends Call {
-    private final ParallelResults results;
-    protected final int index;
-
-    public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
-      super(param);
-      this.results = results;
-      this.index = index;
-    }
-
-    /** Deliver result to result collector. */
-    @Override
-    protected void callComplete() {
-      results.callComplete(this);
-    }
-  }
-
-  /** Result collector for parallel calls. */
-  protected static class ParallelResults {
-    protected final Message[] values;
-    protected int size;
-    protected int count;
-
-    public ParallelResults(int size) {
-      this.values = new RpcResponseBody[size];
-      this.size = size;
-    }
-
-    /*
-     * Collect a result.
-     */
-    synchronized void callComplete(ParallelCall call) {
-      // FindBugs IS2_INCONSISTENT_SYNC
-      values[call.index] = call.value;            // store the value
-      count++;                                    // count it
-      if (count == size)                          // if all values are in
-        notify();                                 // then notify waiting caller
-    }
-  }
-
   /**
    * Construct an IPC client whose values are of the {@link Message}
    * class.
@@ -1165,9 +1158,12 @@ public class HBaseClient {
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
     this.pingInterval = getPingInterval(conf);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+      LOG.debug("Ping interval: " + this.pingInterval + "ms.");
     }
+    this.ipcUtil = new IPCUtil(conf);
     this.conf = conf;
+    this.codec = getCodec(conf);
+    this.compressor = getCompressor(conf);
     this.socketFactory = factory;
     this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
     this.connections = new PoolMap<ConnectionId, Connection>(
@@ -1176,6 +1172,35 @@ public class HBaseClient {
   }
 
   /**
+   * Encapsulate the ugly casting and RuntimeException conversion in private method.
+   * @param conf
+   * @return Codec to use on this client.
+   */
+  private static Codec getCodec(final Configuration conf) {
+    String className = conf.get("hbase.client.rpc.codec", KeyValueCodec.class.getCanonicalName());
+    try {
+        return (Codec)Class.forName(className).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed getting codec " + className, e);
+    }
+  }
+
+  /**
+   * Encapsulate the ugly casting and RuntimeException conversion in private method.
+   * @param conf
+   * @return The compressor to use on this client.
+   */
+  private static CompressionCodec getCompressor(final Configuration conf) {
+    String className = conf.get("hbase.client.rpc.compressor", null);
+    if (className == null || className.isEmpty()) return null;
+    try {
+        return (CompressionCodec)Class.forName(className).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed getting compressor " + className, e);
+    }
+  }
+
+  /**
    * Construct an IPC client with the default SocketFactory
    * @param conf configuration
    */
@@ -1250,36 +1275,30 @@ public class HBaseClient {
   }
 
   /** Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code>, returning the value.  Throws exceptions if there are
-   * network problems or if the remote code threw an exception.
-   * @param param RpcRequestBody parameter
-   * @param address network address
-   * @return Message
-   * @throws IOException e
-   */
-  public Message call(RpcRequestBody param, InetSocketAddress address)
-  throws IOException, InterruptedException {
-      return call(param, address, null, 0);
-  }
-
-  public Message call(RpcRequestBody param, InetSocketAddress addr,
-                       User ticket, int rpcTimeout)
-                       throws IOException, InterruptedException {
-    return call(param, addr, null, ticket, rpcTimeout);
-  }
-
-  /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
    * with the <code>ticket</code> credentials, returning the value.
    * Throws exceptions if there are network problems or if the remote code
-   * threw an exception. */
-  public Message call(RpcRequestBody param, InetSocketAddress addr,
-                       Class<? extends IpcProtocol> protocol,
-                       User ticket, int rpcTimeout)
+   * threw an exception.
+   * @param method
+   * @param param
+   * @param cells
+   * @param addr
+   * @param protocol
+   * @param ticket Be careful which ticket you pass.  A new user will mean a new Connection.
+   * {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
+   * each time.
+   * @param rpcTimeout
+   * @return A pair with the Message response and the Cell data (if any).
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public Pair<Message, CellScanner> call(Method method, Message param, CellScanner cells,
+      InetSocketAddress addr, Class<? extends IpcProtocol> protocol, User ticket, int rpcTimeout)
       throws InterruptedException, IOException {
-    Call call = new Call(param);
-    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
-    connection.sendParam(call);                 // send the parameter
+    Call call = new Call(method, param, cells);
+    Connection connection =
+      getConnection(addr, protocol, ticket, rpcTimeout, call, this.codec, this.compressor);
+    connection.writeRequest(call);                 // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
@@ -1305,7 +1324,7 @@ public class HBaseClient {
         // local exception
         throw wrapException(addr, call.error);
       }
-      return call.value;
+      return new Pair<Message, CellScanner>(call.response, call.cells);
     }
   }
 
@@ -1329,14 +1348,11 @@ public class HBaseClient {
            "Call to " + addr + " failed on connection exception: " + exception)
                     .initCause(exception);
     } else if (exception instanceof SocketTimeoutException) {
-      return (SocketTimeoutException)new SocketTimeoutException(
-           "Call to " + addr + " failed on socket timeout exception: "
-                      + exception).initCause(exception);
+      return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
+        " failed on socket timeout exception: " + exception).initCause(exception);
     } else {
-      return (IOException)new IOException(
-           "Call to " + addr + " failed on local exception: " + exception)
-                                 .initCause(exception);
-
+      return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
+        exception).initCause(exception);
     }
   }
 
@@ -1364,51 +1380,11 @@ public class HBaseClient {
     }
   }
 
-  /** Makes a set of calls in parallel.  Each parameter is sent to the
-   * corresponding address.  When all values are available, or have timed out
-   * or errored, the collected results are returned in an array.  The array
-   * contains nulls for calls that timed out or errored.  */
-  public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
-                         Class<? extends IpcProtocol> protocol,
-                         User ticket)
-      throws IOException, InterruptedException {
-    if (addresses.length == 0) return new RpcResponseBody[0];
-
-    ParallelResults results = new ParallelResults(params.length);
-    // TODO this synchronization block doesnt make any sense, we should possibly fix it
-    //noinspection SynchronizationOnLocalVariableOrMethodParameter
-    synchronized (results) {
-      for (int i = 0; i < params.length; i++) {
-        ParallelCall call = new ParallelCall(params[i], results, i);
-        try {
-          Connection connection =
-              getConnection(addresses[i], protocol, ticket, 0, call);
-          connection.sendParam(call);             // send each parameter
-        } catch (IOException e) {
-          // log errors
-          LOG.info("Calling "+addresses[i]+" caught: " +
-                   e.getMessage(),e);
-          results.size--;                         //  wait for one fewer result
-        }
-      }
-      while (results.count != results.size) {
-        try {
-          results.wait();                    // wait for all results
-        } catch (InterruptedException ignored) {}
-      }
-
-      return results.values;
-    }
-  }
-
   /* Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
-  protected Connection getConnection(InetSocketAddress addr,
-                                   Class<? extends IpcProtocol> protocol,
-                                   User ticket,
-                                   int rpcTimeout,
-                                   Call call)
-                                   throws IOException, InterruptedException {
+  protected Connection getConnection(InetSocketAddress addr, Class<? extends IpcProtocol> protocol,
+      User ticket, int rpcTimeout, Call call, final Codec codec, final CompressionCodec compressor)
+  throws IOException, InterruptedException {
     if (!running.get()) {
       // the client is stopped
       throw new IOException("The client is stopped");
@@ -1422,7 +1398,7 @@ public class HBaseClient {
     synchronized (connections) {
       connection = connections.get(remoteId);
       if (connection == null) {
-        connection = createConnection(remoteId);
+        connection = createConnection(remoteId, this.codec, this.compressor);
         connections.put(remoteId, connection);
       }
     }
@@ -1472,6 +1448,12 @@ public class HBaseClient {
     }
 
     @Override
+    public String toString() {
+      return this.address.toString() + "/" + this.protocol + "/" + this.ticket + "/" +
+        this.rpcTimeout;
+    }
+
+    @Override
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
@@ -1484,9 +1466,9 @@ public class HBaseClient {
 
     @Override  // simply use the default Object#hashcode() ?
     public int hashCode() {
-      return (address.hashCode() + PRIME * (
-                  PRIME * System.identityHashCode(protocol) ^
-             (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
+      int hashcode = (address.hashCode() + PRIME * (PRIME * System.identityHashCode(protocol) ^
+        (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
+      return hashcode;
     }
   }
 }
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
+/**
+ * Utility to help ipc'ing.
+ */
+class IPCUtil {
+  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
+  private final int cellBlockBuildingInitialBufferSize;
+  /**
+   * How much we think the decompressor will expand the original compressed content.
+   */
+  private final int cellBlockDecompressionMultiplier;
+  private final Configuration conf;
+
+  IPCUtil(final Configuration conf) {
+    super();
+    this.conf = conf;
+    this.cellBlockBuildingInitialBufferSize =
+      conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
+    this.cellBlockDecompressionMultiplier =
+        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+  }
+
+  /**
+   * Build a cell block using passed in <code>codec</code>
+   * @param codec
+   * @param compressor
+   * @Param cells
+   * @return Null or byte buffer filled with passed-in Cells encoded using passed in
+   * <code>codec</code>; the returned buffer has been flipped and is ready for
+   * reading.  Use limit to find total size.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cells)
+  throws IOException {
+    if (cells == null) return null;
+    // TOOD: Reuse buffers?
+    // Presizing doesn't work because can't tell what size will be when serialized.
+    // BBOS will resize itself.
+    ByteBufferOutputStream baos =
+      new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+    OutputStream os = baos;
+    Compressor poolCompressor = null;
+    try {
+      if (compressor != null) {
+        if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+        poolCompressor = CodecPool.getCompressor(compressor);
+        os = compressor.createOutputStream(os, poolCompressor);
+      }
+      Codec.Encoder encoder = codec.getEncoder(os);
+      while (cells.advance()) {
+        encoder.write(cells.current());
+      }
+      encoder.flush();
+    } finally {
+      os.close();
+      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
+    }
+    if (this.cellBlockBuildingInitialBufferSize < baos.size()) {
+      LOG.warn("Buffer grew from " + this.cellBlockBuildingInitialBufferSize +
+      " to " + baos.size());
+    }
+    return baos.getByteBuffer();
+  }
+
+  /**
+   * @param codec
+   * @param cellBlock
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+   * @throws IOException
+   */
+  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+      final byte [] cellBlock)
+  throws IOException {
+    return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+  }
+
+  /**
+   * @param codec
+   * @param cellBlock
+   * @param offset
+   * @param length
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+   * @throws IOException
+   */
+  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+      final byte [] cellBlock, final int offset, final int length)
+  throws IOException {
+    // If compressed, decompress it first before passing it on else we will leak compression
+    // resources if the stream is not closed properly after we let it out.
+    InputStream is = null;
+    if (compressor != null) {
+      // GZIPCodec fails w/ NPE if no configuration.
+      if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+      Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+      CompressionInputStream cis =
+        compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
+        poolDecompressor);
+      try {
+        // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
+        // TODO: Reuse buffers.
+        ByteBufferOutputStream bbos = new ByteBufferOutputStream((length - offset) *
+          this.cellBlockDecompressionMultiplier);
+        IOUtils.copy(cis, bbos);
+        bbos.close();
+        ByteBuffer bb = bbos.getByteBuffer();
+        is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
+      } finally {
+        if (is != null) is.close();
+        CodecPool.returnDecompressor(poolDecompressor);
+      }
+    } else {
+      is = new ByteArrayInputStream(cellBlock, offset, length);
+    }
+    return codec.getDecoder(is);
+  }
+
+  /**
+   * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized
+   * to hold these elements.
+   * @param header
+   * @param param
+   * @param cellBlock
+   * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
+   * <code>header</code>, <code>param</code>, and <code>cellBlock</code>.
+   * @throws IOException
+   */
+  static ByteBufferOutputStream write(final Message header, final Message param,
+      final ByteBuffer cellBlock)
+  throws IOException {
+    int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
+    if (cellBlock != null) totalSize += cellBlock.limit();
+    ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
+    write(bbos, header, param, cellBlock, totalSize);
+    bbos.close();
+    return bbos;
+  }
+
+  /**
+   * Write out header, param, and cell block if there is one.
+   * @param dos
+   * @param header
+   * @param param
+   * @param cellBlock
+   * @return Total number of bytes written.
+   * @throws IOException
+   */
+  static int write(final OutputStream dos, final Message header, final Message param,
+      final ByteBuffer cellBlock)
+  throws IOException {
+    // Must calculate total size and write that first so other side can read it all in in one
+    // swoop.  This is dictated by how the server is currently written.  Server needs to change
+    // if we are to be able to write without the length prefixing.
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
+    if (cellBlock != null) totalSize += cellBlock.remaining();
+    return write(dos, header, param, cellBlock, totalSize);
+  }
+
+  private static int write(final OutputStream dos, final Message header, final Message param,
+    final ByteBuffer cellBlock, final int totalSize)
+  throws IOException {
+    // I confirmed toBytes does same as say DataOutputStream#writeInt.
+    dos.write(Bytes.toBytes(totalSize));
+    header.writeDelimitedTo(dos);
+    if (param != null) param.writeDelimitedTo(dos);
+    if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
+    dos.flush();
+    return totalSize;
+  }
+
+  /**
+   * @param in Stream cue'd up just before a delimited message
+   * @return Bytes that hold the bytes that make up the message read from <code>in</code>
+   * @throws IOException
+   */
+  static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
+    byte b = in.readByte();
+    int size = CodedInputStream.readRawVarint32(b, in);
+    // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
+    byte [] bytes = new byte[size];
+    IOUtils.readFully(in, bytes);
+    return bytes;
+  }
+
+  /**
+   * @param header
+   * @param body
+   * @return Size on the wire when the two messages are written with writeDelimitedTo
+   */
+  static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+    int totalSize = 0;
+    for (Message m: messages) {
+      if (m == null) continue;
+      totalSize += m.getSerializedSize();
+      totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
+    }
+    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
+    return totalSize;
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc. On its
+ * way out it optionally carries a set of result Cell data.  We stick the Cells here when we want
+ * to avoid having to protobuf them.  This class is used ferrying data across the proxy/protobuf
+ * service chasm.  Used by client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
+
+  /**
+   * They are optionally set on construction, cleared after we make the call, and then optionally
+   * set on response with the result. We use this lowest common denominator access to Cells because
+   * sometimes the scanner is backed by a List of Cells and other times, it is backed by an
+   * encoded block that implements CellScanner.
+   */
+  private CellScanner cellScanner;
+
+  public PayloadCarryingRpcController() {
+    this((CellScanner)null);
+  }
+
+  public PayloadCarryingRpcController(final CellScanner cellScanner) {
+    this.cellScanner = cellScanner;
+  }
+
+  public PayloadCarryingRpcController(final List<CellScannable> cellIterables) {
+    this.cellScanner = CellUtil.createCellScanner(cellIterables);
+  }
+
+  /**
+   * @return One-shot cell scanner (you cannot back it up and restart)
+   */
+  public CellScanner cellScanner() {
+    return cellScanner;
+  }
+
+  public void setCellScanner(final CellScanner cellScanner) {
+    this.cellScanner = cellScanner;
+  }
+
+  @Override
+  public String errorText() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean failed() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCanceled() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFailed(String arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void startCancel() {
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java Wed Mar 20 19:36:46 2013
@@ -24,9 +24,10 @@ import com.google.protobuf.ServiceExcept
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.IpcProtocol;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.ipc.RemoteException;
 
 import java.io.IOException;
@@ -77,7 +78,8 @@ public class ProtobufRpcClientEngine imp
     final private int rpcTimeout;
 
     public Invoker(Class<? extends IpcProtocol> protocol, InetSocketAddress addr, User ticket,
-        int rpcTimeout, HBaseClient client) throws IOException {
+        int rpcTimeout, HBaseClient client)
+    throws IOException {
       this.protocol = protocol;
       this.address = addr;
       this.ticket = ticket;
@@ -85,30 +87,6 @@ public class ProtobufRpcClientEngine imp
       this.rpcTimeout = rpcTimeout;
     }
 
-    private RpcRequestBody constructRpcRequest(Method method,
-                                               Object[] params) throws ServiceException {
-      RpcRequestBody rpcRequest;
-      RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
-      builder.setMethodName(method.getName());
-      Message param;
-      int length = params.length;
-      if (length == 2) {
-        // RpcController + Message in the method args
-        // (generated code from RPC bits in .proto files have RpcController)
-        param = (Message)params[1];
-      } else if (length == 1) { // Message
-        param = (Message)params[0];
-      } else {
-        throw new ServiceException("Too many parameters for request. Method: ["
-            + method.getName() + "]" + ", Expected: 2, Actual: "
-            + params.length);
-      }
-      builder.setRequestClassName(param.getClass().getName());
-      builder.setRequest(param.toByteString());
-      rpcRequest = builder.build();
-      return rpcRequest;
-    }
-
     /**
      * This is the client side invoker of RPC method. It only throws
      * ServiceException, since the invocation proxy expects only
@@ -122,33 +100,51 @@ public class ProtobufRpcClientEngine imp
      * set as cause in ServiceException</li>
      * </ol>
      *
-     * Note that the client calling protobuf RPC methods, must handle
+     * <p>Note that the client calling protobuf RPC methods, must handle
      * ServiceException by getting the cause from the ServiceException. If the
      * cause is RemoteException, then unwrap it to get the exception thrown by
      * the server.
      */
     @Override
     public Object invoke(Object proxy, Method method, Object[] args)
-        throws ServiceException {
+    throws ServiceException {
       long startTime = 0;
-      if (LOG.isDebugEnabled()) {
+      if (LOG.isTraceEnabled()) {
         startTime = System.currentTimeMillis();
       }
-
-      RpcRequestBody rpcRequest = constructRpcRequest(method, args);
-      Message val = null;
+      if (args.length != 2) {
+        throw new ServiceException(method.getName() + " didn't get two args: " + args.length);
+      }
+      // Get the controller.  Often null.  Presume payload carrying controller.  Payload is optional.
+      // It is cells/data that we do not want to protobuf.
+      PayloadCarryingRpcController controller = (PayloadCarryingRpcController)args[0];
+      CellScanner cells = null;
+      if (controller != null) {
+        cells = controller.cellScanner();
+        // Clear it here so we don't by mistake try and these cells processing results.
+        controller.setCellScanner(null);
+      }
+      // The request parameter
+      Message param = (Message)args[1];
+      Pair<Message, CellScanner> val = null;
       try {
-        val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
+        val = client.call(method, param, cells, address, protocol, ticket, rpcTimeout);
+        if (controller != null) {
+          // Shove the results into controller so can be carried across the proxy/pb service void.
+          if (val.getSecond() != null) controller.setCellScanner(val.getSecond());
+        } else if (val.getSecond() != null) {
+          throw new ServiceException("Client dropping data on the floor!");
+        }
 
-        if (LOG.isDebugEnabled()) {
+        if (LOG.isTraceEnabled()) {
           long callTime = System.currentTimeMillis() - startTime;
           if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
         }
-        return val;
+        return val.getFirst();
       } catch (Throwable e) {
         if (e instanceof RemoteException) {
           Throwable cause = ((RemoteException)e).unwrapRemoteException();
-          throw new ServiceException(cause);
+          throw new ServiceException("methodName=" + method.getName(), cause);
         }
         throw new ServiceException(e);
       }
@@ -158,8 +154,8 @@ public class ProtobufRpcClientEngine imp
       if (returnTypes.containsKey(method.getName())) {
         return returnTypes.get(method.getName());
       }
-
       Class<?> returnType = method.getReturnType();
+      if (returnType.getName().equals("void")) return null;
       Method newInstMethod = returnType.getMethod("getDefaultInstance");
       newInstMethod.setAccessible(true);
       Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);