You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/05/24 05:47:40 UTC

svn commit: r1126857 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Author: stack
Date: Tue May 24 03:47:40 2011
New Revision: 1126857

URL: http://svn.apache.org/viewvc?rev=1126857&view=rev
Log:
HBASE-3899 HBase RPC to support free-ing up server handler threads even if response is not ready

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1126857&r1=1126856&r2=1126857&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May 24 03:47:40 2011
@@ -264,6 +264,9 @@ Release 0.91.0 - Unreleased
                RS web UIs
    HBASE-3691  Add compressor support for 'snappy', google's compressor
                (Nichole Treadway and Nicholas Telford)
+   HBASE-3899  enhance HBase RPC to support free-ing up server handler threads
+               even if response is not ready (Dhruba Borthakur)
+
 
 Release 0.90.4 - Unreleased
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java?rev=1126857&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java Tue May 24 03:47:40 2011
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+/**
+ * An optional interface to indicate that the values are not immediately
+ * readable.
+ */
+public interface WritableDelayed {
+  /**
+   * Provide a hint to the caller to indicate that
+   * data is not ready yet.
+   */
+  public boolean isDelayed();
+}
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+/**
+ * An optional interface to indicate that the values are not immediately
+ * readable.
+ */
+public interface WritableDelayed {
+  /**
+   * Provide a hint to the caller to indicate that
+   * data is not ready yet.
+   */
+  public boolean isDelayed();
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1126857&r1=1126856&r2=1126857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue May 24 03:47:40 2011
@@ -57,6 +57,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.io.WritableDelayed;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
@@ -97,7 +98,7 @@ public abstract class HBaseServer implem
   /** Default value for above param */
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 
-  private final int warnResponseSize;
+  private static int warnResponseSize;
 
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@@ -150,6 +151,25 @@ public abstract class HBaseServer implem
     }
     return null;
   }
+  /**
+   * If invoked from inside an RPC, startDelay tells the current call not to
+   * send responses back to the client until endDelay() is invoked.
+   */
+  public static void startDelay(Object o) {
+    Call call = (Call) o;
+    if (call != null) {
+      call.startDelay();
+    }
+  }
+  public static void endDelay(Object o) throws IOException {
+    Call call = (Call) o;
+    if (call != null) {
+      call.endDelay();
+    }
+  }
+  public static Object getCall() {
+    return CurCall.get();
+  }
   /** Returns remote address as a string when invoked inside an RPC.
    *  Returns null in case of an error.
    *  @return String
@@ -241,13 +261,19 @@ public abstract class HBaseServer implem
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
     protected ByteBuffer response;                // the response for this call
+    private boolean delayResponse; // shall the RPC layer delay the response?
+    private boolean doneResponse;  // have we already responded to the client?
+    private Responder responder;   // the thread that sends out responses
 
-    public Call(int id, Writable param, Connection connection) {
+    public Call(int id, Writable param, Connection connection, Responder responder) {
       this.id = id;
       this.param = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
       this.response = null;
+      this.delayResponse = false;
+      this.doneResponse = false;
+      this.responder = responder;
     }
 
     @Override
@@ -255,9 +281,40 @@ public abstract class HBaseServer implem
       return param.toString() + " from " + connection.toString();
     }
 
-    public void setResponse(ByteBuffer response) {
+    public synchronized void setResponse(ByteBuffer response) {
       this.response = response;
     }
+
+    /**
+     * Delay this call, do not send its response back to the client
+     */
+    public synchronized void startDelay() {
+      this.delayResponse = true;
+    }
+
+    /**
+     * It is ok to send responses back to the client now. This is
+     * typically called by a non-server-handler thread.
+     */
+    public synchronized void endDelay() throws IOException {
+      this.delayResponse = false;
+      if (response != null && !doneResponse) {
+        doneResponse = true;
+        responder.doRespond(this);
+      }
+    }
+
+    /*
+     * If we have a response, and delay is not set, then respond
+     * immediately. Otherwise, do not respond to client. This is
+     * called by the RPC code in the context of the Handler thread.
+     */
+    public synchronized void sendResponseIfReady() throws IOException {
+      if (response != null && !doneResponse && !delayResponse) {
+        doneResponse = true; // going to respond as soon as this call returns.
+        responder.doRespond(this);
+      }
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -800,15 +857,40 @@ public abstract class HBaseServer implem
     }
 
     //
+    // Enqueue for background thread to send responses out later.
+    //
+    private void enqueueInSelector(Call call) throws IOException {
+      incPending();
+      try {
+        // Wakeup the thread blocked on select, only then can the call
+        // to channel.register() complete.
+        SocketChannel channel = call.connection.channel;
+        writeSelector.wakeup();
+        channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+       } catch (ClosedChannelException e) {
+         // It's ok; the channel might be closed elsewhere.
+       } finally {
+         decPending();
+       }
+    }
+
+    //
     // Enqueue a response from the application.
     //
     void doRespond(Call call) throws IOException {
+      // set the serve time when the response has to be sent later
+      call.timestamp = System.currentTimeMillis();
+
+      boolean doRegister = false;
       synchronized (call.connection.responseQueue) {
         call.connection.responseQueue.addLast(call);
         if (call.connection.responseQueue.size() == 1) {
-          processResponse(call.connection.responseQueue, true);
+          doRegister = !processResponse(call.connection.responseQueue, false);
         }
       }
+      if (doRegister) {
+        enqueueInSelector(call); // tell background thread to send it out later
+      }
     }
 
     private synchronized void incPending() {   // call waiting to be enqueued.
@@ -1003,7 +1085,7 @@ public abstract class HBaseServer implem
       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);
 
-      Call call = new Call(id, param, this);
+      Call call = new Call(id, param, this, responder);
 
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
@@ -1070,6 +1152,10 @@ public abstract class HBaseServer implem
           }
           CurCall.set(null);
 
+          if (!(value instanceof WritableDelayed)) {
+            processRpcResponse(call, value, error, errorClass);
+          }
+
           int size = BUFFER_INITIAL_SIZE;
           if (value instanceof WritableWithSize) {
             // get the size hint.
@@ -1106,7 +1192,7 @@ public abstract class HBaseServer implem
 
 
           call.setResponse(buf.getByteBuffer());
-          responder.doRespond(call);
+          call.sendResponseIfReady(); // maybe delay sending out response to the client
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
             LOG.info(getName() + " caught: " +
@@ -1132,6 +1218,51 @@ public abstract class HBaseServer implem
 
   }
 
+  // Packages the rpc invocation response back into the call.
+  static public void processRpcResponse(Object callobj, Writable value,
+    String error, String errorClass) throws IOException {
+    Call call = (Call)callobj;
+
+    int size = Handler.BUFFER_INITIAL_SIZE;
+    if (value instanceof WritableWithSize) {
+      // get the size hint.
+      WritableWithSize ohint = (WritableWithSize)value;
+      long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+      if (hint > 0) {
+        if ((hint) > Integer.MAX_VALUE) {
+          // oops, new problem.
+          IOException ioe = 
+            new IOException("Result buffer size too large: " + hint);
+          errorClass = ioe.getClass().getName();
+          error = StringUtils.stringifyException(ioe);
+        } else {
+          size = (int)hint;
+        }
+      }
+    }
+    ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+    DataOutputStream out = new DataOutputStream(buf);
+    out.writeInt(call.id);                // write call id
+    out.writeBoolean(error != null);      // write error flag
+
+    if (error == null) {
+      value.write(out);
+    } else {
+      WritableUtils.writeString(out, errorClass);
+      WritableUtils.writeString(out, error);
+    }
+
+    if (buf.size() > warnResponseSize) {
+      LOG.warn("responseTooLarge for: "+call+": Size: "
+                     + StringUtils.humanReadableInt(buf.size()));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Served: " + ((Invocation)call.param).getMethodName());
+    }
+    call.setResponse(buf.getByteBuffer());
+    call.sendResponseIfReady(); // maybe delay sending out response to the client
+  }
+
   /**
    * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
    * are priorityHandlers available it will be processed in it's own thread set.