You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/06/05 14:50:44 UTC

svn commit: r1346374 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: HConstants.java ipc/HBaseRPC.java ipc/HBaseServer.java

Author: mbautin
Date: Tue Jun  5 12:50:44 2012
New Revision: 1346374

URL: http://svn.apache.org/viewvc?rev=1346374&view=rev
Log:
[HBASE-6148] [89-fb] Avoid allocating large objects when reading corrupted RPC

Author: liyintang

Summary:
Recently, the RegionServer has been allocating very large objects when reading some corrupted RPC calls, which may be caused by client-server version incompatibility. We need to add a sanity check on the parameter count before allocating the objects.
Apache trunk does not suffer from this problem, since it has already moved to versioned invocation.

Test Plan:
Ran all the unit tests.
Tested on the dev cluster.

Reviewers: kannan, mbautin

Reviewed By: mbautin

CC: hbase-eng@, cgthayer, vinodv, mbautin

Differential Revision: https://phabricator.fb.com/D484913

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Jun  5 12:50:44 2012
@@ -552,6 +552,8 @@ public final class HConstants {
 
   public static final boolean[] BOOLEAN_VALUES = { false, true };
 
+  public static final int IPC_CALL_PARAMETER_LENGTH_MAX = 1000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Jun  5 12:50:44 2012
@@ -84,11 +84,7 @@ import java.util.Map;
  * the protocol instance is transmitted.
  */
 public class HBaseRPC {
-  // Leave this out in the hadoop ipc package but keep class name.  Do this
-  // so that we dont' get the logging of this class's invocations by doing our
-  // blanket enabling DEBUG on the o.a.h.h. package.
-  protected static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
+  protected static final Log LOG = LogFactory.getLog(HBaseRPC.class.getName());
 
   private HBaseRPC() {
     super();
@@ -129,7 +125,15 @@ public class HBaseRPC {
 
     public void readFields(DataInput in) throws IOException {
       methodName = in.readUTF();
-      parameters = new Object[in.readInt()];
+      
+      int parameterLength = in.readInt();
+      if (parameterLength < 0 || parameterLength > HConstants.IPC_CALL_PARAMETER_LENGTH_MAX) {
+        String error = "Invalid parameter length: " +  parameterLength +
+        " for the method " + methodName;
+        LOG.error(error);
+        throw new IllegalArgumentException(error);
+      }
+      parameters = new Object[parameterLength];
       parameterClasses = new Class[parameters.length];
       HbaseObjectWritable objectWritable = new HbaseObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
@@ -265,17 +269,17 @@ public class HBaseRPC {
 
     public Object invoke(Object proxy, Method method, Object[] args)
         throws Throwable {
-      final boolean logDebug = LOG.isDebugEnabled();
+      final boolean isTraceEnabled = LOG.isTraceEnabled();
       long startTime = 0;
-      if (logDebug) {
+      if (isTraceEnabled) {
         startTime = System.currentTimeMillis();
       }
       HbaseObjectWritable value = (HbaseObjectWritable)
         client.call(new Invocation(method, args), address, ticket,
             rpcTimeout, rpcCompression);
-      if (logDebug) {
+      if (isTraceEnabled) {
         long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
+        LOG.trace("Call: " + method.getName() + " " + callTime);
       }
       return value.get();
     }
@@ -367,13 +371,13 @@ public class HBaseRPC {
       } catch(ConnectException se) {  // namenode has not been started
         ioe = se;
         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
-          LOG.info("Server at " + addr + " could not be reached after " +
+          LOG.warn("Server at " + addr + " could not be reached after " +
             reconnectAttempts + " tries, giving up.");
           throw new RetriesExhaustedException("Failed setting up proxy to " +
             addr.toString() + " after attempts=" + reconnectAttempts);
       }
       } catch(SocketTimeoutException te) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + addr);
+        LOG.warn("Problem connecting to server: " + addr);
         ioe = te;
       }
       // check if timed out
@@ -642,7 +646,7 @@ public class HBaseRPC {
           throw new IOException("Could not find requested method, the usual " +
               "cause is a version mismatch between client and server.");
         }
-        if (verbose) log("Call: " + call);
+        if (verbose) trace("Call: " + call);
         Method method = implementation.getMethod(call.getMethodName(),
                 call.getParameterClasses());
         status.setRPC(call.getMethodName(), call.getParameters(), receivedTime, method);
@@ -653,15 +657,15 @@ public class HBaseRPC {
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime - receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Served: " + call.getMethodName() +
             " queueTime= " + qTime +
             " procesingTime= " + processingTime);
         }
         rpcMetrics.rpcQueueTime.inc(qTime);
         rpcMetrics.rpcProcessingTime.inc(processingTime);
         rpcMetrics.inc(call.getMethodName(), processingTime);
-        if (verbose) log("Return: " + value);
+        if (verbose) trace("Return: " + value);
 
         HbaseObjectWritable retVal =
           new HbaseObjectWritable(method.getReturnType(), value);
@@ -764,10 +768,10 @@ public class HBaseRPC {
     }
   }
 
-  protected static void log(String value) {
+  protected static void trace(String value) {
     String v = value;
     if (v != null && v.length() > 55)
       v = v.substring(0, 55)+"...";
-    LOG.info(v);
+    LOG.trace(v);
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1346374&r1=1346373&r2=1346374&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jun  5 12:50:44 2012
@@ -105,8 +105,7 @@ public abstract class HBaseServer {
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
 
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
+  public static final Log LOG = LogFactory.getLog(HBaseServer.class.getName());
 
   protected static final ThreadLocal<HBaseServer> SERVER =
     new ThreadLocal<HBaseServer>();
@@ -368,8 +367,8 @@ public abstract class HBaseServer {
             } catch (Exception e) {return;}
           }
           if (c.timedOut(currentTime)) {
-            if (LOG.isDebugEnabled())
-              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+            if (LOG.isTraceEnabled())
+              LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
             closeConnection(c);
             numNuked++;
             end--;
@@ -459,8 +458,8 @@ public abstract class HBaseServer {
             try {
               doRead(readSelectionKey);
             } catch (InterruptedException e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Caught: " + StringUtils.stringifyException(e) +
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Caught: " + StringUtils.stringifyException(e) +
                     " when processing " + readSelectionKey.attachment());
               }
             } finally {
@@ -472,8 +471,8 @@ public abstract class HBaseServer {
         });
       } catch (Throwable e) {
         setReadInterest(readSelectionKey);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Caught " + e.getMessage() + " when processing the remote connection " +
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Caught " + e.getMessage() + " when processing the remote connection " +
               readSelectionKey.attachment().toString());
         }
       }
@@ -483,8 +482,8 @@ public abstract class HBaseServer {
       if (key != null) {
         Connection c = (Connection)key.attachment();
         if (c != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+          if (LOG.isTraceEnabled())
+            LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
           closeConnection(c);
         }
       }
@@ -512,8 +511,8 @@ public abstract class HBaseServer {
           connectionList.add(numConnections, c);
           numConnections++;
         }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Server connection from " + c.toString() +
+        if (LOG.isTraceEnabled())
+          LOG.trace("Server connection from " + c.toString() +
               "; # active connections: " + numConnections +
               "; # queued calls: " + callQueue.size());
       }
@@ -532,12 +531,13 @@ public abstract class HBaseServer {
       } catch (InterruptedException ieo) {
         throw ieo;
       } catch (Exception e) {
-        LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        LOG.warn(getName() + ": readAndProcess threw exception " + e +
+            ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
-        if (LOG.isDebugEnabled())
-          LOG.debug(getName() + ": disconnecting client " +
+        if (LOG.isTraceEnabled())
+          LOG.trace(getName() + ": disconnecting client " +
                     c.getHostAddress() + ". Number of active connections: "+
                     numConnections);
         closeConnection(c);
@@ -557,7 +557,7 @@ public abstract class HBaseServer {
         try {
           acceptChannel.socket().close();
         } catch (IOException e) {
-          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+          LOG.warn(getName() + ":Exception in closing listener socket. " + e);
         }
       }
     }
@@ -596,7 +596,7 @@ public abstract class HBaseServer {
                   doAsyncWrite(key);
               }
             } catch (IOException e) {
-              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+              LOG.warn(getName() + ": doAsyncWrite threw exception " + e);
             }
           }
           long now = System.currentTimeMillis();
@@ -608,7 +608,9 @@ public abstract class HBaseServer {
           // If there were some calls that have not been sent out for a
           // long time, discard them.
           //
-          LOG.debug("Checking for old call responses.");
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Checking for old call responses.");
+          }
           ArrayList<Call> calls;
 
           // get the list of channels from list of keys.
@@ -718,8 +720,8 @@ public abstract class HBaseServer {
           //
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(getName() + ": responding to #" + call.id + " from " +
                       call.connection);
           }
           //
@@ -737,8 +739,8 @@ public abstract class HBaseServer {
             } else {
               done = false;            // more calls pending to be sent.
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(getName() + ": responding to #" + call.id + " from " +
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -765,8 +767,8 @@ public abstract class HBaseServer {
                 decPending();
               }
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(getName() + ": responding to #" + call.id + " from " +
                         call.connection + " Wrote partial " + numBytes +
                         " bytes.");
             }
@@ -984,8 +986,8 @@ public abstract class HBaseServer {
 
       // 1. read the call id uncompressed
       int id = uncompressedIs.readInt();
-      if (LOG.isDebugEnabled())
-        LOG.debug(" got #" + id);
+      if (LOG.isTraceEnabled())
+        LOG.trace(" got #" + id);
 
       if (version >= VERSION_COMPRESSED_RPC) {
 
@@ -1057,9 +1059,8 @@ public abstract class HBaseServer {
           status.setConnection(call.connection.getHostAddress(),
               call.connection.getRemotePort());
 
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + ": has #" + call.id + " from " +
-                      call.connection);
+          if (LOG.isTraceEnabled())
+            LOG.trace(getName() + ": has #" + call.id + " from " + call.connection);
 
           String errorClass = null;
           String error = null;
@@ -1071,7 +1072,7 @@ public abstract class HBaseServer {
             // make the call
             value = call(call.param, call.timestamp, status);
           } catch (Throwable e) {
-            LOG.info(getName()+", call "+call+": error: " + e, e);
+            LOG.warn(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
           }
@@ -1133,13 +1134,13 @@ public abstract class HBaseServer {
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
+            LOG.warn(getName() + " caught: " +
                      StringUtils.stringifyException(e));
           }
         } catch (OutOfMemoryError e) {
           if (errorHandler != null) {
             if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OOME");
+              LOG.error(getName() + ": exiting on OOME");
               return;
             }
           } else {
@@ -1147,13 +1148,12 @@ public abstract class HBaseServer {
             throw e;
           }
         } catch (Exception e) {
-          LOG.info(getName() + " caught: " +
+          LOG.warn(getName() + " caught: " +
                    StringUtils.stringifyException(e));
         }
       }
       LOG.info(getName() + ": exiting");
     }
-
   }
 
   protected HBaseServer(String bindAddress, int port,