You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2011/08/05 22:05:09 UTC

svn commit: r1154366 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java

Author: todd
Date: Fri Aug  5 20:05:09 2011
New Revision: 1154366

URL: http://svn.apache.org/viewvc?rev=1154366&view=rev
Log:
HBASE-3899. Add ability for delayed RPC calls to set return value immediately at call return.

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Aug  5 20:05:09 2011
@@ -358,6 +358,8 @@ Release 0.91.0 - Unreleased
    HBASE-3810  Registering a coprocessor in HTableDescriptor should be easier
                (Mingjie Lai via garyh)
    HBASE-4158  Upgrade pom.xml to surefire 2.9 (Aaron Kushner & Mikhail)
+   HBASE-3899  Add ability for delayed RPC calls to set return value
+               immediately at call return. (Vlad Dogaru via todd)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Fri Aug  5 20:05:09 2011
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * A call whose response can be delayed by the server.
  */
@@ -30,8 +28,12 @@ public interface Delayable {
   /**
    * Signal that the call response should be delayed, thus freeing the RPC
    * server to handle different requests.
+   *
+   * @param delayReturnValue Controls whether the return value of the call
+   * should be set when ending the delay or right away.  There are cases when
+   * the return value can be set right away, even if the call is delayed.
    */
-  public void startDelay();
+  public void startDelay(boolean delayReturnValue);
 
   /**
    * @return is the call delayed?
@@ -39,10 +41,31 @@ public interface Delayable {
   public boolean isDelayed();
 
   /**
-   * Signal that the response to the call is ready and the RPC server is now
-   * allowed to send the response.
-   * @param result The result to return to the caller.
+   * @return is the return value delayed?
+   */
+  public boolean isReturnValueDelayed();
+
+  /**
+   * Signal that the  RPC server is now allowed to send the response.
+   * @param result The value to return to the caller.  If the corresponding
+   * {@link #delayResponse(boolean)} specified that the return value should
+   * not be delayed, this parameter must be null.
    * @throws IOException
    */
   public void endDelay(Object result) throws IOException;
+
+  /**
+   * Signal the end of a delayed RPC, without specifying the return value.  Use
+   * this only if the return value was not delayed
+   * @throws IOException
+   */
+  public void endDelay() throws IOException;
+
+  /**
+   * End the call, throwing and exception to the caller.  This works regardless
+   * of the return value being delayed.
+   * @param t Object to throw to the client.
+   * @throws IOException
+   */
+  public void endDelayThrowing(Throwable t) throws IOException;
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug  5 20:05:09 2011
@@ -256,6 +256,9 @@ public abstract class HBaseServer implem
     protected ByteBuffer response;                // the response for this call
     protected boolean delayResponse;
     protected Responder responder;
+    protected boolean delayReturnValue;           // if the return value should be
+                                                  // set at call completion
+    protected boolean isError;
 
     public Call(int id, Writable param, Connection connection,
         Responder responder) {
@@ -266,6 +269,7 @@ public abstract class HBaseServer implem
       this.response = null;
       this.delayResponse = false;
       this.responder = responder;
+      this.isError = false;
     }
 
     @Override
@@ -275,6 +279,14 @@ public abstract class HBaseServer implem
 
     private synchronized void setResponse(Object value, String errorClass,
         String error) {
+      // Avoid overwriting an error value in the response.  This can happen if
+      // endDelayThrowing is called by another thread before the actual call
+      // returning.
+      if (this.isError)
+        return;
+      if (errorClass != null) {
+        this.isError = true;
+      }
       Writable result = null;
       if (value instanceof Writable) {
         result = (Writable) value;
@@ -334,16 +346,24 @@ public abstract class HBaseServer implem
     @Override
     public synchronized void endDelay(Object result) throws IOException {
       assert this.delayResponse;
+      assert this.delayReturnValue || result == null;
       this.delayResponse = false;
       delayedCalls.decrementAndGet();
-      this.setResponse(result, null, null);
+      if (this.delayReturnValue)
+        this.setResponse(result, null, null);
       this.responder.doRespond(this);
     }
 
     @Override
-    public synchronized void startDelay() {
+    public synchronized void endDelay() throws IOException {
+      this.endDelay(null);
+    }
+
+    @Override
+    public synchronized void startDelay(boolean delayReturnValue) {
       assert !this.delayResponse;
       this.delayResponse = true;
+      this.delayReturnValue = delayReturnValue;
       int numDelayed = delayedCalls.incrementAndGet();
       if (numDelayed > warnDelayedCalls) {
         LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
@@ -352,10 +372,23 @@ public abstract class HBaseServer implem
     }
 
     @Override
+    public synchronized void endDelayThrowing(Throwable t) throws IOException {
+      this.setResponse(null, t.getClass().toString(),
+          StringUtils.stringifyException(t));
+      this.delayResponse = false;
+      this.sendResponseIfReady();
+    }
+
+    @Override
     public synchronized boolean isDelayed() {
       return this.delayResponse;
     }
 
+    @Override
+    public synchronized boolean isReturnValueDelayed() {
+      return this.delayReturnValue;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is
@@ -1194,7 +1227,9 @@ public abstract class HBaseServer implem
           }
           CurCall.set(null);
 
-          if (!call.isDelayed()) {
+          // Set the response for undelayed calls and delayed calls with
+          // undelayed responses.
+          if (!call.isDelayed() || !call.isReturnValueDelayed()) {
             call.setResponse(value, errorClass, error);
           }
           call.sendResponseIfReady();

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Fri Aug  5 20:05:09 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.ipc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -48,17 +49,26 @@ public class TestDelayedRpc {
   public static final int DELAYED = 1;
 
   @Test
-  public void testDelayedRpc() throws Exception {
+  public void testDelayedRpcImmediateReturnValue() throws Exception {
+    testDelayedRpc(false);
+  }
+
+  @Test
+  public void testDelayedRpcDelayedReturnValue() throws Exception {
+    testDelayedRpc(true);
+  }
+
+  private void testDelayedRpc(boolean delayReturnValue) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
 
-    rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+    rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue),
         new Class<?>[]{ TestRpcImpl.class },
         isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
     rpcServer.start();
 
     TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
-        rpcServer.getListenerAddress(), conf, 400);
+        rpcServer.getListenerAddress(), conf, 1000);
 
     List<Integer> results = new ArrayList<Integer>();
 
@@ -77,7 +87,8 @@ public class TestDelayedRpc {
 
     assertEquals(results.get(0).intValue(), UNDELAYED);
     assertEquals(results.get(1).intValue(), UNDELAYED);
-    assertEquals(results.get(2).intValue(), DELAYED);
+    assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
+        0xDEADBEEF);
   }
 
   private static class ListAppender extends AppenderSkeleton {
@@ -113,7 +124,7 @@ public class TestDelayedRpc {
     log.addAppender(listAppender);
 
     InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-    rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+    rpcServer = HBaseRPC.getServer(new TestRpcImpl(true),
         new Class<?>[]{ TestRpcImpl.class },
         isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
     rpcServer.start();
@@ -150,26 +161,41 @@ public class TestDelayedRpc {
   }
 
   private static class TestRpcImpl implements TestRpc {
+    /**
+     * Should the return value of delayed call be set at the end of the delay
+     * or at call return.
+     */
+    private boolean delayReturnValue;
+
+    /**
+     * @param delayReturnValue Should the response to the delayed call be set
+     * at the start or the end of the delay.
+     * @param delay Amount of milliseconds to delay the call by
+     */
+    public TestRpcImpl(boolean delayReturnValue) {
+      this.delayReturnValue = delayReturnValue;
+    }
+
     @Override
-    public int test(boolean delay) {
+    public int test(final boolean delay) {
       if (!delay) {
         return UNDELAYED;
       }
       final Delayable call = rpcServer.getCurrentCall();
-      call.startDelay();
+      call.startDelay(delayReturnValue);
       new Thread() {
         public void run() {
           try {
             Thread.sleep(500);
-            call.endDelay(DELAYED);
-          } catch (IOException e) {
-            e.printStackTrace();
-          } catch (InterruptedException e) {
+            call.endDelay(delayReturnValue ? DELAYED : null);
+          } catch (Exception e) {
             e.printStackTrace();
           }
         }
       }.start();
-      return 0xDEADBEEF; // this return value should not go back to client
+      // This value should go back to client only if the response is set
+      // immediately at delay time.
+      return 0xDEADBEEF;
     }
 
     @Override
@@ -199,4 +225,63 @@ public class TestDelayedRpc {
       }
     }
   }
+
+  @Test
+  public void testEndDelayThrowing() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+
+    rpcServer = HBaseRPC.getServer(new FaultyTestRpc(),
+        new Class<?>[]{ TestRpcImpl.class },
+        isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+    rpcServer.start();
+
+    TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+        rpcServer.getListenerAddress(), conf, 1000);
+
+    int result = 0xDEADBEEF;
+
+    try {
+      result = client.test(false);
+    } catch (Exception e) {
+      fail("No exception should have been thrown.");
+    }
+    assertEquals(result, UNDELAYED);
+
+    boolean caughtException = false;
+    try {
+      result = client.test(true);
+    } catch(Exception e) {
+      // Exception thrown by server is enclosed in a RemoteException.
+      if (e.getCause().getMessage().startsWith(
+          "java.lang.Exception: Something went wrong"))
+        caughtException = true;
+    }
+    assertTrue(caughtException);
+  }
+
+  /**
+   * Delayed calls to this class throw an exception.
+   */
+  private static class FaultyTestRpc implements TestRpc {
+    @Override
+    public int test(boolean delay) {
+      if (!delay)
+        return UNDELAYED;
+      Delayable call = rpcServer.getCurrentCall();
+      call.startDelay(true);
+      try {
+        call.endDelayThrowing(new Exception("Something went wrong"));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      // Client will receive the Exception, not this value.
+      return DELAYED;
+    }
+
+    @Override
+    public long getProtocolVersion(String arg0, long arg1) throws IOException {
+      return 0;
+    }
+  }
 }