You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/07/26 22:30:58 UTC

svn commit: r1151227 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ipc/ src/test/java/org/apache/hadoop/hbase/ipc/

Author: stack
Date: Tue Jul 26 20:30:57 2011
New Revision: 1151227

URL: http://svn.apache.org/viewvc?rev=1151227&view=rev
Log:
HBASE-3899 enhance HBase RPC to support free-ing up server handler threads even if response is not ready

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jul 26 20:30:57 2011
@@ -332,6 +332,8 @@ Release 0.91.0 - Unreleased
    HBASE-4081  Issues with HRegion.compactStores methods (Ming Ma)
    HBASE-3465  Hbase should use a HADOOP_HOME environment variable if available
                (Alejandro Abdelnur)
+   HBASE-3899  enhance HBase RPC to support free-ing up server handler threads
+               even if response is not ready (Vlad Dogaru)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1151227&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Tue Jul 26 20:30:57 2011
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A call whose response can be delayed by the server.
+ */
+public interface Delayable {
+  /**
+   * Signal that the call response should be delayed, thus freeing the RPC
+   * server to handle different requests.
+   */
+  public void startDelay();
+
+  /**
+   * @return true if the response to this call is currently being delayed
+   */
+  public boolean isDelayed();
+
+  /**
+   * Signal that the response to the call is ready and the RPC server is now
+   * allowed to send the response.
+   * @param result The result to return to the caller.
+   * @throws IOException
+   */
+  public void endDelay(Object result) throws IOException;
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jul 26 20:30:57 2011
@@ -47,6 +47,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -56,6 +57,7 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,8 +99,19 @@ public abstract class HBaseServer implem
   /** Default value for above param */
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 
+  static final int BUFFER_INITIAL_SIZE = 1024;
+
   private final int warnResponseSize;
 
+  private static final String WARN_DELAYED_CALLS =
+      "hbase.ipc.warn.delayedrpc.number";
+
+  private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
+
+  private final int warnDelayedCalls;
+
+  private AtomicInteger delayedCalls;
+
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
 
@@ -234,20 +247,25 @@ public abstract class HBaseServer implem
   }
 
   /** A call queued for handling. */
-  private static class Call {
+  private class Call implements Delayable {
     protected int id;                             // the client's call id
     protected Writable param;                     // the parameter passed
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
     protected ByteBuffer response;                // the response for this call
+    protected boolean delayResponse;
+    protected Responder responder;
 
-    public Call(int id, Writable param, Connection connection) {
+    public Call(int id, Writable param, Connection connection,
+        Responder responder) {
       this.id = id;
       this.param = param;
       this.connection = connection;
       this.timestamp = System.currentTimeMillis();
       this.response = null;
+      this.delayResponse = false;
+      this.responder = responder;
     }
 
     @Override
@@ -255,8 +273,98 @@ public abstract class HBaseServer implem
       return param.toString() + " from " + connection.toString();
     }
 
-    public void setResponse(ByteBuffer response) {
-      this.response = response;
+    private synchronized void setResponse(Object value, String errorClass,
+        String error) {
+      Writable result = null;
+      if (value instanceof Writable) {
+        result = (Writable) value;
+      } else {
+        /* We might have a null value and errors. Avoid creating a
+         * HbaseObjectWritable, because the constructor fails on null. */
+        if (value != null) {
+          result = new HbaseObjectWritable(value);
+        }
+      }
+
+      int size = BUFFER_INITIAL_SIZE;
+      if (result instanceof WritableWithSize) {
+        // get the size hint.
+        WritableWithSize ohint = (WritableWithSize) result;
+        long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+        if (hint > Integer.MAX_VALUE) {
+          // oops, new problem.
+          IOException ioe =
+            new IOException("Result buffer size too large: " + hint);
+          errorClass = ioe.getClass().getName();
+          error = StringUtils.stringifyException(ioe);
+        } else {
+          size = (int)hint;
+        }
+      }
+
+      ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+      DataOutputStream out = new DataOutputStream(buf);
+      try {
+        out.writeInt(this.id);                // write call id
+        out.writeBoolean(error != null);      // write error flag
+      } catch (IOException e) {
+        errorClass = e.getClass().getName();
+        error = StringUtils.stringifyException(e);
+      }
+
+      try {
+        if (error == null) {
+          result.write(out);
+        } else {
+          WritableUtils.writeString(out, errorClass);
+          WritableUtils.writeString(out, error);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error sending response to call: ", e);
+      }
+
+      if (buf.size() > warnResponseSize) {
+        LOG.warn("responseTooLarge for: "+this+": Size: "
+            + StringUtils.humanReadableInt(buf.size()));
+      }
+
+      this.response = buf.getByteBuffer();
+    }
+
+    @Override
+    public synchronized void endDelay(Object result) throws IOException {
+      assert this.delayResponse;
+      this.delayResponse = false;
+      delayedCalls.decrementAndGet();
+      this.setResponse(result, null, null);
+      this.responder.doRespond(this);
+    }
+
+    @Override
+    public synchronized void startDelay() {
+      assert !this.delayResponse;
+      this.delayResponse = true;
+      int numDelayed = delayedCalls.incrementAndGet();
+      if (numDelayed > warnDelayedCalls) {
+        LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
+            " current " + numDelayed);
+      }
+    }
+
+    @Override
+    public synchronized boolean isDelayed() {
+      return this.delayResponse;
+    }
+
+    /**
+     * If we have a response, and delay is not set, then respond
+     * immediately.  Otherwise, do not respond to client.  This is
+     * called by the RPC code in the context of the Handler thread.
+     */
+    public synchronized void sendResponseIfReady() throws IOException {
+      if (!this.delayResponse) {
+        this.responder.doRespond(this);
+      }
     }
   }
 
@@ -767,19 +875,8 @@ public abstract class HBaseServer implem
             if (inHandler) {
               // set the serve time when the response has to be sent later
               call.timestamp = System.currentTimeMillis();
-
-              incPending();
-              try {
-                // Wakeup the thread blocked on select, only then can the call
-                // to channel.register() complete.
-                writeSelector.wakeup();
-                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
-              } catch (ClosedChannelException e) {
-                //Its ok. channel might be closed else where.
+              if (enqueueInSelector(call))
                 done = true;
-              } finally {
-                decPending();
-              }
             }
             if (LOG.isDebugEnabled()) {
               LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -800,15 +897,43 @@ public abstract class HBaseServer implem
     }
 
     //
+    // Enqueue for background thread to send responses out later.
+    //
+    private boolean enqueueInSelector(Call call) throws IOException {
+      boolean done = false;
+      incPending();
+      try {
+        // Wake up the thread blocked on select, only then can the call
+        // to channel.register() complete.
+        SocketChannel channel = call.connection.channel;
+        writeSelector.wakeup();
+        channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+      } catch (ClosedChannelException e) {
+        //It's OK.  Channel might be closed elsewhere.
+        done = true;
+      } finally {
+        decPending();
+      }
+      return done;
+    }
+
+    //
     // Enqueue a response from the application.
     //
     void doRespond(Call call) throws IOException {
+      // set the serve time when the response has to be sent later
+      call.timestamp = System.currentTimeMillis();
+
+      boolean doRegister = false;
       synchronized (call.connection.responseQueue) {
         call.connection.responseQueue.addLast(call);
         if (call.connection.responseQueue.size() == 1) {
-          processResponse(call.connection.responseQueue, true);
+          doRegister = !processResponse(call.connection.responseQueue, false);
         }
       }
+      if (doRegister) {
+        enqueueInSelector(call);
+      }
     }
 
     private synchronized void incPending() {   // call waiting to be enqueued.
@@ -1003,7 +1128,7 @@ public abstract class HBaseServer implem
       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);
 
-      Call call = new Call(id, param, this);
+      Call call = new Call(id, param, this, responder);
 
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
@@ -1028,7 +1153,6 @@ public abstract class HBaseServer implem
   /** Handles queued calls . */
   private class Handler extends Thread {
     private final BlockingQueue<Call> myCallQueue;
-    static final int BUFFER_INITIAL_SIZE = 1024;
 
     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
       this.myCallQueue = cq;
@@ -1070,43 +1194,10 @@ public abstract class HBaseServer implem
           }
           CurCall.set(null);
 
-          int size = BUFFER_INITIAL_SIZE;
-          if (value instanceof WritableWithSize) {
-            // get the size hint.
-            WritableWithSize ohint = (WritableWithSize)value;
-            long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
-            if (hint > 0) {
-              if ((hint) > Integer.MAX_VALUE) {
-                // oops, new problem.
-                IOException ioe =
-                    new IOException("Result buffer size too large: " + hint);
-                errorClass = ioe.getClass().getName();
-                error = StringUtils.stringifyException(ioe);
-              } else {
-                size = (int)hint;
-              }
-            }
-          }
-          ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
-          DataOutputStream out = new DataOutputStream(buf);
-          out.writeInt(call.id);                // write call id
-          out.writeBoolean(error != null);      // write error flag
-
-          if (error == null) {
-            value.write(out);
-          } else {
-            WritableUtils.writeString(out, errorClass);
-            WritableUtils.writeString(out, error);
-          }
-
-          if (buf.size() > warnResponseSize) {
-            LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
-                     + StringUtils.humanReadableInt(buf.size()));
+          if (!call.isDelayed()) {
+            call.setResponse(value, errorClass, error);
           }
-
-
-          call.setResponse(buf.getByteBuffer());
-          responder.doRespond(call);
+          call.sendResponseIfReady();
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
             LOG.info(getName() + " caught: " +
@@ -1201,6 +1292,9 @@ public abstract class HBaseServer implem
 
     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
                                         DEFAULT_WARN_RESPONSE_SIZE);
+    this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
+                                        DEFAULT_WARN_DELAYED_CALLS);
+    this.delayedCalls = new AtomicInteger(0);
 
 
     // Create the responder here
@@ -1428,4 +1522,8 @@ public abstract class HBaseServer implem
     int nBytes = initialRemaining - buf.remaining();
     return (nBytes > 0) ? nBytes : ret;
   }
+
+  public Delayable getCurrentCall() {
+    return CurCall.get();
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Jul 26 20:30:57 2011
@@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.VersionedPr
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 
 /**
  */
@@ -65,6 +64,13 @@ public interface RpcServer {
   void startThreads();
 
   /**
+   * Needed for delayed calls.  We need to be able to store the current call
+   * so that we can complete it later.
+   * @return Call the server is currently handling.
+   */
+  Delayable getCurrentCall();
+
+  /**
    * Returns the metrics instance for reporting RPC call statistics
    */
   HBaseRpcMetrics getRpcMetrics();

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1151227&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Tue Jul 26 20:30:57 2011
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+/**
+ * Test that delayed RPCs work. Fire up three calls, the first of which should
+ * be delayed. Check that the last two, which are undelayed, return before the
+ * first one.
+ */
+public class TestDelayedRpc {
+  public static RpcServer rpcServer;
+
+  public static final int UNDELAYED = 0;
+  public static final int DELAYED = 1;
+
+  @Test
+  public void testDelayedRpc() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+
+    rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+        new Class<?>[]{ TestRpcImpl.class },
+        isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+    rpcServer.start();
+
+    TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+        rpcServer.getListenerAddress(), conf, 400);
+
+    List<Integer> results = new ArrayList<Integer>();
+
+    TestThread th1 = new TestThread(client, true, results);
+    TestThread th2 = new TestThread(client, false, results);
+    TestThread th3 = new TestThread(client, false, results);
+    th1.start();
+    Thread.sleep(100);
+    th2.start();
+    Thread.sleep(200);
+    th3.start();
+
+    th1.join();
+    th2.join();
+    th3.join();
+
+    assertEquals(results.get(0).intValue(), UNDELAYED);
+    assertEquals(results.get(1).intValue(), UNDELAYED);
+    assertEquals(results.get(2).intValue(), DELAYED);
+  }
+
+  private static class ListAppender extends AppenderSkeleton {
+    private List<String> messages = new ArrayList<String>();
+
+    @Override
+    protected void append(LoggingEvent event) {
+      messages.add(event.getMessage().toString());
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    public List<String> getMessages() {
+      return messages;
+    }
+  }
+
+  @Test
+  public void testTooManyDelayedRpcs() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    final int MAX_DELAYED_RPC = 10;
+    conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
+
+    ListAppender listAppender = new ListAppender();
+    Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
+    log.addAppender(listAppender);
+
+    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+    rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+        new Class<?>[]{ TestRpcImpl.class },
+        isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+    rpcServer.start();
+    TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+        rpcServer.getListenerAddress(), conf, 1000);
+
+    Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
+
+    for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+      threads[i] = new TestThread(client, true, null);
+      threads[i].start();
+    }
+
+    /* No warnings till here. */
+    assertTrue(listAppender.getMessages().isEmpty());
+
+    /* This should give a warning. */
+    threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
+    threads[MAX_DELAYED_RPC].start();
+
+    for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+      threads[i].join();
+    }
+
+    assertFalse(listAppender.getMessages().isEmpty());
+    assertTrue(listAppender.getMessages().get(0).startsWith(
+        "Too many delayed calls"));
+
+    log.removeAppender(listAppender);
+  }
+
+  public interface TestRpc extends VersionedProtocol {
+    int test(boolean delay);
+  }
+
+  private static class TestRpcImpl implements TestRpc {
+    @Override
+    public int test(boolean delay) {
+      if (!delay) {
+        return UNDELAYED;
+      }
+      final Delayable call = rpcServer.getCurrentCall();
+      call.startDelay();
+      new Thread() {
+        public void run() {
+          try {
+            Thread.sleep(500);
+            call.endDelay(DELAYED);
+          } catch (IOException e) {
+            e.printStackTrace();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }.start();
+      return 0xDEADBEEF; // this return value should not go back to client
+    }
+
+    @Override
+    public long getProtocolVersion(String arg0, long arg1) throws IOException {
+      return 0;
+    }
+  }
+
+  private static class TestThread extends Thread {
+    private TestRpc server;
+    private boolean delay;
+    private List<Integer> results;
+
+    public TestThread(TestRpc server, boolean delay, List<Integer> results) {
+      this.server = server;
+      this.delay = delay;
+      this.results = results;
+    }
+
+    @Override
+    public void run() {
+      Integer result = new Integer(server.test(delay));
+      if (results != null) {
+        synchronized (results) {
+          results.add(result);
+        }
+      }
+    }
+  }
+}