You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/06/05 14:50:44 UTC
svn commit: r1346374 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
HConstants.java ipc/HBaseRPC.java ipc/HBaseServer.java
Author: mbautin
Date: Tue Jun 5 12:50:44 2012
New Revision: 1346374
URL: http://svn.apache.org/viewvc?rev=1346374&view=rev
Log:
[HBASE-6148] [89-fb] Avoid allocating large objects when reading corrupted RPC
Author: liyintang
Summary:
Recently, RegionServers have been allocating very large objects when reading corrupted RPC calls, which may be caused by client-server version incompatibility. We need to add a sanity check on the parameter count before allocating these objects.
Apache trunk does not suffer from this problem since it has already moved to versioned invocation.
Test Plan:
running all the unit tests
test on the dev cluster
Reviewers: kannan, mbautin
Reviewed By: mbautin
CC: hbase-eng@, cgthayer, vinodv, mbautin
Differential Revision: https://phabricator.fb.com/D484913
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun 5 12:50:44 2012
@@ -552,6 +552,8 @@ public final class HConstants {
public static final boolean[] BOOLEAN_VALUES = { false, true };
+ public static final int IPC_CALL_PARAMETER_LENGTH_MAX = 1000;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Jun 5 12:50:44 2012
@@ -84,11 +84,7 @@ import java.util.Map;
* the protocol instance is transmitted.
*/
public class HBaseRPC {
- // Leave this out in the hadoop ipc package but keep class name. Do this
- // so that we dont' get the logging of this class's invocations by doing our
- // blanket enabling DEBUG on the o.a.h.h. package.
- protected static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
+ protected static final Log LOG = LogFactory.getLog(HBaseRPC.class.getName());
private HBaseRPC() {
super();
@@ -129,7 +125,15 @@ public class HBaseRPC {
public void readFields(DataInput in) throws IOException {
methodName = in.readUTF();
- parameters = new Object[in.readInt()];
+
+ int parameterLength = in.readInt();
+ if (parameterLength < 0 || parameterLength > HConstants.IPC_CALL_PARAMETER_LENGTH_MAX) {
+ String error = "Invalid parameter length: " + parameterLength +
+ " for the method " + methodName;
+ LOG.error(error);
+ throw new IllegalArgumentException(error);
+ }
+ parameters = new Object[parameterLength];
parameterClasses = new Class[parameters.length];
HbaseObjectWritable objectWritable = new HbaseObjectWritable();
for (int i = 0; i < parameters.length; i++) {
@@ -265,17 +269,17 @@ public class HBaseRPC {
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- final boolean logDebug = LOG.isDebugEnabled();
+ final boolean isTraceEnabled = LOG.isTraceEnabled();
long startTime = 0;
- if (logDebug) {
+ if (isTraceEnabled) {
startTime = System.currentTimeMillis();
}
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket,
rpcTimeout, rpcCompression);
- if (logDebug) {
+ if (isTraceEnabled) {
long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
+ LOG.trace("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
@@ -367,13 +371,13 @@ public class HBaseRPC {
} catch(ConnectException se) { // namenode has not been started
ioe = se;
if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
- LOG.info("Server at " + addr + " could not be reached after " +
+ LOG.warn("Server at " + addr + " could not be reached after " +
reconnectAttempts + " tries, giving up.");
throw new RetriesExhaustedException("Failed setting up proxy to " +
addr.toString() + " after attempts=" + reconnectAttempts);
}
} catch(SocketTimeoutException te) { // namenode is busy
- LOG.info("Problem connecting to server: " + addr);
+ LOG.warn("Problem connecting to server: " + addr);
ioe = te;
}
// check if timed out
@@ -642,7 +646,7 @@ public class HBaseRPC {
throw new IOException("Could not find requested method, the usual " +
"cause is a version mismatch between client and server.");
}
- if (verbose) log("Call: " + call);
+ if (verbose) trace("Call: " + call);
Method method = implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
status.setRPC(call.getMethodName(), call.getParameters(), receivedTime, method);
@@ -653,15 +657,15 @@ public class HBaseRPC {
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime - receivedTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + call.getMethodName() +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
}
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(call.getMethodName(), processingTime);
- if (verbose) log("Return: " + value);
+ if (verbose) trace("Return: " + value);
HbaseObjectWritable retVal =
new HbaseObjectWritable(method.getReturnType(), value);
@@ -764,10 +768,10 @@ public class HBaseRPC {
}
}
- protected static void log(String value) {
+ protected static void trace(String value) {
String v = value;
if (v != null && v.length() > 55)
v = v.substring(0, 55)+"...";
- LOG.info(v);
+ LOG.trace(v);
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jun 5 12:50:44 2012
@@ -105,8 +105,7 @@ public abstract class HBaseServer {
*/
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
+ public static final Log LOG = LogFactory.getLog(HBaseServer.class.getName());
protected static final ThreadLocal<HBaseServer> SERVER =
new ThreadLocal<HBaseServer>();
@@ -368,8 +367,8 @@ public abstract class HBaseServer {
} catch (Exception e) {return;}
}
if (c.timedOut(currentTime)) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+ if (LOG.isTraceEnabled())
+ LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
numNuked++;
end--;
@@ -459,8 +458,8 @@ public abstract class HBaseServer {
try {
doRead(readSelectionKey);
} catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught: " + StringUtils.stringifyException(e) +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Caught: " + StringUtils.stringifyException(e) +
" when processing " + readSelectionKey.attachment());
}
} finally {
@@ -472,8 +471,8 @@ public abstract class HBaseServer {
});
} catch (Throwable e) {
setReadInterest(readSelectionKey);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught " + e.getMessage() + " when processing the remote connection " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Caught " + e.getMessage() + " when processing the remote connection " +
readSelectionKey.attachment().toString());
}
}
@@ -483,8 +482,8 @@ public abstract class HBaseServer {
if (key != null) {
Connection c = (Connection)key.attachment();
if (c != null) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+ if (LOG.isTraceEnabled())
+ LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
}
}
@@ -512,8 +511,8 @@ public abstract class HBaseServer {
connectionList.add(numConnections, c);
numConnections++;
}
- if (LOG.isDebugEnabled())
- LOG.debug("Server connection from " + c.toString() +
+ if (LOG.isTraceEnabled())
+ LOG.trace("Server connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
}
@@ -532,12 +531,13 @@ public abstract class HBaseServer {
} catch (InterruptedException ieo) {
throw ieo;
} catch (Exception e) {
- LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+ LOG.warn(getName() + ": readAndProcess threw exception " + e +
+ ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " +
+ if (LOG.isTraceEnabled())
+ LOG.trace(getName() + ": disconnecting client " +
c.getHostAddress() + ". Number of active connections: "+
numConnections);
closeConnection(c);
@@ -557,7 +557,7 @@ public abstract class HBaseServer {
try {
acceptChannel.socket().close();
} catch (IOException e) {
- LOG.info(getName() + ":Exception in closing listener socket. " + e);
+ LOG.warn(getName() + ":Exception in closing listener socket. " + e);
}
}
}
@@ -596,7 +596,7 @@ public abstract class HBaseServer {
doAsyncWrite(key);
}
} catch (IOException e) {
- LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+ LOG.warn(getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = System.currentTimeMillis();
@@ -608,7 +608,9 @@ public abstract class HBaseServer {
// If there were some calls that have not been sent out for a
// long time, discard them.
//
- LOG.debug("Checking for old call responses.");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Checking for old call responses.");
+ }
ArrayList<Call> calls;
// get the list of channels from list of keys.
@@ -718,8 +720,8 @@ public abstract class HBaseServer {
//
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getName() + ": responding to #" + call.id + " from " +
call.connection);
}
//
@@ -737,8 +739,8 @@ public abstract class HBaseServer {
} else {
done = false; // more calls pending to be sent.
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getName() + ": responding to #" + call.id + " from " +
call.connection + " Wrote " + numBytes + " bytes.");
}
} else {
@@ -765,8 +767,8 @@ public abstract class HBaseServer {
decPending();
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to #" + call.id + " from " +
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getName() + ": responding to #" + call.id + " from " +
call.connection + " Wrote partial " + numBytes +
" bytes.");
}
@@ -984,8 +986,8 @@ public abstract class HBaseServer {
// 1. read the call id uncompressed
int id = uncompressedIs.readInt();
- if (LOG.isDebugEnabled())
- LOG.debug(" got #" + id);
+ if (LOG.isTraceEnabled())
+ LOG.trace(" got #" + id);
if (version >= VERSION_COMPRESSED_RPC) {
@@ -1057,9 +1059,8 @@ public abstract class HBaseServer {
status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": has #" + call.id + " from " +
- call.connection);
+ if (LOG.isTraceEnabled())
+ LOG.trace(getName() + ": has #" + call.id + " from " + call.connection);
String errorClass = null;
String error = null;
@@ -1071,7 +1072,7 @@ public abstract class HBaseServer {
// make the call
value = call(call.param, call.timestamp, status);
} catch (Throwable e) {
- LOG.info(getName()+", call "+call+": error: " + e, e);
+ LOG.warn(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
}
@@ -1133,13 +1134,13 @@ public abstract class HBaseServer {
responder.doRespond(call);
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " caught: " +
+ LOG.warn(getName() + " caught: " +
StringUtils.stringifyException(e));
}
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
- LOG.info(getName() + ": exiting on OOME");
+ LOG.error(getName() + ": exiting on OOME");
return;
}
} else {
@@ -1147,13 +1148,12 @@ public abstract class HBaseServer {
throw e;
}
} catch (Exception e) {
- LOG.info(getName() + " caught: " +
+ LOG.warn(getName() + " caught: " +
StringUtils.stringifyException(e));
}
}
LOG.info(getName() + ": exiting");
}
-
}
protected HBaseServer(String bindAddress, int port,