You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:36:47 UTC
svn commit: r1459013 [6/8] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/java/org...
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto Wed Mar 20 19:36:46 2013
@@ -51,7 +51,16 @@ message Get {
}
message Result {
- repeated KeyValue keyValue = 1;
+ // Result includes the Cells or else it just has a count of Cells
+ // that are carried otherwise.
+ repeated Cell cell = 1;
+ // The below count is set when the associated cells are
+ // not part of this protobuf message; they are passed alongside
+ // and then this Message is just a placeholder with metadata.
+ // The count is needed to know how many to peel off the block of Cells as
+ // ours. NOTE: This is different from the pb managed cellCount of the
+ // 'cell' field above which is non-null when the cells are pb'd.
+ optional int32 associatedCellCount = 2;
}
/**
@@ -118,24 +127,34 @@ message Condition {
required Comparator comparator = 5;
}
+
/**
- * A specific mutate inside a mutate request.
+ * A specific mutation inside a mutate request.
* It can be an append, increment, put or delete based
- * on the mutate type.
- */
-message Mutate {
- required bytes row = 1;
- required MutateType mutateType = 2;
+ * on the mutation type. It can be fully filled in or
+ * only metadata present because data is being carried
+ * elsewhere outside of pb.
+ */
+message MutationProto {
+ optional bytes row = 1;
+ optional MutationType mutateType = 2;
repeated ColumnValue columnValue = 3;
- repeated NameBytesPair attribute = 4;
- optional uint64 timestamp = 5;
+ optional uint64 timestamp = 4;
+ repeated NameBytesPair attribute = 5;
optional bool writeToWAL = 6 [default = true];
- // For some mutate, result may be returned, in which case,
+ // For some mutations, a result may be returned, in which case,
// time range can be specified for potential performance gain
- optional TimeRange timeRange = 10;
+ optional TimeRange timeRange = 7;
+ // The below count is set when the associated cells are NOT
+ // part of this protobuf message; they are passed alongside
+ // and then this Message is a placeholder with metadata. The
+ // count is needed to know how many to peel off the block of Cells as
+ // ours. NOTE: This is different from the pb managed cellCount of the
+ // 'cell' field above which is non-null when the cells are pb'd.
+ optional int32 associatedCellCount = 8;
- enum MutateType {
+ enum MutationType {
APPEND = 0;
INCREMENT = 1;
PUT = 2;
@@ -172,7 +191,7 @@ message Mutate {
*/
message MutateRequest {
required RegionSpecifier region = 1;
- required Mutate mutate = 2;
+ required MutationProto mutation = 2;
optional Condition condition = 3;
}
@@ -281,7 +300,7 @@ message CoprocessorServiceResponse {
* This is a union type - exactly one of the fields will be set.
*/
message MultiAction {
- optional Mutate mutate = 1;
+ optional MutationProto mutation = 1;
optional Get get = 2;
}
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/MultiRowMutation.proto Wed Mar 20 19:36:46 2013
@@ -23,7 +23,7 @@ option java_generic_services = true;
option optimize_for = SPEED;
message MultiMutateRequest {
- repeated Mutate mutationRequest = 1;
+ repeated MutationProto mutationRequest = 1;
}
message MultiMutateResponse {
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto Wed Mar 20 19:36:46 2013
@@ -15,123 +15,117 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-/**
- * Specification of (unsecure) HBase RPC:
- *
- * Client needs to set up a connection first to a server serving a certain
- * HBase protocol (like ClientProtocol). Once the connection is set up, the
- * client and server communicates on that channel for RPC requests/responses.
- * The sections below describe the flow.
- *
- * As part of setting up a connection to a server, the client needs to send
- * the ConnectionHeader header. At the data level, this looks like
- * <"hrpc"-bytearray><'5'[byte]><length-of-serialized-ConnectionHeader-obj[int32]><ConnectionHeader-object serialized>
- *
- * For every RPC that the client makes it needs to send the following
- * RpcRequestHeader and the RpcRequestBody. At the data level this looks like:
- * <length-of-serialized-RpcRequestHeader + length-of-varint32-of-serialized-RpcRequestHeader +
- * length-of-serialized-RpcRequestBody + length-of-varint32-of-serialized-RpcRequestBody>
- * <RpcRequestHeader [serialized using Message.writeDelimitedTo]>
- * <RpcRequestBody [serialized using Message.writeDelimitedTo]>
- *
- * On a success, the server's protobuf response looks like
- * <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
- * <RpcResponseBody-object [serialized using Message.writeDelimitedTo]>
- * On a failure, the server's protobuf response looks like
- * <RpcResponseHeader-object [serialized using Message.writeDelimitedTo]>
- * <RpcException-object [serialized using Message.writeDelimitedTo]>
- *
- * There is one special message that's sent from client to server -
- * the Ping message. At the data level, this is just the bytes corresponding
- * to integer -1.
- */
-
import "Tracing.proto";
+import "hbase.proto";
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "RPCProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
+// See https://issues.apache.org/jira/browse/HBASE-7898 for high-level
+// description of RPC specification.
+//
+// On connection setup, the client sends six bytes of preamble -- a four
+// byte magic, a byte of version, and a byte of authentication type.
+//
+// We then send a "ConnectionHeader" protobuf of user information and the
+// 'protocol' or 'service' that is to be run over this connection as well as
+// info such as codecs and compression to use when we send cell blocks(see below).
+// This connection header protobuf is prefaced by an int that holds the length
+// of this connection header (this is NOT a varint). The pb connection header
+// is sent with Message#writeTo. The server throws an exception if it doesn't
+// like what it was sent noting what it is objecting too. Otherwise, the server
+// says nothing and is open for business.
+//
+// Hereafter the client makes requests and the server returns responses.
+//
+// Requests look like this:
+//
+// <An int with the total length of the request>
+// <RequestHeader Message written out using Message#writeDelimitedTo>
+// <Optionally a Request Parameter Message written out using Message#writeDelimitedTo>
+// <Optionally a Cell block>
+//
+// ...where the Request Parameter Message is whatever the method name stipulated
+// in the RequestHeader expects; e.g. if the method is a scan, then the pb
+// Request Message is a GetRequest, or a ScanRequest. A block of Cells
+// optionally follows. The presence of a Request param Message and/or a
+// block of Cells will be noted in the RequestHeader.
+//
+// Response is the mirror of the request:
+//
+// <An int with the total length of the response>
+// <ResponseHeader Message written out using Message#writeDelimitedTo>
+// <Optionally a Response Result Message written out using Message#writeDelimitedTo>
+// <Optionally a Cell block>
+//
+// ...where the Response Message is the response type that goes with the
+// method specified when making the request and the follow on Cell blocks may
+// or may not be there -- read the response header to find out if one following.
+// If an exception, it will be included inside the Response Header.
+//
+// Any time we write a pb, we do it with Message#writeDelimitedTo EXCEPT when
+// the connection header is sent; this is prefaced by an int with its length
+// and the pb connection header is then written with Message#writeTo.
+//
+
+// User Information proto. Included in ConnectionHeader on connection setup
message UserInformation {
required string effectiveUser = 1;
optional string realUser = 2;
}
+// This is sent on connection setup after the connection preamble is sent.
message ConnectionHeader {
- /** User Info beyond what is established at connection establishment
- * (applies to secure HBase setup)
- */
optional UserInformation userInfo = 1;
- /** Protocol name for next rpc layer
- * the client created a proxy with this protocol name
- */
optional string protocol = 2 [default = "org.apache.hadoop.hbase.client.ClientProtocol"];
-}
-
-
-/**
- * The RPC request header
- */
-message RpcRequestHeader {
- /** Monotonically increasing callId, mostly to keep track of RPCs */
- required uint32 callId = 1;
- optional RPCTInfo tinfo = 2;
-}
-/**
- * The RPC request body
- */
-message RpcRequestBody {
- /** Name of the RPC method */
- required string methodName = 1;
-
- /** Bytes corresponding to the client protobuf request. This is the actual
- * bytes corresponding to the RPC request argument.
- */
- optional bytes request = 2;
-
- /** Some metainfo about the request. Helps us to treat RPCs with
- * different priorities. For now this is just the classname of the request
- * proto object.
- */
- optional string requestClassName = 4;
-}
-
-/**
- * The RPC response header
- */
-message RpcResponseHeader {
- /** Echo back the callId the client sent */
- required uint32 callId = 1;
- /** Did the RPC execution encounter an error at the server */
- enum Status {
- SUCCESS = 0;
- ERROR = 1;
- FATAL = 2;
- }
- required Status status = 2;
-}
-/**
- * The RPC response body
- */
-message RpcResponseBody {
- /** Optional response bytes. This is the actual bytes corresponding to the
- * return value of the invoked RPC.
- */
- optional bytes response = 1;
-}
-/**
- * At the RPC layer, this message is used to indicate
- * the server side exception to the RPC client.
- *
- * HBase RPC client throws an exception indicated
- * by exceptionName with the stackTrace.
- */
-message RpcException {
- /** Class name of the exception thrown from the server */
- required string exceptionName = 1;
-
- /** Exception stack trace from the server side */
+ // Cell block codec we will use sending over optional cell blocks. Server throws exception
+ // if cannot deal.
+ optional string cellBlockCodecClass = 3 [default = "org.apache.hadoop.hbase.codec.KeyValueCodec"];
+ // Compressor we will use if cell block is compressed. Server will throw exception if not supported.
+ // Class must implement hadoop's CompressionCodec Interface
+ optional string cellBlockCompressorClass = 4;
+}
+
+// Optional Cell block Message. Included in client RequestHeader
+message CellBlockMeta {
+ // Length of the following cell block. Could calculate it but convenient having it too hand.
+ optional uint32 length = 1;
+}
+
+// At the RPC layer, this message is used to carry
+// the server side exception to the RPC client.
+message ExceptionResponse {
+ // Class name of the exception thrown from the server
+ optional string exceptionClassName = 1;
+ // Exception stack trace from the server side
optional string stackTrace = 2;
+ // Optional hostname. Filled in for some exceptions such as region moved
+ // where exception gives clue on where the region may have moved.
+ optional string hostname = 3;
+ optional int32 port = 4;
+ // Set if we are NOT to retry on receipt of this exception
+ optional bool doNotRetry = 5;
+}
+
+// Header sent making a request.
+message RequestHeader {
+ // Monotonically increasing callId to keep track of RPC requests and their response
+ optional uint32 callId = 1;
+ optional RPCTInfo traceInfo = 2;
+ optional string methodName = 3;
+ // If true, then a pb Message param follows.
+ optional bool requestParam = 4;
+ // If present, then an encoded data block follows.
+ optional CellBlockMeta cellBlockMeta = 5;
+ // TODO: Have client specify priority
+}
+
+message ResponseHeader {
+ optional uint32 callId = 1;
+ // If present, then request threw an exception and no response message (else we presume one)
+ optional ExceptionResponse exception = 2;
+ // If present, then an encoded data block follows.
+ optional CellBlockMeta cellBlockMeta = 3;
}
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto Wed Mar 20 19:36:46 2013
@@ -24,6 +24,33 @@ option java_generate_equals_and_hash = t
option optimize_for = SPEED;
/**
+ * The type of the key in a Cell
+ */
+enum CellType {
+ MINIMUM = 0;
+ PUT = 4;
+
+ DELETE = 8;
+ DELETE_COLUMN = 12;
+ DELETE_FAMILY = 14;
+
+ // MAXIMUM is used when searching; you look from maximum on down.
+ MAXIMUM = 255;
+}
+
+/**
+ * Protocol buffer version of Cell.
+ */
+message Cell {
+ optional bytes row = 1;
+ optional bytes family = 2;
+ optional bytes qualifier = 3;
+ optional uint64 timestamp = 4;
+ optional CellType cellType = 5;
+ optional bytes value = 6;
+}
+
+/**
* Table Schema
* Inspired by the rest TableSchema
*/
@@ -201,21 +228,6 @@ enum CompareType {
}
/**
- * The type of the key in a KeyValue.
- */
-enum KeyType {
- MINIMUM = 0;
- PUT = 4;
-
- DELETE = 8;
- DELETE_COLUMN = 12;
- DELETE_FAMILY = 14;
-
- // MAXIMUM is used when searching; you look from maximum on down.
- MAXIMUM = 255;
-}
-
-/**
* Protocol buffer version of KeyValue.
* It doesn't have those transient parameters
*/
@@ -224,7 +236,7 @@ message KeyValue {
required bytes family = 2;
required bytes qualifier = 3;
optional uint64 timestamp = 4;
- optional KeyType keyType = 5;
+ optional CellType keyType = 5;
optional bytes value = 6;
}
@@ -288,4 +300,4 @@ message LongMsg {
message BigDecimalMsg {
required bytes bigdecimalMsg = 1;
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java Wed Mar 20 19:36:46 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Mu
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
import org.apache.hadoop.hbase.util.Bytes;
@@ -301,7 +301,7 @@ public class MetaEditor {
CoprocessorRpcChannel channel = table.coprocessorService(row);
MultiMutateRequest.Builder mmrBuilder = MultiMutateRequest.newBuilder();
for (Put put : puts) {
- mmrBuilder.addMutationRequest(ProtobufUtil.toMutate(MutateType.PUT, put));
+ mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put));
}
MultiRowMutationService.BlockingInterface service =
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java?rev=1459013&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java Wed Mar 20 19:36:46 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.CodecException;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Codec that just writes out Cell as a protobuf Cell Message. Does not write the mvcc stamp.
+ * Use a different codec if you want that in the stream.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MessageCodec implements Codec {
+ static class MessageEncoder extends BaseEncoder {
+ MessageEncoder(final OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(Cell cell) throws IOException {
+ checkFlushed();
+ HBaseProtos.Cell.Builder builder = HBaseProtos.Cell.newBuilder();
+ // This copies bytes from Cell to ByteString. I don't see anyway around the copy.
+ // ByteString is final.
+ builder.setRow(ByteString.copyFrom(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength()));
+ builder.setFamily(ByteString.copyFrom(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength()));
+ builder.setQualifier(ByteString.copyFrom(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength()));
+ builder.setTimestamp(cell.getTimestamp());
+ builder.setCellType(HBaseProtos.CellType.valueOf(cell.getTypeByte()));
+ builder.setValue(ByteString.copyFrom(cell.getValueArray(), cell.getValueOffset(),
+ cell.getValueLength()));
+ HBaseProtos.Cell pbcell = builder.build();
+ try {
+ pbcell.writeDelimitedTo(this.out);
+ } catch (IOException e) {
+ throw new CodecException(e);
+ }
+ }
+ }
+
+ static class MessageDecoder extends BaseDecoder {
+ MessageDecoder(final InputStream in) {
+ super(in);
+ }
+
+ protected Cell parseCell() throws IOException {
+ HBaseProtos.Cell pbcell = HBaseProtos.Cell.parseDelimitedFrom(this.in);
+ return CellUtil.createCell(pbcell.getRow().toByteArray(),
+ pbcell.getFamily().toByteArray(), pbcell.getQualifier().toByteArray(),
+ pbcell.getTimestamp(), (byte)pbcell.getCellType().getNumber(),
+ pbcell.getValue().toByteArray());
+ }
+ }
+
+ @Override
+ public Decoder getDecoder(InputStream is) {
+ return new MessageDecoder(is);
+ }
+
+ @Override
+ public Encoder getEncoder(OutputStream os) {
+ return new MessageEncoder(os);
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MultiRowMutationEndpoint.java Wed Mar 20 19:36:46 2013
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
@@ -86,9 +86,9 @@ CoprocessorService, Coprocessor {
try {
// set of rows to lock, sorted to avoid deadlocks
SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- List<Mutate> mutateRequestList = request.getMutationRequestList();
+ List<MutationProto> mutateRequestList = request.getMutationRequestList();
List<Mutation> mutations = new ArrayList<Mutation>(mutateRequestList.size());
- for (Mutate m : mutateRequestList) {
+ for (MutationProto m : mutateRequestList) {
mutations.add(ProtobufUtil.toMutation(m));
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Mar 20 19:36:46 2013
@@ -23,9 +23,9 @@ import static org.apache.hadoop.fs.Commo
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -66,19 +66,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.CallerDisconnectedException;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.AuthMethod;
@@ -87,11 +90,13 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +117,10 @@ import org.cloudera.htrace.impl.NullSpan
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
// Uses Writables doing sasl
/** A client for an IPC service. IPC calls take a single Protobuf message as a
@@ -126,9 +134,12 @@ import com.google.protobuf.Message;
*/
@InterfaceAudience.Private
public abstract class HBaseServer implements RpcServer {
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
private final boolean authorize;
protected boolean isSecurityEnabled;
+ public static final byte CURRENT_VERSION = 0;
+
/**
* How many calls/handler are allowed in the queue.
*/
@@ -150,11 +161,7 @@ public abstract class HBaseServer implem
private final int warnDelayedCalls;
private AtomicInteger delayedCalls;
-
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
- protected static final Log TRACELOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer.trace");
+ private final IPCUtil ipcUtil;
private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@@ -166,6 +173,7 @@ public abstract class HBaseServer implem
protected static final ThreadLocal<RpcServer> SERVER =
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
+ private static final ReflectionCache methodCache = new ReflectionCache();
private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
@@ -307,7 +315,10 @@ public abstract class HBaseServer implem
/** A call queued for handling. */
protected class Call implements RpcCallContext {
protected int id; // the client's call id
- protected RpcRequestBody rpcRequestBody; // the parameter passed
+ protected Method method;
+ protected Message param; // the parameter passed
+ // Optional cell data passed outside of protobufs.
+ protected CellScanner cellScanner;
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
@@ -320,10 +331,12 @@ public abstract class HBaseServer implem
protected boolean isError;
protected TraceInfo tinfo;
- public Call(int id, RpcRequestBody rpcRequestBody, Connection connection,
- Responder responder, long size, TraceInfo tinfo) {
+ public Call(int id, Method method, Message param, CellScanner cellScanner,
+ Connection connection, Responder responder, long size, TraceInfo tinfo) {
this.id = id;
- this.rpcRequestBody = rpcRequestBody;
+ this.method = method;
+ this.param = param;
+ this.cellScanner = cellScanner;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
@@ -336,57 +349,74 @@ public abstract class HBaseServer implem
@Override
public String toString() {
- return rpcRequestBody.toString() + " from " + connection.toString();
+ return "callId: " + this.id + " methodName: " +
+ ((this.method != null)? this.method.getName(): null) + " param: " +
+ (this.param != null? TextFormat.shortDebugString(this.param): "") +
+ " from " + connection.toString();
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
this.response = response;
}
- protected synchronized void setResponse(Object value, Status status,
- String errorClass, String error) {
- if (this.isError)
- return;
- if (errorClass != null) {
- this.isError = true;
- }
-
- ByteBufferOutputStream buf = null;
- if (value != null) {
- buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
- } else {
- buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
- }
- DataOutputStream out = new DataOutputStream(buf);
+ protected synchronized void setResponse(Object m, final CellScanner cells,
+ Throwable t, String errorMsg) {
+ if (this.isError) return;
+ if (t != null) this.isError = true;
+ ByteBufferOutputStream bbos = null;
try {
- RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
+ ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
+ // Presume it a pb Message. Could be null.
+ Message result = (Message)m;
// Call id.
- builder.setCallId(this.id);
- builder.setStatus(status);
- builder.build().writeDelimitedTo(out);
- if (error != null) {
- RpcException.Builder b = RpcException.newBuilder();
- b.setExceptionName(errorClass);
- b.setStackTrace(error);
- b.build().writeDelimitedTo(out);
- } else {
- if (value != null) {
- ((Message)value).writeDelimitedTo(out);
- }
+ headerBuilder.setCallId(this.id);
+ if (t != null) {
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg);
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special casing for this exception. This is only one carrying a payload.
+ // Do this instead of build a generic system for allowing exceptions carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException)t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+ ByteBuffer cellBlock =
+ ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
+ if (cellBlock != null) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
+ cellBlockBuilder.setLength(cellBlock.limit());
+ headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
+ Message header = headerBuilder.build();
+ bbos = IPCUtil.write(header, result, cellBlock);
if (connection.useWrap) {
- wrapWithSasl(buf);
+ wrapWithSasl(bbos);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Header " + TextFormat.shortDebugString(header) +
+ ", result " + (result != null? TextFormat.shortDebugString(result): "null"));
}
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
- ByteBuffer bb = buf.getByteBuffer();
- bb.position(0);
+ ByteBuffer bb = null;
+ if (bbos != null) {
+ // TODO: If SASL, maybe buffer already been flipped and written?
+ bb = bbos.getByteBuffer();
+ bb.position(0);
+ }
this.response = bb;
}
private void wrapWithSasl(ByteBufferOutputStream response)
- throws IOException {
+ throws IOException {
if (connection.useSasl) {
// getByteBuffer calls flip()
ByteBuffer buf = response.getByteBuffer();
@@ -413,8 +443,9 @@ public abstract class HBaseServer implem
assert this.delayReturnValue || result == null;
this.delayResponse = false;
delayedCalls.decrementAndGet();
- if (this.delayReturnValue)
- this.setResponse(result, Status.SUCCESS, null, null);
+ if (this.delayReturnValue) {
+ this.setResponse(result, null, null, null);
+ }
this.responder.doRespond(this);
}
@@ -437,8 +468,7 @@ public abstract class HBaseServer implem
@Override
public synchronized void endDelayThrowing(Throwable t) throws IOException {
- this.setResponse(null, Status.ERROR, t.getClass().toString(),
- StringUtils.stringifyException(t));
+ this.setResponse(null, null, t, StringUtils.stringifyException(t));
this.delayResponse = false;
this.sendResponseIfReady();
}
@@ -517,7 +547,7 @@ public abstract class HBaseServer implem
readers[i] = reader;
readPool.execute(reader);
}
- LOG.info("Started " + readThreads + " reader(s) in Listener.");
+ LOG.info(getName() + ": started " + readThreads + " reader(s) in Listener.");
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -540,7 +570,7 @@ public abstract class HBaseServer implem
try {
readSelector.close();
} catch (IOException ioe) {
- LOG.error("Error closing read selector in " + getName(), ioe);
+ LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
}
}
}
@@ -567,11 +597,11 @@ public abstract class HBaseServer implem
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " unexpectedly interrupted: " +
- StringUtils.stringifyException(e));
+ LOG.info(getName() + ": unexpectedly interrupted: " +
+ StringUtils.stringifyException(e));
}
} catch (IOException ex) {
- LOG.error("Error in Reader", ex);
+ LOG.error(getName() + ": error in Reader", ex);
}
}
}
@@ -674,7 +704,7 @@ public abstract class HBaseServer implem
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OOME");
+ LOG.info(getName() + ": exiting on OutOfMemoryError");
closeCurrentConnection(key, e);
cleanupConnections(true);
return;
@@ -683,7 +713,7 @@ public abstract class HBaseServer implem
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
- LOG.warn("Out of Memory in server select", e);
+ LOG.warn(getName() + ": OutOfMemoryError in server select", e);
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ignored) {}
@@ -693,7 +723,7 @@ public abstract class HBaseServer implem
}
cleanupConnections(false);
}
- LOG.info("Stopping " + this.getName());
+ LOG.info(getName() + ": stopping");
synchronized (this) {
try {
@@ -750,7 +780,7 @@ public abstract class HBaseServer implem
numConnections++;
}
if (LOG.isDebugEnabled())
- LOG.debug("Server connection from " + c.toString() +
+ LOG.debug(getName() + ": connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
} finally {
@@ -766,24 +796,23 @@ public abstract class HBaseServer implem
return;
}
c.setLastContact(System.currentTimeMillis());
-
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
throw ieo;
} catch (Exception e) {
- LOG.warn(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+ LOG.warn(getName() + ": count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " +
- c.getHostAddress() + ". Number of active connections: "+
- numConnections);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
+ ", because count=" + count +
+ ". Number of active connections: " + numConnections);
+ }
closeConnection(c);
// c = null;
- }
- else {
+ } else {
c.setLastContact(System.currentTimeMillis());
}
}
@@ -797,7 +826,7 @@ public abstract class HBaseServer implem
try {
acceptChannel.socket().close();
} catch (IOException e) {
- LOG.info(getName() + ":Exception in closing listener socket. " + e);
+ LOG.info(getName() + ": exception in closing listener socket. " + e);
}
}
readPool.shutdownNow();
@@ -830,11 +859,11 @@ public abstract class HBaseServer implem
try {
doRunLoop();
} finally {
- LOG.info("Stopping " + this.getName());
+ LOG.info(getName() + ": stopping");
try {
writeSelector.close();
} catch (IOException ioe) {
- LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+ LOG.error(getName() + ": couldn't close write selector", ioe);
}
}
}
@@ -855,7 +884,7 @@ public abstract class HBaseServer implem
doAsyncWrite(key);
}
} catch (IOException e) {
- LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+ LOG.info(getName() + ": asyncWrite", e);
}
}
long now = System.currentTimeMillis();
@@ -867,7 +896,7 @@ public abstract class HBaseServer implem
// If there were some calls that have not been sent out for a
// long time, discard them.
//
- LOG.debug("Checking for old call responses.");
+ if (LOG.isDebugEnabled()) LOG.debug(getName() + ": checking for old call responses.");
ArrayList<Call> calls;
// get the list of channels from list of keys.
@@ -887,13 +916,13 @@ public abstract class HBaseServer implem
try {
doPurge(call, now);
} catch (IOException e) {
- LOG.warn("Error in purging old calls " + e);
+ LOG.warn(getName() + ": error in purging old calls " + e);
}
}
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OOME");
+ LOG.info(getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
@@ -902,15 +931,15 @@ public abstract class HBaseServer implem
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
- LOG.warn("Out of Memory in server select", e);
+ LOG.warn(getName() + ": OutOfMemoryError in server select", e);
try { Thread.sleep(60000); } catch (Exception ignored) {}
}
} catch (Exception e) {
- LOG.warn("Exception in Responder " +
+ LOG.warn(getName() + ": exception in Responder " +
StringUtils.stringifyException(e));
}
}
- LOG.info("Stopping " + this.getName());
+ LOG.info(getName() + ": stopped");
}
private void doAsyncWrite(SelectionKey key) throws IOException {
@@ -958,8 +987,8 @@ public abstract class HBaseServer implem
// Processes one response. Returns true if there are no more pending
// data for this channel.
//
- private boolean processResponse(final LinkedList<Call> responseQueue,
- boolean inHandler) throws IOException {
+ private boolean processResponse(final LinkedList<Call> responseQueue, boolean inHandler)
+ throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements;
@@ -980,10 +1009,6 @@ public abstract class HBaseServer implem
//
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
- call.connection);
- }
//
// Send as much data as we can in the non-blocking fashion
//
@@ -1000,8 +1025,8 @@ public abstract class HBaseServer implem
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
- call.connection + " Wrote " + numBytes + " bytes.");
+ LOG.debug(getName() + ": callId: " + call.id + " sent, wrote " + numBytes +
+ " bytes.");
}
} else {
//
@@ -1017,16 +1042,15 @@ public abstract class HBaseServer implem
done = true;
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
- call.connection + " Wrote partial " + numBytes +
- " bytes.");
+ LOG.debug(getName() + call.toString() + " partially sent, wrote " +
+ numBytes + " bytes.");
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
- LOG.warn(getName()+", call " + call + ": output error");
+ LOG.warn(getName() + call.toString() + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
@@ -1090,56 +1114,90 @@ public abstract class HBaseServer implem
}
}
+ @SuppressWarnings("serial")
+ public static class CallQueueTooBigException extends IOException {
+ CallQueueTooBigException() {
+ super();
+ }
+ }
+
+ private Function<Pair<RequestHeader, Message>, Integer> qosFunction = null;
+
+ /**
+ * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
+ * are priorityHandlers available it will be processed in it's own thread set.
+ *
+ * @param newFunc
+ */
+ @Override
+ public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc) {
+ qosFunction = newFunc;
+ }
+
+ protected int getQosLevel(Pair<RequestHeader, Message> headerAndParam) {
+ if (qosFunction == null) return 0;
+ Integer res = qosFunction.apply(headerAndParam);
+ return res == null? 0: res;
+ }
+
/** Reads calls from a connection and queues them for handling. */
public class Connection {
- private boolean rpcHeaderRead = false; //if initial signature and
- //version are read
- private boolean headerRead = false; //if the connection header that
- //follows version is read.
+ // If initial preamble with version and magic has been read or not.
+ private boolean connectionPreambleRead = false;
+ // If the connection header has been read or not.
+ private boolean connectionHeaderRead = false;
protected SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
protected final LinkedList<Call> responseQueue;
private volatile int rpcCount = 0; // number of outstanding rpcs
private long lastContact;
- private int dataLength;
private InetAddress addr;
protected Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
- ConnectionHeader header;
+ ConnectionHeader connectionHeader;
+ /**
+ * Codec the client asked use.
+ */
+ private Codec codec;
+ /**
+ * Compression codec the client asked us use.
+ */
+ private CompressionCodec compressionCodec;
Class<? extends IpcProtocol> protocol;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
- private ByteBuffer rpcHeaderBuffer;
private ByteBuffer unwrappedData;
+ // When is this set? FindBugs wants to know! Says NP
private ByteBuffer unwrappedDataLengthBuffer;
boolean useSasl;
SaslServer saslServer;
private boolean useWrap = false;
// Fake 'call' for failed authorization response
private static final int AUTHROIZATION_FAILED_CALLID = -1;
- private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID,
- null, this, null, 0, null);
+ private final Call authFailedCall =
+ new Call(AUTHROIZATION_FAILED_CALLID, null, null, null, this, null, 0, null);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
- private final Call saslCall = new Call(SASL_CALLID, null, this, null, 0,
- null);
+ private final Call saslCall =
+ new Call(SASL_CALLID, null, null, null, this, null, 0, null);
public UserGroupInformation attemptingUser = null; // user name before auth
+
public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
- InetAddress addr = socket.getInetAddress();
+ this.addr = socket.getInetAddress();
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@@ -1251,8 +1309,9 @@ public abstract class HBaseServer implem
UserGroupInformation current = UserGroupInformation
.getCurrentUser();
String fullName = current.getUserName();
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Kerberos principal name is " + fullName);
+ }
final String names[] = SaslUtil.splitKerberosName(fullName);
if (names.length != 3) {
throw new AccessControlException(
@@ -1273,13 +1332,14 @@ public abstract class HBaseServer implem
throw new AccessControlException(
"Unable to find SASL server implementation for "
+ authMethod.getMechanismName());
- if (LOG.isDebugEnabled())
- LOG.debug("Created SASL server with mechanism = "
- + authMethod.getMechanismName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
+ }
}
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.evaluateResponse()");
+ }
replyToken = saslServer.evaluateResponse(saslToken);
} catch (IOException e) {
IOException sendToClient = e;
@@ -1300,9 +1360,10 @@ public abstract class HBaseServer implem
throw e;
}
if (replyToken != null) {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Will send token of size " + replyToken.length
+ " from saslServer.");
+ }
doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
null);
}
@@ -1352,49 +1413,52 @@ public abstract class HBaseServer implem
}
}
+ /**
+ * Read off the wire.
+ * @return Returns -1 if failure (and caller will close connection) else return how many
+ * bytes were read and processed
+ * @throws IOException
+ * @throws InterruptedException
+ */
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
- /* Read at most one RPC. If the header is not read completely yet
- * then iterate until we read first RPC or until there is no data left.
- */
- int count = -1;
- if (dataLengthBuffer.remaining() > 0) {
- count = channelRead(channel, dataLengthBuffer);
- if (count < 0 || dataLengthBuffer.remaining() > 0)
+ // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it
+ // does, read in the rest of the connection preamble, the version and the auth method.
+ // Else it will be length of the data to read (or -1 if a ping). We catch the integer
+ // length into the 4-byte this.dataLengthBuffer.
+ int count;
+ if (this.dataLengthBuffer.remaining() > 0) {
+ count = channelRead(channel, this.dataLengthBuffer);
+ if (count < 0 || this.dataLengthBuffer.remaining() > 0) {
return count;
- }
-
- if (!rpcHeaderRead) {
- //Every connection is expected to send the header.
- if (rpcHeaderBuffer == null) {
- rpcHeaderBuffer = ByteBuffer.allocate(2);
}
- count = channelRead(channel, rpcHeaderBuffer);
- if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+ }
+ // If we have not read the connection setup preamble, look to see if that is on the wire.
+ if (!connectionPreambleRead) {
+ // Check for 'HBas' magic.
+ this.dataLengthBuffer.flip();
+ if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
+ return doBadPreambleHandling("Expected HEADER=" +
+ Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
+ " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()));
+ }
+ // Now read the next two bytes, the version and the auth to use.
+ ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
+ count = channelRead(channel, versionAndAuthBytes);
+ if (count < 0 || versionAndAuthBytes.remaining() > 0) {
return count;
}
- int version = rpcHeaderBuffer.get(0);
- byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
- authMethod = AuthMethod.read(new DataInputStream(
- new ByteArrayInputStream(method)));
- dataLengthBuffer.flip();
- if (!HConstants.RPC_HEADER.equals(dataLengthBuffer) || version != HConstants.CURRENT_VERSION) {
- LOG.warn("Incorrect header or version mismatch from " +
- hostAddress + ":" + remotePort +
- " got version " + version +
- " expected version " + HConstants.CURRENT_VERSION);
- setupBadVersionResponse(version);
- return -1;
- }
- dataLengthBuffer.clear();
- if (authMethod == null) {
- throw new IOException("Unable to read authentication method");
+ int version = versionAndAuthBytes.get(0);
+ byte authbyte = versionAndAuthBytes.get(1);
+ this.authMethod = AuthMethod.valueOf(authbyte);
+ if (version != CURRENT_VERSION || authMethod == null) {
+ return doBadPreambleHandling("serverVersion=" + CURRENT_VERSION +
+ ", clientVersion=" + version + ", authMethod=" + authbyte +
+ ", authSupported=" + (authMethod != null));
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
- AccessControlException ae = new AccessControlException(
- "Authentication is required");
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
- ae.getClass().getName(), ae.getMessage());
+ AccessControlException ae = new AccessControlException("Authentication is required");
+ setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
}
@@ -1410,18 +1474,18 @@ public abstract class HBaseServer implem
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
-
- rpcHeaderBuffer = null;
- rpcHeaderRead = true;
+ connectionPreambleRead = true;
+ // Preamble checks out. Go around again to read actual connection header.
+ dataLengthBuffer.clear();
continue;
}
-
+ // We have read a length and we have read the preamble. It is either the connection header
+ // or it is a request.
if (data == null) {
dataLengthBuffer.flip();
- dataLength = dataLengthBuffer.getInt();
-
+ int dataLength = dataLengthBuffer.getInt();
if (dataLength == HBaseClient.PING_CALL_ID) {
- if(!useWrap) { //covers the !useSasl too
+ if (!useWrap) { //covers the !useSasl too
dataLengthBuffer.clear();
return 0; //ping message
}
@@ -1433,9 +1497,7 @@ public abstract class HBaseServer implem
data = ByteBuffer.allocate(dataLength);
incRpcCount(); // Increment the rpc count
}
-
count = channelRead(channel, data);
-
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
@@ -1444,65 +1506,49 @@ public abstract class HBaseServer implem
skipInitialSaslHandshake = false;
continue;
}
- boolean isHeaderRead = headerRead;
+ boolean headerRead = connectionHeaderRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
- data = null;
- if (!isHeaderRead) {
+ this.data = null;
+ if (!headerRead) {
continue;
}
+ } else {
+ // More to read still; go around again.
+ if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining());
+ continue;
}
return count;
}
}
- /**
- * Try to set up the response to indicate that the client version
- * is incompatible with the server. This can contain special-case
- * code to speak enough of past IPC protocols to pass back
- * an exception to the caller.
- * @param clientVersion the version the caller is using
- * @throws IOException
- */
- private void setupBadVersionResponse(int clientVersion) throws IOException {
- String errMsg = "Server IPC version " + HConstants.CURRENT_VERSION +
- " cannot communicate with client version " + clientVersion;
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
- if (clientVersion >= 3) {
- // We used to return an id of -1 which caused server to close the
- // connection without telling the client what the problem was. Now
- // we return 0 which will keep the socket up -- bad clients, unless
- // they switch to suit the running server -- will fail later doing
- // getProtocolVersion.
- Call fakeCall = new Call(0, null, this, responder, 0, null);
- // Versions 3 and greater can interpret this exception
- // response in the same manner
- setupResponse(buffer, fakeCall, Status.FATAL,
- VersionMismatch.class.getName(), errMsg);
-
- responder.doRespond(fakeCall);
- }
+ private int doBadPreambleHandling(final String errMsg) throws IOException {
+ String msg = errMsg + "; cannot communicate with client at " + hostAddress + ":" + port;
+ LOG.warn(msg);
+ Call fakeCall = new Call(-1, null, null, null, this, responder, -1, null);
+ setupResponse(null, fakeCall, new FatalConnectionException(msg), msg);
+ responder.doRespond(fakeCall);
+ // Returning -1 closes out the connection.
+ return -1;
}
- /// Reads the connection header following version
- private void processHeader(byte[] buf) throws IOException {
- DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(buf));
- header = ConnectionHeader.parseFrom(in);
+ // Reads the connection header following version
+ private void processConnectionHeader(byte[] buf) throws IOException {
+ this.connectionHeader = ConnectionHeader.parseFrom(buf);
try {
- String protocolClassName = header.getProtocol();
+ String protocolClassName = connectionHeader.getProtocol();
if (protocolClassName != null) {
- protocol = getProtocolClass(header.getProtocol(), conf);
+ protocol = getProtocolClass(connectionHeader.getProtocol(), conf);
}
} catch (ClassNotFoundException cnfe) {
- throw new IOException("Unknown protocol: " + header.getProtocol());
+ throw new IOException("Unknown protocol: " + connectionHeader.getProtocol());
}
+ setupCellBlockCodecs(this.connectionHeader);
- UserGroupInformation protocolUser = createUser(header);
+ UserGroupInformation protocolUser = createUser(connectionHeader);
if (!useSasl) {
user = protocolUser;
if (user != null) {
@@ -1535,6 +1581,30 @@ public abstract class HBaseServer implem
}
}
+ /**
+ * Set up cell block codecs
+ * @param header
+ * @throws FatalConnectionException
+ */
+ private void setupCellBlockCodecs(final ConnectionHeader header)
+ throws FatalConnectionException {
+ // TODO: Plug in other supported decoders.
+ if (!header.hasCellBlockCodecClass()) throw new FatalConnectionException("No codec");
+ String className = header.getCellBlockCodecClass();
+ try {
+ this.codec = (Codec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new FatalConnectionException("Unsupported codec " + className, e);
+ }
+ if (!header.hasCellBlockCompressorClass()) return;
+ className = header.getCellBlockCompressorClass();
+ try {
+ this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new FatalConnectionException("Unsupported codec " + className, e);
+ }
+ }
+
private void processUnwrappedData(byte[] inBuf) throws IOException,
InterruptedException {
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
@@ -1576,74 +1646,103 @@ public abstract class HBaseServer implem
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
- if (headerRead) {
- processData(buf);
+ if (connectionHeaderRead) {
+ processRequest(buf);
} else {
- processHeader(buf);
- headerRead = true;
+ processConnectionHeader(buf);
+ this.connectionHeaderRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
- + " for protocol " + header.getProtocol()
+ + " for protocol " + connectionHeader.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
- protected void processData(byte[] buf) throws IOException, InterruptedException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- RpcRequestHeader request = RpcRequestHeader.parseDelimitedFrom(dis);
-
- int id = request.getCallId();
- long callSize = buf.length;
-
+ /**
+ * @param buf Has the request header and the request param and optionally encoded data buffer
+ * all in this one array.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected void processRequest(byte[] buf) throws IOException, InterruptedException {
+ long totalRequestSize = buf.length;
+ int offset = 0;
+ // Here we read in the header. We avoid having pb
+ // do its default 4k allocation for CodedInputStream. We force it to use backing array.
+ CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
+ int headerSize = cis.readRawVarint32();
+ offset = cis.getTotalBytesRead();
+ RequestHeader header =
+ RequestHeader.newBuilder().mergeFrom(buf, offset, headerSize).build();
+ offset += headerSize;
+ int id = header.getCallId();
if (LOG.isDebugEnabled()) {
- LOG.debug(" got call #" + id + ", " + callSize + " bytes");
+ LOG.debug("RequestHeader " + TextFormat.shortDebugString(header) +
+ " totalRequestSize: " + totalRequestSize + " bytes");
}
// Enforcing the call queue size, this triggers a retry in the client
- if ((callSize + callQueueSize.get()) > maxQueueSize) {
- final Call callTooBig = new Call(id, null, this, responder, callSize,
- null);
+ // This is a bit late to be doing this check - we have already read in the total request.
+ if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
+ final Call callTooBig =
+ new Call(id, null, null, null, this, responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, callTooBig, Status.FATAL,
- IOException.class.getName(),
- "Call queue is full, is ipc.server.max.callqueue.size too small?");
+ setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
+ "Call queue is full, is ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
}
-
- RpcRequestBody rpcRequestBody;
+ Method method = null;
+ Message param = null;
+ CellScanner cellScanner = null;
try {
- rpcRequestBody = RpcRequestBody.parseDelimitedFrom(dis);
+ if (header.hasRequestParam() && header.getRequestParam()) {
+ method = methodCache.getMethod(this.protocol, header.getMethodName());
+ Message m = methodCache.getMethodArgType(method);
+ // Check that there is a param to deserialize.
+ if (m != null) {
+ Builder builder = null;
+ builder = m.newBuilderForType();
+ // To read the varint, I need an inputstream; might as well be a CIS.
+ cis = CodedInputStream.newInstance(buf, offset, buf.length);
+ int paramSize = cis.readRawVarint32();
+ offset += cis.getTotalBytesRead();
+ if (builder != null) {
+ builder.mergeFrom(buf, offset, paramSize);
+ param = builder.build();
+ }
+ offset += paramSize;
+ }
+ }
+ if (header.hasCellBlockMeta()) {
+ cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
+ buf, offset, buf.length);
+ }
} catch (Throwable t) {
- LOG.warn("Unable to read call parameters for client " +
- getHostAddress(), t);
- final Call readParamsFailedCall = new Call(id, null, this, responder,
- callSize, null);
+ String msg = "Unable to read call parameter from client " + getHostAddress();
+ LOG.warn(msg, t);
+ final Call readParamsFailedCall =
+ new Call(id, null, null, null, this, responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL,
- t.getClass().getName(),
- "IPC server unable to read call parameters: " + t.getMessage());
+ setupResponse(responseBuffer, readParamsFailedCall, t,
+ msg + "; " + t.getMessage());
responder.doRespond(readParamsFailedCall);
return;
}
- Call call;
- if (request.hasTinfo()) {
- call = new Call(id, rpcRequestBody, this, responder, callSize,
- new TraceInfo(request.getTinfo().getTraceId(), request.getTinfo()
- .getParentId()));
+ Call call = null;
+ if (header.hasTraceInfo()) {
+ call = new Call(id, method, param, cellScanner, this, responder, totalRequestSize,
+ new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()));
} else {
- call = new Call(id, rpcRequestBody, this, responder, callSize, null);
+ call = new Call(id, method, param, cellScanner, this, responder, totalRequestSize, null);
}
-
- callQueueSize.add(callSize);
-
- if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
+ callQueueSize.add(totalRequestSize);
+ Pair<RequestHeader, Message> headerAndParam = new Pair<RequestHeader, Message>(header, param);
+ if (priorityCallQueue != null && getQosLevel(headerAndParam) > highPriorityLevel) {
priorityCallQueue.put(call);
- } else if (replicationQueue != null
- && getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) {
+ } else if (replicationQueue != null &&
+ getQosLevel(headerAndParam) == HConstants.REPLICATION_QOS) {
replicationQueue.put(call);
} else {
callQueue.put(call); // queue the call; maybe blocked here
@@ -1660,16 +1759,15 @@ public abstract class HBaseServer implem
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
- authorize(user, header, getHostInetAddress());
+ authorize(user, connectionHeader, getHostInetAddress());
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully authorized " + header);
+ LOG.debug("Authorized " + TextFormat.shortDebugString(connectionHeader));
}
metrics.authorizationSuccess();
} catch (AuthorizationException ae) {
- LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
+ LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
metrics.authorizationFailure();
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
- ae.getClass().getName(), ae.getMessage());
+ setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
responder.doRespond(authFailedCall);
return false;
}
@@ -1679,7 +1777,7 @@ public abstract class HBaseServer implem
protected synchronized void close() {
disposeSasl();
data = null;
- dataLengthBuffer = null;
+ this.dataLengthBuffer = null;
if (!channel.isOpen())
return;
try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
@@ -1743,49 +1841,38 @@ public abstract class HBaseServer implem
status.setStatus("starting");
SERVER.set(HBaseServer.this);
while (running) {
-
try {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
- status.setConnection(call.connection.getHostAddress(),
- call.connection.getRemotePort());
-
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": has #" + call.id + " from " +
- call.connection);
-
- String errorClass = null;
+ status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+ if (LOG.isDebugEnabled()) {
+ UserGroupInformation remoteUser = call.connection.user;
+ LOG.debug(call.toString() + " executing as " +
+ ((remoteUser == null)? "NULL principal": remoteUser.getUserName()));
+ }
+ Throwable errorThrowable = null;
String error = null;
- Message value = null;
-
+ Pair<Message, CellScanner> resultPair = null;
CurCall.set(call);
Span currentRequestSpan = NullSpan.getInstance();
try {
- if (!started)
+ if (!started) {
throw new ServerNotRunningYetException("Server is not running yet");
-
+ }
if (call.tinfo != null) {
currentRequestSpan = Trace.startSpan(
"handling " + call.toString(), call.tinfo, Sampler.ALWAYS);
}
-
- if (LOG.isDebugEnabled()) {
- UserGroupInformation remoteUser = call.connection.user;
- LOG.debug(getName() + ": call #" + call.id + " executing as "
- + (remoteUser == null ? "NULL principal" :
- remoteUser.getUserName()));
- }
-
RequestContext.set(User.create(call.connection.user), getRemoteIp(),
- call.connection.protocol);
+ call.connection.protocol);
// make the call
- value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp,
- status);
+ resultPair = call(call.connection.protocol, call.method, call.param, call.cellScanner,
+ call.timestamp, status);
} catch (Throwable e) {
- LOG.debug(getName()+", call "+call+": error: " + e, e);
- errorClass = e.getClass().getName();
+ LOG.debug(getName() + ": " + call.toString() + " error: " + e, e);
+ errorThrowable = e;
error = StringUtils.stringifyException(e);
} finally {
currentRequestSpan.stop();
@@ -1798,21 +1885,20 @@ public abstract class HBaseServer implem
// Set the response for undelayed calls and delayed calls with
// undelayed responses.
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
- call.setResponse(value,
- errorClass == null? Status.SUCCESS: Status.ERROR,
- errorClass, error);
+ Message param = resultPair != null? resultPair.getFirst(): null;
+ CellScanner cells = resultPair != null? resultPair.getSecond(): null;
+ call.setResponse(param, cells, errorThrowable, error);
}
call.sendResponseIfReady();
status.markComplete("Sent response");
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " caught: " +
- StringUtils.stringifyException(e));
+ LOG.info(getName() + ": caught: " + StringUtils.stringifyException(e));
}
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OOME");
+ LOG.info(getName() + ": exiting on OutOfMemoryError");
return;
}
} else {
@@ -1820,44 +1906,16 @@ public abstract class HBaseServer implem
throw e;
}
} catch (ClosedChannelException cce) {
- LOG.warn(getName() + " caught a ClosedChannelException, " +
+ LOG.warn(getName() + ": caught a ClosedChannelException, " +
"this means that the server was processing a " +
"request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
- LOG.warn(getName() + " caught: " +
- StringUtils.stringifyException(e));
+ LOG.warn(getName() + ": caught: " + StringUtils.stringifyException(e));
}
}
LOG.info(getName() + ": exiting");
}
-
- }
-
-
- private Function<RpcRequestBody,Integer> qosFunction = null;
-
- /**
- * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
- * are priorityHandlers available it will be processed in it's own thread set.
- *
- * @param newFunc
- */
- @Override
- public void setQosFunction(Function<RpcRequestBody, Integer> newFunc) {
- qosFunction = newFunc;
- }
-
- protected int getQosLevel(RpcRequestBody rpcRequestBody) {
- if (qosFunction == null) {
- return 0;
- }
-
- Integer res = qosFunction.apply(rpcRequestBody);
- if (res == null) {
- return 0;
- }
- return res;
}
/* Constructs a server listening on the named port and address. Parameters passed must
@@ -1913,6 +1971,7 @@ public abstract class HBaseServer implem
this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
DEFAULT_WARN_DELAYED_CALLS);
this.delayedCalls = new AtomicInteger(0);
+ this.ipcUtil = new IPCUtil(conf);
// Create the responder here
@@ -1943,12 +2002,10 @@ public abstract class HBaseServer implem
* @param error error message, if the call failed
* @throws IOException
*/
- private void setupResponse(ByteArrayOutputStream response,
- Call call, Status status,
- String errorClass, String error)
+ private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
throws IOException {
- response.reset();
- call.setResponse(null, status, errorClass, error);
+ if (response != null) response.reset();
+ call.setResponse(null, null, t, error);
}
protected void closeConnection(Connection connection) {
@@ -2088,6 +2145,7 @@ public abstract class HBaseServer implem
* @param addr InetAddress of incoming connection
* @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
*/
+ @SuppressWarnings("static-access")
public void authorize(UserGroupInformation user,
ConnectionHeader connection,
InetAddress addr
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java?rev=1459013&r1=1459012&r2=1459013&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java Wed Mar 20 19:36:46 2013
@@ -23,36 +23,33 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
/**
* The {@link RpcServerEngine} implementation for ProtoBuf-based RPCs.
*/
@InterfaceAudience.Private
class ProtobufRpcServerEngine implements RpcServerEngine {
- private static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine");
-
ProtobufRpcServerEngine() {
super();
}
@@ -66,7 +63,6 @@ class ProtobufRpcServerEngine implements
metaHandlerCount, verbose, highPriorityLevel);
}
-
public static class Server extends HBaseServer {
boolean verbose;
Object instance;
@@ -111,10 +107,6 @@ class ProtobufRpcServerEngine implements
this.instance = instance;
this.implementation = instance.getClass();
}
- private static final Map<String, Message> methodArg =
- new ConcurrentHashMap<String, Message>();
- private static final Map<String, Method> methodInstances =
- new ConcurrentHashMap<String, Method>();
private AuthenticationTokenSecretManager createSecretManager(){
if (!isSecurityEnabled ||
@@ -152,37 +144,20 @@ class ProtobufRpcServerEngine implements
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
- public Message call(Class<? extends IpcProtocol> protocol,
- RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
+ public Pair<Message, CellScanner> call(Class<? extends IpcProtocol> protocol,
+ Method method, Message param, CellScanner cellScanner, long receiveTime,
+ MonitoredRPCHandler status)
throws IOException {
try {
- String methodName = rpcRequest.getMethodName();
- Method method = getMethod(protocol, methodName);
- if (method == null) {
- throw new UnknownProtocolException("Method " + methodName +
- " doesn't exist in protocol " + protocol.getName());
- }
-
- /**
- * RPCs for a particular interface (ie protocol) are done using a
- * IPC connection that is setup using rpcProxy.
- * The rpcProxy's has a declared protocol name that is
- * sent form client to server at connection time.
- */
-
if (verbose) {
- LOG.info("Call: protocol name=" + protocol.getName() +
- ", method=" + methodName);
+ LOG.info("callId: " + CurCall.get().id + " protocol: " + protocol.getName() +
+ " method: " + method.getName());
}
-
- status.setRPC(rpcRequest.getMethodName(),
- new Object[]{rpcRequest.getRequest()}, receiveTime);
- status.setRPCPacket(rpcRequest);
+ status.setRPC(method.getName(), new Object[]{param}, receiveTime);
+ // TODO: Review after we add in encoded data blocks.
+ status.setRPCPacket(param);
status.resume("Servicing call");
//get an instance of the method arg type
- Message protoType = getMethodArgType(method);
- Message param = protoType.newBuilderForType()
- .mergeFrom(rpcRequest.getRequest()).build();
Message result;
Object impl = null;
if (protocol.isAssignableFrom(this.implementation)) {
@@ -190,57 +165,53 @@ class ProtobufRpcServerEngine implements
} else {
throw new UnknownProtocolException(protocol);
}
-
+ PayloadCarryingRpcController controller = null;
long startTime = System.currentTimeMillis();
if (method.getParameterTypes().length == 2) {
- // RpcController + Message in the method args
- // (generated code from RPC bits in .proto files have RpcController)
- result = (Message)method.invoke(impl, null, param);
- } else if (method.getParameterTypes().length == 1) {
- // Message (hand written code usually has only a single argument)
- result = (Message)method.invoke(impl, param);
+ // Always create a controller. Some invocations may not pass data in but will pass
+ // data out and they'll need a controller instance to carry it for them.
+ controller = new PayloadCarryingRpcController(cellScanner);
+ result = (Message)method.invoke(impl, controller, param);
} else {
- throw new ServiceException("Too many parameters for method: ["
- + method.getName() + "]" + ", allowed (at most): 2, Actual: "
- + method.getParameterTypes().length);
+ throw new ServiceException("Wrong number of parameters for method: [" +
+ method.getName() + "]" + ", wanted: 2, actual: " + method.getParameterTypes().length);
}
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receiveTime);
- if (TRACELOG.isDebugEnabled()) {
- TRACELOG.debug("Call #" + CurCall.get().id +
- "; served=" + protocol.getSimpleName() + "#" + method.getName() +
- ", queueTime=" + qTime +
- ", processingTime=" + processingTime +
- ", request=" + param.toString() +
- " response=" + result.toString());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(CurCall.get().toString() +
+ " response: " + TextFormat.shortDebugString(result) +
+ " served: " + protocol.getSimpleName() +
+ " queueTime: " + qTime +
+ " processingTime: " + processingTime);
}
metrics.dequeuedCall(qTime);
metrics.processedCall(processingTime);
-
if (verbose) {
- log("Return: "+result, LOG);
+ log("Return " + TextFormat.shortDebugString(result), LOG);
}
long responseSize = result.getSerializedSize();
// log any RPC responses that are slower than the configured warn
// response time or larger than configured warning size
- boolean tooSlow = (processingTime > warnResponseTime
- && warnResponseTime > -1);
- boolean tooLarge = (responseSize > warnResponseSize
- && warnResponseSize > -1);
+ boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
+ boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
if (tooSlow || tooLarge) {
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
+ // TOOD: This output is useless.... output the serialized pb as toString but do a
+ // short form, shorter than TextFormat.shortDebugString(proto).
StringBuilder buffer = new StringBuilder(256);
- buffer.append(methodName);
+ buffer.append(method.getName());
buffer.append("(");
buffer.append(param.getClass().getName());
buffer.append(")");
- logResponse(new Object[]{rpcRequest.getRequest()},
- methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
+ logResponse(new Object[]{param},
+ method.getName(), buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
responseSize);
}
- return result;
+ return new Pair<Message, CellScanner>(result,
+ controller != null? controller.cellScanner(): null);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
@@ -262,48 +233,6 @@ class ProtobufRpcServerEngine implements
}
}
- static Method getMethod(Class<? extends IpcProtocol> protocol,
- String methodName) {
- Method method = methodInstances.get(methodName);
- if (method != null) {
- return method;
- }
- Method[] methods = protocol.getMethods();
- for (Method m : methods) {
- if (m.getName().equals(methodName)) {
- m.setAccessible(true);
- methodInstances.put(methodName, m);
- return m;
- }
- }
- return null;
- }
-
- static Message getMethodArgType(Method method) throws Exception {
- Message protoType = methodArg.get(method.getName());
- if (protoType != null) {
- return protoType;
- }
-
- Class<?>[] args = method.getParameterTypes();
- Class<?> arg;
- if (args.length == 2) {
- // RpcController + Message in the method args
- // (generated code from RPC bits in .proto files have RpcController)
- arg = args[1];
- } else if (args.length == 1) {
- arg = args[0];
- } else {
- //unexpected
- return null;
- }
- //in the protobuf methods, args[1] is the only significant argument
- Method newInstMethod = arg.getMethod("getDefaultInstance");
- newInstMethod.setAccessible(true);
- protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
- methodArg.put(method.getName(), protoType);
- return protoType;
- }
/**
* Logs an RPC response to the LOG file, producing valid JSON objects for
* client Operations.
@@ -361,10 +290,12 @@ class ProtobufRpcServerEngine implements
mapper.writeValueAsString(responseInfo));
}
}
+
protected static void log(String value, Log LOG) {
String v = value;
- if (v != null && v.length() > 55)
- v = v.substring(0, 55)+"...";
+ final int max = 100;
+ if (v != null && v.length() > max)
+ v = v.substring(0, max) + "...";
LOG.info(v);
}
}