Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:36:47 UTC
svn commit: r1459013 [1/8] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/java/org...
Author: stack
Date: Wed Mar 20 19:36:46 2013
New Revision: 1459013
URL: http://svn.apache.org/r1459013
Log:
HBASE-7905 Add passing of optional cell blocks over rpc
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MultiRowMutation.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto
hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClusterId.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
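
For orientation before the diffs: the heart of this change is that an rpc request or response may now carry an optional block of Cells (a "cell block") outside the protobuf message. The RequestHeader and ResponseHeader describe the block via a CellBlockMeta length field, and the ConnectionHeader advertises the Codec (and optional CompressionCodec) used to encode it. Below is a minimal sketch of the request framing, inferred from the IPCUtil.write call and the readResponse changes further down; the class, the helper name, and the single-byte varint shortcut are illustrative, not the actual IPCUtil implementation.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hedged sketch of the on-wire request layout: 4-byte total length, then a
// varint-delimited RequestHeader, a varint-delimited param message, and the
// raw cell block bytes (their length travels inside the header's CellBlockMeta).
public class CellBlockFramingSketch {
  // Stand-in for protobuf's Message.writeDelimitedTo; the single-byte length
  // only works for messages under 128 bytes and keeps the sketch dependency-free.
  static void writeDelimited(DataOutputStream out, byte[] msg) throws IOException {
    out.write(msg.length);
    out.write(msg);
  }

  static byte[] frameRequest(byte[] header, byte[] param, byte[] cellBlock)
      throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream bodyOut = new DataOutputStream(body);
    writeDelimited(bodyOut, header);
    writeDelimited(bodyOut, param);
    if (cellBlock != null) bodyOut.write(cellBlock); // raw bytes, no extra framing
    bodyOut.flush();

    ByteArrayOutputStream framed = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(framed);
    out.writeInt(body.size()); // total size, read first by the peer
    out.write(body.toByteArray());
    out.flush();
    return framed.toByteArray();
  }
}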
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Mar 20 19:36:46 2013
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.*;
-import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -523,7 +522,6 @@ public class HConnectionManager {
// package protected for the tests
ClusterStatusListener clusterStatusListener;
- private final Object metaRegionLock = new Object();
private final Object userRegionLock = new Object();
// We have a single lock for master & zk to prevent deadlocks. Having
@@ -645,7 +643,7 @@ public class HConnectionManager {
* @return
*/
public String toString(){
- return "hconnection 0x" + Integer.toHexString( hashCode() );
+ return "hconnection-0x" + Integer.toHexString(hashCode());
}
private String clusterId = null;
@@ -882,7 +880,7 @@ public class HConnectionManager {
MetaScanner.metaScan(conf, visitor, tableName);
return available.get() && (regionCount.get() > 0);
}
-
+
@Override
public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
throws IOException {
@@ -1011,13 +1009,16 @@ public class HConnectionManager {
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
try {
- LOG.debug("Looking up meta region location in ZK," +
- " connection=" + this);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
+ }
ServerName servername =
MetaRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout);
- LOG.debug("Looked up meta region location, connection=" + this +
- "; serverName=" + ((servername == null) ? "null" : servername));
+ if (LOG.isTraceEnabled()) {
+ LOG.debug("Looked up meta region location, connection=" + this +
+ "; serverName=" + ((servername == null) ? "null" : servername));
+ }
if (servername == null) return null;
return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
} catch (InterruptedException e) {
@@ -1821,26 +1822,17 @@ public class HConnectionManager {
}
@Deprecated
- private <R> Callable<MultiResponse> createCallable(
- final HRegionLocation loc, final MultiAction<R> multi,
- final byte [] tableName) {
- // TODO: This does not belong in here!!! St.Ack HConnections should
+ private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
+ final MultiAction<R> multi, final byte[] tableName) {
+ // TODO: This does not belong in here!!! St.Ack HConnections should
// not be dealing in Callables; Callables have HConnections, not other
// way around.
final HConnection connection = this;
return new Callable<MultiResponse>() {
- public MultiResponse call() throws IOException {
+ @Override
+ public MultiResponse call() throws Exception {
ServerCallable<MultiResponse> callable =
- new ServerCallable<MultiResponse>(connection, tableName, null) {
- public MultiResponse call() throws IOException {
- return ProtobufUtil.multi(server, multi);
- }
-
- @Override
- public void connect(boolean reload) throws IOException {
- server = connection.getClient(loc.getServerName());
- }
- };
+ new MultiServerCallable<R>(connection, tableName, loc, multi);
return callable.withoutRetries();
}
};
@@ -2162,8 +2154,7 @@ public class HConnectionManager {
} else // success
if (callback != null) {
this.callback.update(resultsForRS.getKey(),
- this.rows.get(regionResult.getFirst()).getRow(),
- (R) result);
+ this.rows.get(regionResult.getFirst()).getRow(), (R) result);
}
}
}
@@ -2222,8 +2213,6 @@ public class HConnectionManager {
}
}
-
-
/**
* Put the action that has to be retried in the Replay list.
* @return true if we're out of numRetries and it's the last retry.
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 20 19:36:46 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.HC
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -728,9 +729,11 @@ public class HTable implements HTableInt
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), append);
- MutateResponse response = server.mutate(null, request);
+ PayloadCarryingRpcController rpcController =
+ new PayloadCarryingRpcController();
+ MutateResponse response = server.mutate(rpcController, request);
if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult());
+ return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -752,8 +755,9 @@ public class HTable implements HTableInt
try {
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), increment);
- MutateResponse response = server.mutate(null, request);
- return ProtobufUtil.toResult(response.getResult());
+ PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+ MutateResponse response = server.mutate(rpcController, request);
+ return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -796,8 +800,10 @@ public class HTable implements HTableInt
MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family,
qualifier, amount, writeToWAL);
- MutateResponse response = server.mutate(null, request);
- Result result = ProtobufUtil.toResult(response.getResult());
+ PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+ MutateResponse response = server.mutate(rpcController, request);
+ Result result =
+ ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
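
The three hunks above repeat one pattern: build the request, hand a fresh PayloadCarryingRpcController into the stub call, then feed the controller's cell scanner to ProtobufUtil.toResult so cells that rode back outside the pb message are stitched into the Result. A consolidated sketch of that pattern follows; the ClientProtocol stub type and the helper name are assumptions for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;

import com.google.protobuf.ServiceException;

class MutateWithCellsSketch {
  // 'server' stands for the regionserver stub HTable already holds.
  static Result mutateAndReapCells(ClientProtocol server, MutateRequest request)
      throws IOException {
    PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
    try {
      MutateResponse response = server.mutate(controller, request);
      if (!response.hasResult()) return null;
      // cellScanner() exposes the cell block carried back by the controller.
      return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}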
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Callable that handles the <code>multi</code> method call going against a single
+ * regionserver; i.e. a {@link ServerCallable} for the multi call (it is not a
+ * {@link Callable} that goes against multiple regions).
+ * @param <R>
+ */
+class MultiServerCallable<R> extends ServerCallable<MultiResponse> {
+ private final MultiAction<R> multi;
+ private final HRegionLocation loc;
+
+ MultiServerCallable(final HConnection connection, final byte [] tableName,
+ final HRegionLocation loc, final MultiAction<R> multi) {
+ super(connection, tableName, null);
+ this.multi = multi;
+ this.loc = loc;
+ }
+
+ @Override
+ public MultiResponse call() throws IOException {
+ MultiResponse response = new MultiResponse();
+ // The multi object is a list of Actions by region.
+ for (Map.Entry<byte[], List<Action<R>>> e: this.multi.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ int rowMutations = 0;
+ List<Action<R>> actions = e.getValue();
+ for (Action<R> action : actions) {
+ Row row = action.getAction();
+ // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
+ // on the one row. We do these a row at a time.
+ if (row instanceof RowMutations) {
+ try {
+ RowMutations rms = (RowMutations)row;
+ // Stick all Cells for all RowMutations in here into 'cells'. Populated when we call
+ // buildNoDataMultiRequest in the below.
+ List<CellScannable> cells = new ArrayList<CellScannable>(rms.getMutations().size());
+ // Build a multi request absent its Cell payload (this is the 'nodata' in the below).
+ MultiRequest multiRequest =
+ RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
+ // Carry the cells over the proxy/pb Service interface using the payload carrying
+ // rpc controller.
+ server.multi(new PayloadCarryingRpcController(cells), multiRequest);
+ // This multi call does not return results.
+ response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
+ } catch (ServiceException se) {
+ response.add(regionName, action.getOriginalIndex(),
+ ProtobufUtil.getRemoteException(se));
+ }
+ rowMutations++;
+ }
+ }
+ // Are there any non-RowMutation actions to send for this region?
+ if (actions.size() > rowMutations) {
+ Exception ex = null;
+ List<Object> results = null;
+ // Stick all Cells for the multiRequest in here into 'cells'. Gets filled in when we
+ // call buildNoDataMultiRequest
+ List<CellScannable> cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
+ try {
+ // The call to buildNoDataMultiRequest will skip RowMutations. They have
+ // already been handled above.
+ MultiRequest multiRequest =
+ RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
+ // Controller optionally carries cell data over the proxy/service boundary and also
+ // optionally ferries cell response data back out again.
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+ ClientProtos.MultiResponse responseProto = server.multi(controller, multiRequest);
+ results = ResponseConverter.getResults(responseProto, controller.cellScanner());
+ } catch (ServiceException se) {
+ ex = ProtobufUtil.getRemoteException(se);
+ }
+ for (int i = 0, n = actions.size(); i < n; i++) {
+ int originalIndex = actions.get(i).getOriginalIndex();
+ response.add(regionName, originalIndex, results == null ? ex : results.get(i));
+ }
+ }
+ }
+ return response;
+ }
+
+ @Override
+ public void connect(boolean reload) throws IOException {
+ server = connection.getClient(loc.getServerName());
+ }
+}
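
For context, the HConnectionManager hunk earlier in this commit shows the call site for this class: one callable per target regionserver, executed without rpc-layer retries because the batch machinery replays failures itself. A sketch, with connection, tableName, loc and multi standing for the values createCallable receives:

// Mirrors HConnectionManager.createCallable above; not a new API.
ServerCallable<MultiResponse> callable =
    new MultiServerCallable<Row>(connection, tableName, loc, multi);
MultiResponse response = callable.withoutRetries();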
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java Wed Mar 20 19:36:46 2013
@@ -19,13 +19,11 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -52,10 +50,8 @@ public class RowMutations implements Row
* @param row row key
*/
public RowMutations(byte [] row) {
- if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
- throw new IllegalArgumentException("Row key is invalid");
- }
- this.row = Arrays.copyOf(row, row.length);
+ Mutation.checkRow(row);
+ this.row = Bytes.copy(row);
}
/**
@@ -78,10 +74,10 @@ public class RowMutations implements Row
private void internalAdd(Mutation m) throws IOException {
int res = Bytes.compareTo(this.row, m.getRow());
- if(res != 0) {
- throw new IOException("The row in the recently added Put/Delete " +
- Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
- Bytes.toStringBinary(this.row));
+ if (res != 0) {
+ throw new WrongRowIOException("The row in the recently added Put/Delete <" +
+ Bytes.toStringBinary(m.getRow()) + "> doesn't match the original one <" +
+ Bytes.toStringBinary(this.row) + ">");
}
mutations.add(m);
}
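
A sketch of what the tightened check in internalAdd means for callers; Put and Delete are the usual mutation types, and the second add below now fails with the more specific WrongRowIOException instead of a bare IOException:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.util.Bytes;

class RowMutationsRowCheckSketch {
  static void example() throws IOException {
    RowMutations rm = new RowMutations(Bytes.toBytes("row1"));
    rm.add(new Put(Bytes.toBytes("row1")));    // ok: same row as the RowMutations
    rm.add(new Delete(Bytes.toBytes("row2"))); // throws WrongRowIOException
  }
}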
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FatalConnectionException.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+
+/**
+ * Thrown when the server finds a fatal issue with connection setup: e.g. a bad rpc version
+ * or an unsupported auth method.
+ * The connection is closed after this exception is thrown, with a message saying why it failed.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+public class FatalConnectionException extends DoNotRetryIOException {
+ public FatalConnectionException() {
+ super();
+ }
+
+ public FatalConnectionException(String msg) {
+ super(msg);
+ }
+
+ public FatalConnectionException(String msg, Throwable t) {
+ super(msg, t);
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Mar 20 19:36:46 2013
@@ -19,24 +19,50 @@
package org.apache.hadoop.hbase.ipc;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo;
import org.apache.hadoop.hbase.security.AuthMethod;
@@ -52,6 +78,7 @@ import org.apache.hadoop.hbase.util.Pool
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -62,54 +89,23 @@ import org.apache.hadoop.security.token.
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
-/** A client for an IPC service. IPC calls take a single Protobuf message as a
- * parameter, and return a single Protobuf message as their value. A service runs on
+/**
+ * A client for an IPC service. IPC calls take a single Protobuf message as a
+ * request and return a single Protobuf message as the result. A service runs on
* a port and is defined by a parameter class and a value class.
*
- * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
- * moved into this package so can access package-private methods.
- *
- * See HBaseServer
+ * <p>See HBaseServer
*/
@InterfaceAudience.Private
public class HBaseClient {
-
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
- private static final Map<String, Method> methodInstances =
- new ConcurrentHashMap<String, Method>();
+ private ReflectionCache reflectionCache = new ReflectionCache();
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -123,6 +119,9 @@ public class HBaseClient {
protected int pingInterval; // how often sends ping to the server in msecs
protected int socketTimeout; // socket timeout
protected FailedServers failedServers;
+ private final Codec codec;
+ private final CompressionCodec compressor;
+ private final IPCUtil ipcUtil;
protected final SocketFactory socketFactory; // how to create sockets
protected String clusterId;
@@ -187,9 +186,8 @@ public class HBaseClient {
}
}
+ @SuppressWarnings("serial")
public static class FailedServerException extends IOException {
- private static final long serialVersionUID = -4744376109431464127L;
-
public FailedServerException(String s) {
super(s);
}
@@ -201,6 +199,8 @@ public class HBaseClient {
* @param conf Configuration
* @param pingInterval the ping interval
*/
+ // Any reason we couldn't just do tcp keepalive instead of this pingery?
+ // St.Ack 20130121
public static void setPingInterval(Configuration conf, int pingInterval) {
conf.setInt(PING_INTERVAL_NAME, pingInterval);
}
@@ -235,20 +235,34 @@ public class HBaseClient {
/** A call waiting for a value. */
protected class Call {
final int id; // call id
- final RpcRequestBody param; // rpc request object
- Message value; // value, null if error
+ final Message param; // rpc request method param object
+ /**
+ * Optionally has cells when making the call. Optionally has cells set on the response.
+ * Used for passing cells to the rpc and for receiving the response's cells.
+ */
+ CellScanner cells;
+ Message response; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
+ final Method method;
- protected Call(RpcRequestBody param) {
+ protected Call(final Method method, Message param, final CellScanner cells) {
this.param = param;
+ this.method = method;
+ this.cells = cells;
this.startTime = System.currentTimeMillis();
synchronized (HBaseClient.this) {
this.id = counter++;
}
}
+ @Override
+ public String toString() {
+ return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" +
+ (this.param != null? TextFormat.shortDebugString(this.param): "") + "}";
+ }
+
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
@@ -269,10 +283,12 @@ public class HBaseClient {
/** Set the return value when there is no error.
* Notify the caller the call is done.
*
- * @param value return value of the call.
+ * @param response return value of the call.
+ * @param cells Can be null
*/
- public synchronized void setValue(Message value) {
- this.value = value;
+ public synchronized void setResponse(Message response, final CellScanner cells) {
+ this.response = response;
+ this.cells = cells;
callComplete();
}
@@ -281,7 +297,7 @@ public class HBaseClient {
}
}
- protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
+ protected final static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
static {
tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
@@ -292,8 +308,10 @@ public class HBaseClient {
* Creates a connection. Can be overridden by a subclass for testing.
* @param remoteId - the ConnectionId to use for the connection creation.
*/
- protected Connection createConnection(ConnectionId remoteId) throws IOException {
- return new Connection(remoteId);
+ protected Connection createConnection(ConnectionId remoteId, final Codec codec,
+ final CompressionCodec compressor)
+ throws IOException {
+ return new Connection(remoteId, codec, compressor);
}
/** Thread that reads responses and notifies callers. Each connection owns a
@@ -312,6 +330,8 @@ public class HBaseClient {
private Token<? extends TokenIdentifier> token;
private HBaseSaslRpcClient saslRpcClient;
private int reloginMaxBackoff; // max pause before relogin on sasl failure
+ private final Codec codec;
+ private final CompressionCodec compressor;
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls =
@@ -322,12 +342,14 @@ public class HBaseClient {
new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
- Connection(ConnectionId remoteId) throws IOException {
+ Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
+ throws IOException {
if (remoteId.getAddress().isUnresolved()) {
- throw new UnknownHostException("unknown host: " +
- remoteId.getAddress().getHostName());
+ throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
}
this.server = remoteId.getAddress();
+ this.codec = codec;
+ this.compressor = compressor;
UserGroupInformation ticket = remoteId.getTicket().getUGI();
Class<?> protocol = remoteId.getProtocol();
@@ -368,29 +390,33 @@ public class HBaseClient {
authMethod = AuthMethod.KERBEROS;
}
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Use " + authMethod + " authentication for protocol "
+ protocol.getSimpleName());
-
+ }
reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
this.remoteId = remoteId;
ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setProtocol(protocol == null ? "" : protocol.getName());
UserInformation userInfoPB;
- if ((userInfoPB = getUserInfoPB(ticket)) != null) {
+ if ((userInfoPB = getUserInfo(ticket)) != null) {
builder.setUserInfo(userInfoPB);
}
+ builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
+ if (this.compressor != null) {
+ builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
+ }
this.header = builder.build();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
- ((ticket==null)?" from an unknown user": (" from "
+ ((ticket==null)?" from an unknown user": (" from "
+ ticket.getUserName())));
this.setDaemon(true);
}
- private UserInformation getUserInfoPB(UserGroupInformation ugi) {
+ private UserInformation getUserInfo(UserGroupInformation ugi) {
if (ugi == null || authMethod == AuthMethod.DIGEST) {
// Don't send user for token auth
return null;
@@ -582,8 +608,7 @@ public class HBaseClient {
*/
protected synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
- long timeout = maxIdleTime-
- (System.currentTimeMillis()-lastActivity.get());
+ long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
@@ -613,6 +638,7 @@ public class HBaseClient {
* since last I/O activity is equal to or greater than the ping interval
*/
protected synchronized void sendPing() throws IOException {
+ // Can we do tcp keepalive instead of this pinging?
long curTime = System.currentTimeMillis();
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
@@ -626,24 +652,23 @@ public class HBaseClient {
@Override
public void run() {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting, having connections "
- + connections.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": starting, connections " + connections.size());
+ }
try {
while (waitForWork()) {//wait here for work - read or close connection
- receiveResponse();
+ readResponse();
}
} catch (Throwable t) {
- LOG.warn("Unexpected exception receiving call responses", t);
+ LOG.warn(getName() + ": unexpected exception receiving call responses", t);
markClosed(new IOException("Unexpected exception receiving call responses", t));
}
close();
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": stopped, remaining connections "
- + connections.size());
+ LOG.debug(getName() + ": stopped, connections " + connections.size());
}
private synchronized void disposeSasl() {
@@ -691,7 +716,7 @@ public class HBaseClient {
* method. In case when the user doesn't have valid credentials, we don't
* need to retry (from cache or ticket). In such cases, it is prudent to
* throw a runtime exception when we receive a SaslException from the
- * underlying authentication implementation, so there is no retry from
+ * underlying authentication implementation, so there is no retry from
* other high level (for eg, HCM or HBaseAdmin).
* </p>
*/
@@ -766,7 +791,7 @@ public class HBaseClient {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to "+server);
+ LOG.debug("Connecting to " + server);
}
short numRetries = 0;
final short MAX_RETRIES = 5;
@@ -775,7 +800,8 @@ public class HBaseClient {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
- writeRpcHeader(outStream);
+ // Write out the preamble -- MAGIC, version, and auth to use.
+ writeConnectionHeaderPreamble(outStream);
if (useSasl) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
@@ -787,19 +813,22 @@ public class HBaseClient {
}
boolean continueSasl = false;
try {
- continueSasl =
- ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws IOException {
- return setupSaslConnection(in2, out2);
- }
- });
+ if (ticket == null) {
+ throw new NullPointerException("ticket is null");
+ } else {
+ continueSasl =
+ ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ }
} catch (Exception ex) {
if (rand == null) {
rand = new Random();
}
- handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
- ticket);
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
continue;
}
if (continueSasl) {
@@ -812,11 +841,10 @@ public class HBaseClient {
useSasl = false;
}
}
- this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(inStream)));
- this.out = new DataOutputStream
- (new BufferedOutputStream(outStream));
- writeHeader();
+ this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Now write out the connection header
+ writeConnectionHeader();
// update last activity time
touch();
@@ -840,30 +868,38 @@ public class HBaseClient {
}
}
- /* Write the RPC header */
- private void writeRpcHeader(OutputStream outStream) throws IOException {
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
- // Write out the header, version and authentication method
- out.write(HConstants.RPC_HEADER.array());
- out.write(HConstants.CURRENT_VERSION);
- authMethod.write(out);
- out.flush();
+ /**
+ * Write the RPC header: <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>
+ */
+ private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
+ // Assemble the preamble up in a buffer first and then send it. Writing individual elements,
+ // they are getting sent across piecemeal according to wireshark and then server is messing
+ // up the reading on occasion (the passed in stream is not buffered yet).
+
+ // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE
+ int rpcHeaderLen = HConstants.RPC_HEADER.array().length;
+ byte [] preamble = new byte [rpcHeaderLen + 2];
+ System.arraycopy(HConstants.RPC_HEADER.array(), 0, preamble, 0, rpcHeaderLen);
+ preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
+ preamble[rpcHeaderLen + 1] = authMethod.code;
+ outStream.write(preamble);
+ outStream.flush();
}
/**
- * Write the protocol header for each connection
+ * Write the connection header.
* Out is not synchronized because only the first thread does this.
*/
- private void writeHeader() throws IOException {
- // Write out the ConnectionHeader
- out.writeInt(header.getSerializedSize());
- header.writeTo(out);
+ private void writeConnectionHeader() throws IOException {
+ this.out.writeInt(this.header.getSerializedSize());
+ this.header.writeTo(this.out);
+ this.out.flush();
}
/** Close the connection. */
protected synchronized void close() {
if (!shouldCloseConnection.get()) {
- LOG.error("The connection is not in the closed state");
+ LOG.error(getName() + ": the connection is not in the closed state");
return;
}
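
Isolating the preamble logic from the hunk above as a sketch: the byte values are stand-ins for HConstants.RPC_HEADER ('HBas'), HConstants.RPC_CURRENT_VERSION and AuthMethod.code, and the point is that all six bytes leave in one write so the server never reads a partial preamble.

class PreambleSketch {
  // Layout: <4-byte magic 'HBas'> <1-byte version> <1-byte auth code>
  static byte[] buildPreamble(byte[] magic, byte version, byte authCode) {
    byte[] preamble = new byte[magic.length + 2];
    System.arraycopy(magic, 0, preamble, 0, magic.length);
    preamble[magic.length] = version;
    preamble[magic.length + 1] = authCode;
    return preamble; // caller sends it with a single outStream.write(preamble)
  }
}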
@@ -883,8 +919,7 @@ public class HBaseClient {
// clean up all calls
if (closeException == null) {
if (!calls.isEmpty()) {
- LOG.warn(
- "A connection is closed for no cause and calls are not empty. " +
+ LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
"#Calls: " + calls.size());
// clean up calls anyway
@@ -894,7 +929,7 @@ public class HBaseClient {
} else {
// log the info
if (LOG.isDebugEnabled()) {
- LOG.debug("closing ipc connection to " + server + ": " +
+ LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
closeException.getMessage(),closeException);
}
@@ -905,126 +940,100 @@ public class HBaseClient {
LOG.debug(getName() + ": closed");
}
- /* Initiates a call by sending the parameter to the remote server.
+ /**
+ * Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
+ * @param call
+ * @see #readResponse()
*/
- protected void sendParam(Call call) {
- if (shouldCloseConnection.get()) {
- return;
- }
+ protected void writeRequest(Call call) {
+ if (shouldCloseConnection.get()) return;
try {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " sending #" + call.id);
-
- RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
- headerBuilder.setCallId(call.id);
-
+ RequestHeader.Builder builder = RequestHeader.newBuilder();
+ builder.setCallId(call.id);
if (Trace.isTracing()) {
Span s = Trace.currentTrace();
- headerBuilder.setTinfo(RPCTInfo.newBuilder()
- .setParentId(s.getSpanId())
- .setTraceId(s.getTraceId()));
+ builder.setTraceInfo(RPCTInfo.newBuilder().
+ setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+ }
+ builder.setMethodName(call.method.getName());
+ builder.setRequestParam(call.param != null);
+ ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+ if (cellBlock != null) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ cellBlockBuilder.setLength(cellBlock.limit());
+ builder.setCellBlockMeta(cellBlockBuilder.build());
}
-
//noinspection SynchronizeOnNonFinalField
+ RequestHeader header = builder.build();
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- RpcRequestHeader header = headerBuilder.build();
- int serializedHeaderSize = header.getSerializedSize();
- int requestSerializedSize = call.param.getSerializedSize();
- this.out.writeInt(serializedHeaderSize +
- CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
- requestSerializedSize +
- CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
- header.writeDelimitedTo(this.out);
- call.param.writeDelimitedTo(this.out);
- this.out.flush();
+ IPCUtil.write(this.out, header, call.param, cellBlock);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
}
} catch(IOException e) {
markClosed(e);
}
}
-
- private Method getMethod(Class<? extends IpcProtocol> protocol,
- String methodName) {
- Method method = methodInstances.get(methodName);
- if (method != null) {
- return method;
- }
- Method[] methods = protocol.getMethods();
- for (Method m : methods) {
- if (m.getName().equals(methodName)) {
- m.setAccessible(true);
- methodInstances.put(methodName, m);
- return m;
- }
- }
- return null;
- }
-
/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
- protected void receiveResponse() {
- if (shouldCloseConnection.get()) {
- return;
- }
+ protected void readResponse() {
+ if (shouldCloseConnection.get()) return;
touch();
-
try {
// See HBaseServer.Call.setResponse for where we write out the response.
- // It writes the call.id (int), a boolean signifying any error (and if
- // so the exception name/trace), and the response bytes
- // Read the call id.
- RpcResponseHeader response = RpcResponseHeader.parseDelimitedFrom(in);
- if (response == null) {
- // When the stream is closed, protobuf doesn't raise an EOFException,
- // instead, it returns a null message object.
- throw new EOFException();
- }
- int id = response.getCallId();
+ // Total size of the response. Unused. But have to read it in anyway.
+ /*int totalSize =*/ in.readInt();
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
+ // Read the header
+ ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+ int id = responseHeader.getCallId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": got response header " +
+ TextFormat.shortDebugString(responseHeader));
+ }
Call call = calls.get(id);
-
- Status status = response.getStatus();
- if (status == Status.SUCCESS) {
+ if (responseHeader.hasException()) {
+ ExceptionResponse exceptionResponse = responseHeader.getException();
+ RemoteException re = createRemoteException(exceptionResponse);
+ if (isFatalConnectionException(exceptionResponse)) {
+ markClosed(re);
+ } else {
+ if (call != null) call.setException(re);
+ }
+ } else {
Message rpcResponseType;
try {
- rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
- getMethod(remoteId.getProtocol(),
- call.param.getMethodName()));
+ // TODO: Why pb engine pollution in here in this class? FIX.
+ rpcResponseType =
+ ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+ reflectionCache.getMethod(remoteId.getProtocol(), call.method.getName()));
} catch (Exception e) {
throw new RuntimeException(e); //local exception
}
- Builder builder = rpcResponseType.newBuilderForType();
- builder.mergeDelimitedFrom(in);
- Message value = builder.build();
- // it's possible that this call may have been cleaned up due to a RPC
- // timeout, so check if it still exists before setting the value.
- if (call != null) {
- call.setValue(value);
+ Message value = null;
+ if (rpcResponseType != null) {
+ Builder builder = rpcResponseType.newBuilderForType();
+ builder.mergeDelimitedFrom(in);
+ value = builder.build();
}
- calls.remove(id);
- } else if (status == Status.ERROR) {
- RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
- if (call != null) {
- //noinspection ThrowableInstanceNeverThrown
- call.setException(new RemoteException(
- exceptionResponse.getExceptionName(),
- exceptionResponse.getStackTrace()));
- calls.remove(id);
+ CellScanner cellBlockScanner = null;
+ if (responseHeader.hasCellBlockMeta()) {
+ int size = responseHeader.getCellBlockMeta().getLength();
+ byte [] cellBlock = new byte[size];
+ IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+ cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
}
- } else if (status == Status.FATAL) {
- RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
- // Close the connection
- markClosed(new RemoteException(
- exceptionResponse.getExceptionName(),
- exceptionResponse.getStackTrace()));
+ // it's possible that this call may have been cleaned up due to a RPC
+ // timeout, so check if it still exists before setting the value.
+ if (call != null) call.setResponse(value, cellBlockScanner);
}
+ if (call != null) calls.remove(id);
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
// Clean up open calls but don't treat this as a fatal condition,
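
To summarize the response layout the new readResponse parses: a sketch of the byte order only, with protobuf parsing reduced to a toy length-prefix helper (the real code uses parseDelimitedFrom and mergeDelimitedFrom, and derives hasException and the cell block length from the parsed ResponseHeader):

import java.io.DataInputStream;
import java.io.IOException;

class ResponseFramingSketch {
  // Order on the wire: total length, delimited ResponseHeader, then (unless the
  // header carries an exception) a delimited result message, then an optional
  // raw cell block whose length comes from the header's CellBlockMeta.
  static void readFrame(DataInputStream in, boolean hasException, int cellBlockLen)
      throws IOException {
    int totalSize = in.readInt();       // read but currently unused by the client
    byte[] header = readDelimited(in);  // ResponseHeader bytes
    if (!hasException) {
      byte[] result = readDelimited(in);
      if (cellBlockLen > 0) {
        byte[] cellBlock = new byte[cellBlockLen];
        in.readFully(cellBlock);        // decoded by IPCUtil.createCellScanner
      }
    }
  }

  static byte[] readDelimited(DataInputStream in) throws IOException {
    int len = in.read(); // toy single-byte varint, messages under 128 bytes only
    byte[] b = new byte[len];
    in.readFully(b);
    return b;
  }
}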
@@ -1043,6 +1052,30 @@ public class HBaseClient {
}
}
+ /**
+ * @param e
+ * @return True if the exception is a fatal connection exception.
+ */
+ private boolean isFatalConnectionException(final ExceptionResponse e) {
+ return e.getExceptionClassName().
+ equals(FatalConnectionException.class.getName());
+ }
+
+ /**
+ * @param e
+ * @return RemoteException made from passed <code>e</code>
+ */
+ private RemoteException createRemoteException(final ExceptionResponse e) {
+ String innerExceptionClassName = e.getExceptionClassName();
+ boolean doNotRetry = e.getDoNotRetry();
+ return e.hasHostname()?
+ // If a hostname then add it to the RemoteWithExtrasException
+ new RemoteWithExtrasException(innerExceptionClassName,
+ e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
+ new RemoteWithExtrasException(innerExceptionClassName,
+ e.getStackTrace(), doNotRetry);
+ }
+
protected synchronized void markClosed(IOException e) {
if (shouldCloseConnection.compareAndSet(false, true)) {
closeException = e;
@@ -1103,53 +1136,13 @@ public class HBaseClient {
/**
* Client-side call timeout
*/
+ @SuppressWarnings("serial")
public static class CallTimeoutException extends IOException {
public CallTimeoutException(final String msg) {
super(msg);
}
}
- /** Call implementation used for parallel calls. */
- protected class ParallelCall extends Call {
- private final ParallelResults results;
- protected final int index;
-
- public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
- super(param);
- this.results = results;
- this.index = index;
- }
-
- /** Deliver result to result collector. */
- @Override
- protected void callComplete() {
- results.callComplete(this);
- }
- }
-
- /** Result collector for parallel calls. */
- protected static class ParallelResults {
- protected final Message[] values;
- protected int size;
- protected int count;
-
- public ParallelResults(int size) {
- this.values = new RpcResponseBody[size];
- this.size = size;
- }
-
- /*
- * Collect a result.
- */
- synchronized void callComplete(ParallelCall call) {
- // FindBugs IS2_INCONSISTENT_SYNC
- values[call.index] = call.value; // store the value
- count++; // count it
- if (count == size) // if all values are in
- notify(); // then notify waiting caller
- }
- }
-
/**
* Construct an IPC client whose values are of the {@link Message}
* class.
@@ -1165,9 +1158,12 @@ public class HBaseClient {
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
this.pingInterval = getPingInterval(conf);
if (LOG.isDebugEnabled()) {
- LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+ LOG.debug("Ping interval: " + this.pingInterval + "ms.");
}
+ this.ipcUtil = new IPCUtil(conf);
this.conf = conf;
+ this.codec = getCodec(conf);
+ this.compressor = getCompressor(conf);
this.socketFactory = factory;
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.connections = new PoolMap<ConnectionId, Connection>(
@@ -1176,6 +1172,35 @@ public class HBaseClient {
}
/**
+ * Encapsulate the ugly casting and RuntimeException conversion in private method.
+ * @param conf
+ * @return Codec to use on this client.
+ */
+ private static Codec getCodec(final Configuration conf) {
+ String className = conf.get("hbase.client.rpc.codec", KeyValueCodec.class.getCanonicalName());
+ try {
+ return (Codec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed getting codec " + className, e);
+ }
+ }
+
+ /**
+ * Encapsulate the ugly casting and RuntimeException conversion in private method.
+ * @param conf
+ * @return The compressor to use on this client.
+ */
+ private static CompressionCodec getCompressor(final Configuration conf) {
+ String className = conf.get("hbase.client.rpc.compressor", null);
+ if (className == null || className.isEmpty()) return null;
+ try {
+ return (CompressionCodec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed getting compressor " + className, e);
+ }
+ }
+
+ /**
* Construct an IPC client with the default SocketFactory
* @param conf configuration
*/
@@ -1250,36 +1275,30 @@ public class HBaseClient {
}
/** Make a call, passing <code>param</code>, to the IPC server running at
- * <code>address</code>, returning the value. Throws exceptions if there are
- * network problems or if the remote code threw an exception.
- * @param param RpcRequestBody parameter
- * @param address network address
- * @return Message
- * @throws IOException e
- */
- public Message call(RpcRequestBody param, InetSocketAddress address)
- throws IOException, InterruptedException {
- return call(param, address, null, 0);
- }
-
- public Message call(RpcRequestBody param, InetSocketAddress addr,
- User ticket, int rpcTimeout)
- throws IOException, InterruptedException {
- return call(param, addr, null, ticket, rpcTimeout);
- }
-
- /** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
- * threw an exception. */
- public Message call(RpcRequestBody param, InetSocketAddress addr,
- Class<? extends IpcProtocol> protocol,
- User ticket, int rpcTimeout)
+ * threw an exception.
+ * @param method
+ * @param param
+ * @param cells
+ * @param addr
+ * @param protocol
+ * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
+ * {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
+ * each time.
+ * @param rpcTimeout
+ * @return A pair with the Message response and the Cell data (if any).
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public Pair<Message, CellScanner> call(Method method, Message param, CellScanner cells,
+ InetSocketAddress addr, Class<? extends IpcProtocol> protocol, User ticket, int rpcTimeout)
throws InterruptedException, IOException {
- Call call = new Call(param);
- Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
- connection.sendParam(call); // send the parameter
+ Call call = new Call(method, param, cells);
+ Connection connection =
+ getConnection(addr, protocol, ticket, rpcTimeout, call, this.codec, this.compressor);
+ connection.writeRequest(call); // send the parameter
boolean interrupted = false;
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
@@ -1305,7 +1324,7 @@ public class HBaseClient {
// local exception
throw wrapException(addr, call.error);
}
- return call.value;
+ return new Pair<Message, CellScanner>(call.response, call.cells);
}
}
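
Consuming the new return type of call(), as a sketch; client, method, param, cells and the rest stand for values an rpc engine invoker already has in hand:

// The pb response and any returned cell block now travel together as a Pair.
Pair<Message, CellScanner> result =
    client.call(method, param, cells, addr, protocol, ticket, rpcTimeout);
Message response = result.getFirst();
CellScanner returnedCells = result.getSecond(); // may be null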
@@ -1329,14 +1348,11 @@ public class HBaseClient {
"Call to " + addr + " failed on connection exception: " + exception)
.initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
- return (SocketTimeoutException)new SocketTimeoutException(
- "Call to " + addr + " failed on socket timeout exception: "
- + exception).initCause(exception);
+ return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
+ " failed on socket timeout exception: " + exception).initCause(exception);
} else {
- return (IOException)new IOException(
- "Call to " + addr + " failed on local exception: " + exception)
- .initCause(exception);
-
+ return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
+ exception).initCause(exception);
}
}
@@ -1364,51 +1380,11 @@ public class HBaseClient {
}
}
- /** Makes a set of calls in parallel. Each parameter is sent to the
- * corresponding address. When all values are available, or have timed out
- * or errored, the collected results are returned in an array. The array
- * contains nulls for calls that timed out or errored. */
- public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
- Class<? extends IpcProtocol> protocol,
- User ticket)
- throws IOException, InterruptedException {
- if (addresses.length == 0) return new RpcResponseBody[0];
-
- ParallelResults results = new ParallelResults(params.length);
- // TODO this synchronization block doesnt make any sense, we should possibly fix it
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (results) {
- for (int i = 0; i < params.length; i++) {
- ParallelCall call = new ParallelCall(params[i], results, i);
- try {
- Connection connection =
- getConnection(addresses[i], protocol, ticket, 0, call);
- connection.sendParam(call); // send each parameter
- } catch (IOException e) {
- // log errors
- LOG.info("Calling "+addresses[i]+" caught: " +
- e.getMessage(),e);
- results.size--; // wait for one fewer result
- }
- }
- while (results.count != results.size) {
- try {
- results.wait(); // wait for all results
- } catch (InterruptedException ignored) {}
- }
-
- return results.values;
- }
- }
-
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
- protected Connection getConnection(InetSocketAddress addr,
- Class<? extends IpcProtocol> protocol,
- User ticket,
- int rpcTimeout,
- Call call)
- throws IOException, InterruptedException {
+ protected Connection getConnection(InetSocketAddress addr, Class<? extends IpcProtocol> protocol,
+ User ticket, int rpcTimeout, Call call, final Codec codec, final CompressionCodec compressor)
+ throws IOException, InterruptedException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
@@ -1422,7 +1398,7 @@ public class HBaseClient {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
- connection = createConnection(remoteId);
+ connection = createConnection(remoteId, this.codec, this.compressor);
connections.put(remoteId, connection);
}
}
@@ -1472,6 +1448,12 @@ public class HBaseClient {
}
@Override
+ public String toString() {
+ return this.address.toString() + "/" + this.protocol + "/" + this.ticket + "/" +
+ this.rpcTimeout;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
@@ -1484,9 +1466,9 @@ public class HBaseClient {
@Override // simply use the default Object#hashcode() ?
public int hashCode() {
- return (address.hashCode() + PRIME * (
- PRIME * System.identityHashCode(protocol) ^
- (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
+ int hashcode = (address.hashCode() + PRIME * (PRIME * System.identityHashCode(protocol) ^
+ (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
+ return hashcode;
}
}
}
\ No newline at end of file
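
Note on ConnectionId above: equals and hashCode fold in the ticket, so connection reuse depends
on passing the same User instance on every call. A pure-JDK sketch of why a fresh ticket defeats
the pool (Ticket below is a hypothetical stand-in for a key type that hashes by identity, which
is what the ticket javadoc earlier warns about):

  import java.util.HashMap;
  import java.util.Map;

  public class PoolKeySketch {
    static class Ticket { final String name; Ticket(String n) { this.name = n; } }

    public static void main(String[] args) {
      Map<Ticket, String> pool = new HashMap<Ticket, String>();
      pool.put(new Ticket("bob"), "connection-1");
      // Identity-based key: a logically equal ticket still misses the pool entry.
      System.out.println(pool.get(new Ticket("bob"))); // prints null
    }
  }
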
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
+/**
+ * Utility to help ipc'ing.
+ */
+class IPCUtil {
+ public static final Log LOG = LogFactory.getLog(IPCUtil.class);
+ private final int cellBlockBuildingInitialBufferSize;
+ /**
+ * How much we think the decompressor will expand the original compressed content.
+ */
+ private final int cellBlockDecompressionMultiplier;
+ private final Configuration conf;
+
+ IPCUtil(final Configuration conf) {
+ super();
+ this.conf = conf;
+ this.cellBlockBuildingInitialBufferSize =
+ conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
+ this.cellBlockDecompressionMultiplier =
+ conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+ }
+
+ /**
+   * Build a cell block using the passed-in <code>codec</code>.
+   * @param codec Codec with which to encode the Cells
+   * @param compressor Optional compressor to run the encoded block through; may be null
+   * @param cells The Cells to encode; if null, this method returns null
+ * @return Null or byte buffer filled with passed-in Cells encoded using passed in
+ * <code>codec</code>; the returned buffer has been flipped and is ready for
+ * reading. Use limit to find total size.
+ * @throws IOException
+ */
+ @SuppressWarnings("resource")
+ ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cells)
+ throws IOException {
+ if (cells == null) return null;
+    // TODO: Reuse buffers?
+    // Presizing doesn't work because we can't tell what the size will be once serialized.
+ // BBOS will resize itself.
+ ByteBufferOutputStream baos =
+ new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+ OutputStream os = baos;
+ Compressor poolCompressor = null;
+ try {
+ if (compressor != null) {
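+        // GZIPCodec fails w/ NPE if no configuration.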
+ if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+ poolCompressor = CodecPool.getCompressor(compressor);
+ os = compressor.createOutputStream(os, poolCompressor);
+ }
+ Codec.Encoder encoder = codec.getEncoder(os);
+ while (cells.advance()) {
+ encoder.write(cells.current());
+ }
+ encoder.flush();
+ } finally {
+ os.close();
+ if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
+ }
+ if (this.cellBlockBuildingInitialBufferSize < baos.size()) {
+ LOG.warn("Buffer grew from " + this.cellBlockBuildingInitialBufferSize +
+ " to " + baos.size());
+ }
+ return baos.getByteBuffer();
+ }
+
+ /**
+   * @param codec Codec with which to decode the cell block
+   * @param compressor Optional compressor the block was compressed with; may be null
+   * @param cellBlock Encoded (and possibly compressed) Cell data
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+ * @throws IOException
+ */
+ CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ final byte [] cellBlock)
+ throws IOException {
+ return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+ }
+
+ /**
+   * @param codec Codec with which to decode the cell block
+   * @param compressor Optional compressor the block was compressed with; may be null
+   * @param cellBlock Encoded (and possibly compressed) Cell data
+   * @param offset Offset into <code>cellBlock</code> at which the data starts
+   * @param length Length of the data in <code>cellBlock</code>
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+ * @throws IOException
+ */
+ CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ final byte [] cellBlock, final int offset, final int length)
+ throws IOException {
+    // If compressed, fully decompress it here rather than hand out a compression stream,
+    // else we would leak compression resources if the caller failed to close that stream.
+ InputStream is = null;
+ if (compressor != null) {
+ // GZIPCodec fails w/ NPE if no configuration.
+ if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+ Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+ CompressionInputStream cis =
+ compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
+ poolDecompressor);
+ try {
+ // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
+ // TODO: Reuse buffers.
+ ByteBufferOutputStream bbos = new ByteBufferOutputStream((length - offset) *
+ this.cellBlockDecompressionMultiplier);
+ IOUtils.copy(cis, bbos);
+ bbos.close();
+ ByteBuffer bb = bbos.getByteBuffer();
+ is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
+ } finally {
+        cis.close(); // Close the decompression stream here; 'is' is a ByteArrayInputStream that needs no close.
+ CodecPool.returnDecompressor(poolDecompressor);
+ }
+ } else {
+ is = new ByteArrayInputStream(cellBlock, offset, length);
+ }
+ return codec.getDecoder(is);
+ }
+
+ /**
+   * Write out header, param, and cell block, if present, to a {@link ByteBufferOutputStream} sized
+   * to hold these elements.
+   * @param header The request header Message
+   * @param param The request parameter Message; may be null
+   * @param cellBlock Optional flipped ByteBuffer of encoded Cell data; may be null
+ * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
+ * <code>header</code>, <code>param</code>, and <code>cellBlock</code>.
+ * @throws IOException
+ */
+ static ByteBufferOutputStream write(final Message header, final Message param,
+ final ByteBuffer cellBlock)
+ throws IOException {
+ int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
+ if (cellBlock != null) totalSize += cellBlock.limit();
+ ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
+ write(bbos, header, param, cellBlock, totalSize);
+ bbos.close();
+ return bbos;
+ }
+
+ /**
+ * Write out header, param, and cell block if there is one.
+   * @param dos Stream to write to
+   * @param header The request header Message
+   * @param param The request parameter Message; may be null
+   * @param cellBlock Optional ByteBuffer of encoded Cell data; may be null
+ * @return Total number of bytes written.
+ * @throws IOException
+ */
+ static int write(final OutputStream dos, final Message header, final Message param,
+ final ByteBuffer cellBlock)
+ throws IOException {
+    // Must calculate total size and write that first so the other side can read it all in one
+    // swoop.  This is dictated by how the server is currently written; the server needs to change
+ // if we are to be able to write without the length prefixing.
+ int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
+ if (cellBlock != null) totalSize += cellBlock.remaining();
+ return write(dos, header, param, cellBlock, totalSize);
+ }
+
+ private static int write(final OutputStream dos, final Message header, final Message param,
+ final ByteBuffer cellBlock, final int totalSize)
+ throws IOException {
+    // Bytes.toBytes(int) writes the same 4-byte big-endian layout as DataOutputStream#writeInt.
+ dos.write(Bytes.toBytes(totalSize));
+ header.writeDelimitedTo(dos);
+ if (param != null) param.writeDelimitedTo(dos);
+ if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
+ dos.flush();
+ return totalSize;
+ }
+
+ /**
+   * @param in Stream cued up just before a delimited message
+   * @return The bytes that make up the delimited message read from <code>in</code>
+ * @throws IOException
+ */
+ static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
+ byte b = in.readByte();
+ int size = CodedInputStream.readRawVarint32(b, in);
+ // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
+ byte [] bytes = new byte[size];
+ IOUtils.readFully(in, bytes);
+ return bytes;
+ }
+
+ /**
+   * @param messages Messages to size; null entries are skipped
+   * @return Size on the wire when the messages are each written with writeDelimitedTo
+ */
+ static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+ int totalSize = 0;
+ for (Message m: messages) {
+ if (m == null) continue;
+ totalSize += m.getSerializedSize();
+ totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
+ }
+    // Guard against int overflow; a negative total means the messages exceeded Integer.MAX_VALUE bytes.
+    Preconditions.checkArgument(totalSize >= 0);
+ return totalSize;
+ }
+}
\ No newline at end of file
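
To make the wire layout produced by IPCUtil.write concrete: a 4-byte total length, then the
varint-delimited header, then the varint-delimited param, then the raw cell block bytes. Below
is a pure-JDK sketch of that framing; byte arrays stand in for protobuf messages, and a single
length byte stands in for the varint (valid for messages under 128 bytes):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  public class FramingSketch {
    // Stands in for protobuf's writeDelimitedTo: a length prefix, then the message bytes.
    static void writeDelimited(DataOutputStream out, byte [] msg) throws IOException {
      out.writeByte(msg.length);
      out.write(msg);
    }

    public static void main(String [] args) throws IOException {
      byte [] header = "hdr".getBytes();
      byte [] param = "param".getBytes();
      byte [] cellBlock = "cells".getBytes();
      // Total size goes first so the server can read the whole call in one swoop.
      int totalSize = (1 + header.length) + (1 + param.length) + cellBlock.length;
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(baos);
      dos.writeInt(totalSize);     // 4-byte prefix; same layout as Bytes.toBytes(int)
      writeDelimited(dos, header); // delimited header
      writeDelimited(dos, param);  // delimited param
      dos.write(cellBlock);        // raw cell block bytes, not delimited
      dos.flush();
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
      System.out.println("frame length=" + in.readInt() + ", expected=" + totalSize);
    }
  }
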
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc, and on its
+ * way back out optionally carries a set of result Cell data.  We stick the Cells here when we
+ * want to avoid having to protobuf them.  This class is used to ferry data across the
+ * proxy/protobuf service chasm.  Used by both client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  // TODO: Fill out the rest of this class's methods rather than throw UnsupportedOperationException
+
+ /**
+   * The Cells are optionally set on construction, cleared after we make the call, and then
+   * optionally set on the response with the result.  We use this lowest-common-denominator access
+   * to Cells because sometimes the scanner is backed by a List of Cells and other times it is
+   * backed by an encoded block that implements CellScanner.
+ */
+ private CellScanner cellScanner;
+
+ public PayloadCarryingRpcController() {
+ this((CellScanner)null);
+ }
+
+ public PayloadCarryingRpcController(final CellScanner cellScanner) {
+ this.cellScanner = cellScanner;
+ }
+
+ public PayloadCarryingRpcController(final List<CellScannable> cellIterables) {
+ this.cellScanner = CellUtil.createCellScanner(cellIterables);
+ }
+
+ /**
+ * @return One-shot cell scanner (you cannot back it up and restart)
+ */
+ public CellScanner cellScanner() {
+ return cellScanner;
+ }
+
+ public void setCellScanner(final CellScanner cellScanner) {
+ this.cellScanner = cellScanner;
+ }
+
+ @Override
+ public String errorText() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean failed() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCanceled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFailed(String arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void startCancel() {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
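
A sketch of the intended round trip through this controller, assuming a generated protobuf
blocking stub; the stub, multi method, request, and cellScannables names are all hypothetical:

  // Attach outbound Cells to the controller rather than embedding them in the request pb.
  PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScannables);
  MultiResponse response = stub.multi(controller, request);
  // On return, the ipc layer has repopulated the controller with any response cell block.
  CellScanner resultCells = controller.cellScanner();
  while (resultCells != null && resultCells.advance()) {
    // Marry resultCells.current() back up with the pb response.
  }
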
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java Wed Mar 20 19:36:46 2013
@@ -24,9 +24,10 @@ import com.google.protobuf.ServiceExcept
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.IpcProtocol;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
@@ -77,7 +78,8 @@ public class ProtobufRpcClientEngine imp
final private int rpcTimeout;
public Invoker(Class<? extends IpcProtocol> protocol, InetSocketAddress addr, User ticket,
- int rpcTimeout, HBaseClient client) throws IOException {
+ int rpcTimeout, HBaseClient client)
+ throws IOException {
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
@@ -85,30 +87,6 @@ public class ProtobufRpcClientEngine imp
this.rpcTimeout = rpcTimeout;
}
- private RpcRequestBody constructRpcRequest(Method method,
- Object[] params) throws ServiceException {
- RpcRequestBody rpcRequest;
- RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
- builder.setMethodName(method.getName());
- Message param;
- int length = params.length;
- if (length == 2) {
- // RpcController + Message in the method args
- // (generated code from RPC bits in .proto files have RpcController)
- param = (Message)params[1];
- } else if (length == 1) { // Message
- param = (Message)params[0];
- } else {
- throw new ServiceException("Too many parameters for request. Method: ["
- + method.getName() + "]" + ", Expected: 2, Actual: "
- + params.length);
- }
- builder.setRequestClassName(param.getClass().getName());
- builder.setRequest(param.toByteString());
- rpcRequest = builder.build();
- return rpcRequest;
- }
-
/**
* This is the client side invoker of RPC method. It only throws
* ServiceException, since the invocation proxy expects only
@@ -122,33 +100,51 @@ public class ProtobufRpcClientEngine imp
* set as cause in ServiceException</li>
* </ol>
*
- * Note that the client calling protobuf RPC methods, must handle
+ * <p>Note that the client calling protobuf RPC methods, must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
- throws ServiceException {
+ throws ServiceException {
long startTime = 0;
- if (LOG.isDebugEnabled()) {
+ if (LOG.isTraceEnabled()) {
startTime = System.currentTimeMillis();
}
-
- RpcRequestBody rpcRequest = constructRpcRequest(method, args);
- Message val = null;
+ if (args.length != 2) {
+ throw new ServiceException(method.getName() + " didn't get two args: " + args.length);
+ }
+ // Get the controller. Often null. Presume payload carrying controller. Payload is optional.
+ // It is cells/data that we do not want to protobuf.
+ PayloadCarryingRpcController controller = (PayloadCarryingRpcController)args[0];
+ CellScanner cells = null;
+ if (controller != null) {
+ cells = controller.cellScanner();
+        // Clear it here so we don't by mistake try and use these cells when processing the results.
+ controller.setCellScanner(null);
+ }
+ // The request parameter
+ Message param = (Message)args[1];
+ Pair<Message, CellScanner> val = null;
try {
- val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
+ val = client.call(method, param, cells, address, protocol, ticket, rpcTimeout);
+ if (controller != null) {
+        // Shove the results into the controller so they can be carried across the proxy/pb service void.
+ if (val.getSecond() != null) controller.setCellScanner(val.getSecond());
+ } else if (val.getSecond() != null) {
+ throw new ServiceException("Client dropping data on the floor!");
+ }
-      if (LOG.isDebugEnabled()) {
+      if (LOG.isTraceEnabled()) {
         long callTime = System.currentTimeMillis() - startTime;
-        if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
+        LOG.trace("Call: " + method.getName() + " " + callTime);
       }
- return val;
+ return val.getFirst();
} catch (Throwable e) {
if (e instanceof RemoteException) {
Throwable cause = ((RemoteException)e).unwrapRemoteException();
- throw new ServiceException(cause);
+ throw new ServiceException("methodName=" + method.getName(), cause);
}
throw new ServiceException(e);
}
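
As the invoker javadoc above says, callers must unwrap ServiceException themselves. A sketch of
the receiving side, assuming a generated blocking stub (stub and request are hypothetical):

  try {
    stub.get(null, request);
  } catch (ServiceException se) {
    Throwable cause = se.getCause();
    if (cause instanceof RemoteException) {
      // Thrown server-side: unwrap to recover the original exception type.
      throw ((RemoteException) cause).unwrapRemoteException();
    }
    throw new IOException("Failed RPC", cause);
  }
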
@@ -158,8 +154,8 @@ public class ProtobufRpcClientEngine imp
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
-
Class<?> returnType = method.getReturnType();
+ if (returnType.getName().equals("void")) return null;
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);