You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/10/18 14:20:01 UTC
svn commit: r1185617 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Author: stack
Date: Tue Oct 18 12:20:01 2011
New Revision: 1185617
URL: http://svn.apache.org/viewvc?rev=1185617&view=rev
Log:
HBASE-3581 hbase rpc should send size of response
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1185617&r1=1185616&r2=1185617&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Oct 18 12:20:01 2011
@@ -624,6 +624,7 @@ Release 0.92.0 - Unreleased
HBASE-4558 Addendum for TestMasterFailover (Ram) - Breaks the build
HBASE-4568 Make zk dump jsp response faster
HBASE-4606 Remove spam in HCM and fix a list.size == 0
+ HBASE-3581 hbase rpc should send size of response
TASKS
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1185617&r1=1185616&r2=1185617&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Oct 18 12:20:01 2011
@@ -33,14 +33,10 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
-import java.util.Hashtable;
import java.util.Iterator;
-import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
@@ -280,7 +276,7 @@ public class HBaseClient {
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get() ||
+ if (shouldCloseConnection.get() || !running.get() ||
remoteId.rpcTimeout > 0) {
throw e;
}
@@ -552,14 +548,24 @@ public class HBaseClient {
touch();
try {
- int id = in.readInt(); // try to read an id
+ // See HBaseServer.Call.setResponse for where we write out the response.
+ // It writes the call.id (int), a flag byte, then optionally the length
+ // of the response (int) followed by data.
+
+ // Read the call id.
+ int id = in.readInt();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
-
Call call = calls.remove(id);
- boolean isError = in.readBoolean(); // read if error
+ // Read the flag byte
+ byte flag = in.readByte();
+ boolean isError = ResponseFlag.isError(flag);
+ if (ResponseFlag.isLength(flag)) {
+ // Currently length if present is unused.
+ in.readInt();
+ }
if (isError) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException( WritableUtils.readString(in),
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1185617&r1=1185616&r2=1185617&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Oct 18 12:20:01 2011
@@ -298,7 +298,8 @@ public abstract class HBaseServer implem
if (result instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize) result;
- long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+ long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
+ (2 * Bytes.SIZEOF_INT);
if (hint > Integer.MAX_VALUE) {
// oops, new problem.
IOException ioe =
@@ -313,8 +314,15 @@ public abstract class HBaseServer implem
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
- out.writeInt(this.id); // write call id
- out.writeBoolean(error != null); // write error flag
+ // Call id.
+ out.writeInt(this.id);
+ // Write flag.
+ byte flag = (error != null)?
+ ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
+ out.writeByte(flag);
+ // Place holder for length set later below after we
+ // fill the buffer with data.
+ out.writeInt(0xdeadbeef);
} catch (IOException e) {
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
@@ -331,7 +339,16 @@ public abstract class HBaseServer implem
LOG.warn("Error sending response to call: ", e);
}
- this.response = buf.getByteBuffer();
+ // Set the length into the ByteBuffer after call id and after
+ // byte flag.
+ ByteBuffer bb = buf.getByteBuffer();
+ int bufSiz = bb.remaining();
+ // Move to the size location in our ByteBuffer past call.id
+ // and past the byte flag.
+ bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
+ bb.putInt(bufSiz);
+ bb.position(0);
+ this.response = bb;
}
@Override