You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/03 18:26:51 UTC
svn commit: r1309019 [1/3] - in /hbase/trunk/src/main:
java/org/apache/hadoop/hbase/io/ java/org/apache/hadoop/hbase/ipc/
java/org/apache/hadoop/hbase/protobuf/generated/
java/org/apache/hadoop/hbase/security/ protobuf/
Author: stack
Date: Tue Apr 3 16:26:50 2012
New Revision: 1309019
URL: http://svn.apache.org/viewvc?rev=1309019&view=rev
Log:
HBASE-5451 Switch RPC call envelope/headers to PBs
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/trunk/src/main/protobuf/RPC.proto
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/security/User.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java Tue Apr 3 16:26:50 2012
@@ -27,7 +27,7 @@ import org.apache.hadoop.classification.
* OutputStream implementation that wraps a DataOutput.
*/
@InterfaceAudience.Private
-class DataOutputOutputStream extends OutputStream {
+public class DataOutputOutputStream extends OutputStream {
private final DataOutput out;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Apr 3 16:26:50 2012
@@ -46,18 +46,23 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
+import com.google.protobuf.ByteString;
+
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@@ -233,8 +238,9 @@ public class HBaseClient {
User ticket = remoteId.getTicket();
Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
- header = new ConnectionHeader(
- protocol == null ? null : protocol.getName(), ticket);
+ ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
+ builder.setProtocol(protocol == null ? "" : protocol.getName());
+ this.header = builder.build();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
@@ -436,13 +442,8 @@ public class HBaseClient {
private void writeHeader() throws IOException {
out.write(HBaseServer.HEADER.array());
out.write(HBaseServer.CURRENT_VERSION);
- //When there are more fields we can have ConnectionHeader Writable.
- DataOutputBuffer buf = new DataOutputBuffer();
- header.write(buf);
-
- int bufLen = buf.getLength();
- out.writeInt(bufLen);
- out.write(buf.getData(), 0, bufLen);
+ out.writeInt(header.getSerializedSize());
+ header.writeTo(out);
}
/* wait till someone signals us to start reading RPC response or
@@ -451,7 +452,6 @@ public class HBaseClient {
*
* Return true if it is time to read a response; false otherwise.
*/
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
protected synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime-
@@ -526,32 +526,24 @@ public class HBaseClient {
if (shouldCloseConnection.get()) {
return;
}
-
- // For serializing the data to be written.
-
- final DataOutputBuffer d = new DataOutputBuffer();
try {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
-
- d.writeInt(0xdeadbeef); // placeholder for data length
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
- // fill in the placeholder
- Bytes.putInt(data, 0, dataLength - 4);
+ RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder();
+ builder.setCallId(call.id);
+ Invocation invocation = (Invocation)call.param;
+ DataOutputBuffer d = new DataOutputBuffer();
+ invocation.write(d);
+ builder.setRequest(ByteString.copyFrom(d.getData()));
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- out.write(data, 0, dataLength);
- out.flush();
+ RpcRequest obj = builder.build();
+ this.out.writeInt(obj.getSerializedSize());
+ obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
+ this.out.flush();
}
} catch(IOException e) {
markClosed(e);
- } finally {
- //the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(d);
}
}
@@ -566,33 +558,31 @@ public class HBaseClient {
try {
// See HBaseServer.Call.setResponse for where we write out the response.
- // It writes the call.id (int), a flag byte, then optionally the length
- // of the response (int) followed by data.
+ // It writes the call.id (int), a boolean signifying any error (and if
+ // so the exception name/trace), and the response bytes
// Read the call id.
- int id = in.readInt();
+ RpcResponse response = RpcResponse.parseDelimitedFrom(in);
+ int id = response.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id);
- // Read the flag byte
- byte flag = in.readByte();
- boolean isError = ResponseFlag.isError(flag);
- if (ResponseFlag.isLength(flag)) {
- // Currently length if present is unused.
- in.readInt();
- }
- int state = in.readInt(); // Read the state. Currently unused.
+ boolean isError = response.getError();
if (isError) {
if (call != null) {
//noinspection ThrowableInstanceNeverThrown
- call.setException(new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in)));
+ call.setException(new RemoteException(
+ response.getException().getExceptionName(),
+ response.getException().getStackTrace()));
}
} else {
+ ByteString responseObj = response.getResponse();
+ DataInputStream dis =
+ new DataInputStream(responseObj.newInput());
Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
+ value.readFields(dis); // read value
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (call != null) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1309019&r1=1309018&r2=1309019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Apr 3 16:26:50 2012
@@ -59,22 +59,28 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
import org.cliffc.high_scale_lib.Counter;
@@ -94,7 +100,7 @@ public abstract class HBaseServer implem
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
- public static final byte CURRENT_VERSION = 3;
+ public static final byte CURRENT_VERSION = 5;
/**
* How many calls/handler are allowed in the queue.
@@ -333,40 +339,27 @@ public abstract class HBaseServer implem
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
+ RpcResponse.Builder builder = RpcResponse.newBuilder();
// Call id.
- out.writeInt(this.id);
- // Write flag.
- byte flag = (error != null)?
- ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
- out.writeByte(flag);
- // Place holder for length set later below after we
- // fill the buffer with data.
- out.writeInt(0xdeadbeef);
- out.writeInt(status.state);
- } catch (IOException e) {
- errorClass = e.getClass().getName();
- error = StringUtils.stringifyException(e);
- }
-
- try {
- if (error == null) {
- result.write(out);
+ builder.setCallId(this.id);
+ builder.setError(error != null);
+ if (error != null) {
+ RpcException.Builder b = RpcException.newBuilder();
+ b.setExceptionName(errorClass);
+ b.setStackTrace(error);
+ builder.setException(b.build());
} else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
+ DataOutputBuffer d = new DataOutputBuffer(size);
+ result.write(d);
+ byte[] response = d.getData();
+ builder.setResponse(ByteString.copyFrom(response));
}
+ builder.build().writeDelimitedTo(
+ DataOutputOutputStream.constructOutputStream(out));
} catch (IOException e) {
- LOG.warn("Error sending response to call: ", e);
+ LOG.warn("Exception while creating response " + e);
}
-
- // Set the length into the ByteBuffer after call id and after
- // byte flag.
ByteBuffer bb = buf.getByteBuffer();
- int bufSiz = bb.remaining();
- // Move to the size location in our ByteBuffer past call.id
- // and past the byte flag.
- bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
- bb.putInt(bufSiz);
bb.position(0);
this.response = bb;
}
@@ -1065,9 +1058,9 @@ public abstract class HBaseServer implem
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
- ConnectionHeader header = new ConnectionHeader();
+ ConnectionHeader header;
Class<? extends VersionedProtocol> protocol;
- protected User ticket = null;
+ protected User user = null;
public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
@@ -1231,26 +1224,21 @@ public abstract class HBaseServer implem
/// Reads the connection header following version
private void processHeader() throws IOException {
- DataInputStream in =
- new DataInputStream(new ByteArrayInputStream(data.array()));
- header.readFields(in);
+ header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
try {
String protocolClassName = header.getProtocol();
- if (protocolClassName == null) {
- protocolClassName = "org.apache.hadoop.hbase.ipc.HRegionInterface";
- }
protocol = getProtocolClass(protocolClassName, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
- ticket = header.getUser();
+ user = User.createUser(header);
}
protected void processData(byte[] buf) throws IOException, InterruptedException {
- DataInputStream dis =
- new DataInputStream(new ByteArrayInputStream(buf));
- int id = dis.readInt(); // try to read an id
+ RpcRequest request = RpcRequest.parseFrom(buf);
+ int id = request.getCallId();
+ ByteString clientRequest = request.getRequest();
long callSize = buf.length;
if (LOG.isDebugEnabled()) {
@@ -1271,6 +1259,8 @@ public abstract class HBaseServer implem
Writable param;
try {
+ DataInputStream dis =
+ new DataInputStream(clientRequest.newInput());
param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis);
} catch (Throwable t) {
@@ -1372,12 +1362,12 @@ public abstract class HBaseServer implem
throw new ServerNotRunningYetException("Server is not running yet");
if (LOG.isDebugEnabled()) {
- User remoteUser = call.connection.ticket;
+ User remoteUser = call.connection.user;
LOG.debug(getName() + ": call #" + call.id + " executing as "
+ (remoteUser == null ? "NULL principal" : remoteUser.getName()));
}
- RequestContext.set(call.connection.ticket, getRemoteIp(),
+ RequestContext.set(call.connection.user, getRemoteIp(),
call.connection.protocol);
// make the call
value = call(call.connection.protocol, call.param, call.timestamp,