You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2011/08/05 22:05:09 UTC
svn commit: r1154366 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
Author: todd
Date: Fri Aug 5 20:05:09 2011
New Revision: 1154366
URL: http://svn.apache.org/viewvc?rev=1154366&view=rev
Log:
HBASE-3899. Add ability for delayed RPC calls to set return value immediately at call return.
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Aug 5 20:05:09 2011
@@ -358,6 +358,8 @@ Release 0.91.0 - Unreleased
HBASE-3810 Registering a coprocessor in HTableDescriptor should be easier
(Mingjie Lai via garyh)
HBASE-4158 Upgrade pom.xml to surefire 2.9 (Aaron Kushner & Mikhail)
+ HBASE-3899 Add ability for delayed RPC calls to set return value
+ immediately at call return. (Vlad Dogaru via todd)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Fri Aug 5 20:05:09 2011
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
/**
* A call whose response can be delayed by the server.
*/
@@ -30,8 +28,12 @@ public interface Delayable {
/**
* Signal that the call response should be delayed, thus freeing the RPC
* server to handle different requests.
+ *
+ * @param delayReturnValue Controls whether the return value of the call
+ * should be set when ending the delay or right away. There are cases when
+ * the return value can be set right away, even if the call is delayed.
*/
- public void startDelay();
+ public void startDelay(boolean delayReturnValue);
/**
* @return is the call delayed?
@@ -39,10 +41,31 @@ public interface Delayable {
public boolean isDelayed();
/**
- * Signal that the response to the call is ready and the RPC server is now
- * allowed to send the response.
- * @param result The result to return to the caller.
+ * @return is the return value delayed?
+ */
+ public boolean isReturnValueDelayed();
+
+ /**
+ * Signal that the RPC server is now allowed to send the response.
+ * @param result The value to return to the caller. If the corresponding
+ * {@link #startDelay(boolean)} specified that the return value should
+ * not be delayed, this parameter must be null.
* @throws IOException
*/
public void endDelay(Object result) throws IOException;
+
+ /**
+ * Signal the end of a delayed RPC, without specifying the return value. Use
+ * this only if the return value was not delayed.
+ * @throws IOException
+ */
+ public void endDelay() throws IOException;
+
+ /**
+ * End the call, throwing an exception to the caller. This works regardless
+ * of the return value being delayed.
+ * @param t Object to throw to the client.
+ * @throws IOException
+ */
+ public void endDelayThrowing(Throwable t) throws IOException;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri Aug 5 20:05:09 2011
@@ -256,6 +256,9 @@ public abstract class HBaseServer implem
protected ByteBuffer response; // the response for this call
protected boolean delayResponse;
protected Responder responder;
+ protected boolean delayReturnValue; // if the return value should be
+ // set at call completion
+ protected boolean isError;
public Call(int id, Writable param, Connection connection,
Responder responder) {
@@ -266,6 +269,7 @@ public abstract class HBaseServer implem
this.response = null;
this.delayResponse = false;
this.responder = responder;
+ this.isError = false;
}
@Override
@@ -275,6 +279,14 @@ public abstract class HBaseServer implem
private synchronized void setResponse(Object value, String errorClass,
String error) {
+ // Avoid overwriting an error value in the response. This can happen if
+ // endDelayThrowing is called by another thread before the actual call
+ // returns.
+ if (this.isError)
+ return;
+ if (errorClass != null) {
+ this.isError = true;
+ }
Writable result = null;
if (value instanceof Writable) {
result = (Writable) value;
@@ -334,16 +346,24 @@ public abstract class HBaseServer implem
@Override
public synchronized void endDelay(Object result) throws IOException {
assert this.delayResponse;
+ assert this.delayReturnValue || result == null;
this.delayResponse = false;
delayedCalls.decrementAndGet();
- this.setResponse(result, null, null);
+ if (this.delayReturnValue)
+ this.setResponse(result, null, null);
this.responder.doRespond(this);
}
@Override
- public synchronized void startDelay() {
+ public synchronized void endDelay() throws IOException {
+ this.endDelay(null);
+ }
+
+ @Override
+ public synchronized void startDelay(boolean delayReturnValue) {
assert !this.delayResponse;
this.delayResponse = true;
+ this.delayReturnValue = delayReturnValue;
int numDelayed = delayedCalls.incrementAndGet();
if (numDelayed > warnDelayedCalls) {
LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
@@ -352,10 +372,23 @@ public abstract class HBaseServer implem
}
@Override
+ public synchronized void endDelayThrowing(Throwable t) throws IOException {
+ this.setResponse(null, t.getClass().toString(),
+ StringUtils.stringifyException(t));
+ this.delayResponse = false;
+ this.sendResponseIfReady();
+ }
+
+ @Override
public synchronized boolean isDelayed() {
return this.delayResponse;
}
+ @Override
+ public synchronized boolean isReturnValueDelayed() {
+ return this.delayReturnValue;
+ }
+
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
@@ -1194,7 +1227,9 @@ public abstract class HBaseServer implem
}
CurCall.set(null);
- if (!call.isDelayed()) {
+ // Set the response for undelayed calls and delayed calls with
+ // undelayed responses.
+ if (!call.isDelayed() || !call.isReturnValueDelayed()) {
call.setResponse(value, errorClass, error);
}
call.sendResponseIfReady();
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1154366&r1=1154365&r2=1154366&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Fri Aug 5 20:05:09 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -48,17 +49,26 @@ public class TestDelayedRpc {
public static final int DELAYED = 1;
@Test
- public void testDelayedRpc() throws Exception {
+ public void testDelayedRpcImmediateReturnValue() throws Exception {
+ testDelayedRpc(false);
+ }
+
+ @Test
+ public void testDelayedRpcDelayedReturnValue() throws Exception {
+ testDelayedRpc(true);
+ }
+
+ private void testDelayedRpc(boolean delayReturnValue) throws Exception {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
- rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+ rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
- rpcServer.getListenerAddress(), conf, 400);
+ rpcServer.getListenerAddress(), conf, 1000);
List<Integer> results = new ArrayList<Integer>();
@@ -77,7 +87,8 @@ public class TestDelayedRpc {
assertEquals(results.get(0).intValue(), UNDELAYED);
assertEquals(results.get(1).intValue(), UNDELAYED);
- assertEquals(results.get(2).intValue(), DELAYED);
+ assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
+ 0xDEADBEEF);
}
private static class ListAppender extends AppenderSkeleton {
@@ -113,7 +124,7 @@ public class TestDelayedRpc {
log.addAppender(listAppender);
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
- rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+ rpcServer = HBaseRPC.getServer(new TestRpcImpl(true),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
@@ -150,26 +161,41 @@ public class TestDelayedRpc {
}
private static class TestRpcImpl implements TestRpc {
+ /**
+ * Should the return value of delayed call be set at the end of the delay
+ * or at call return.
+ */
+ private boolean delayReturnValue;
+
+ /**
+ * @param delayReturnValue If true, the response to the delayed call is set
+ * at the end of the delay; if false, it is set right away at call return.
+ * @param delay Amount of milliseconds to delay the call by
+ */
+ public TestRpcImpl(boolean delayReturnValue) {
+ this.delayReturnValue = delayReturnValue;
+ }
+
@Override
- public int test(boolean delay) {
+ public int test(final boolean delay) {
if (!delay) {
return UNDELAYED;
}
final Delayable call = rpcServer.getCurrentCall();
- call.startDelay();
+ call.startDelay(delayReturnValue);
new Thread() {
public void run() {
try {
Thread.sleep(500);
- call.endDelay(DELAYED);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
+ call.endDelay(delayReturnValue ? DELAYED : null);
+ } catch (Exception e) {
e.printStackTrace();
}
}
}.start();
- return 0xDEADBEEF; // this return value should not go back to client
+ // This value should go back to client only if the response is set
+ // immediately at delay time.
+ return 0xDEADBEEF;
}
@Override
@@ -199,4 +225,63 @@ public class TestDelayedRpc {
}
}
}
+
+ @Test
+ public void testEndDelayThrowing() throws IOException {
+ Configuration conf = HBaseConfiguration.create();
+ InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+
+ rpcServer = HBaseRPC.getServer(new FaultyTestRpc(),
+ new Class<?>[]{ TestRpcImpl.class },
+ isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+ rpcServer.start();
+
+ TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+ rpcServer.getListenerAddress(), conf, 1000);
+
+ int result = 0xDEADBEEF;
+
+ try {
+ result = client.test(false);
+ } catch (Exception e) {
+ fail("No exception should have been thrown.");
+ }
+ assertEquals(result, UNDELAYED);
+
+ boolean caughtException = false;
+ try {
+ result = client.test(true);
+ } catch(Exception e) {
+ // Exception thrown by server is enclosed in a RemoteException.
+ if (e.getCause().getMessage().startsWith(
+ "java.lang.Exception: Something went wrong"))
+ caughtException = true;
+ }
+ assertTrue(caughtException);
+ }
+
+ /**
+ * Delayed calls to this class throw an exception.
+ */
+ private static class FaultyTestRpc implements TestRpc {
+ @Override
+ public int test(boolean delay) {
+ if (!delay)
+ return UNDELAYED;
+ Delayable call = rpcServer.getCurrentCall();
+ call.startDelay(true);
+ try {
+ call.endDelayThrowing(new Exception("Something went wrong"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ // Client will receive the Exception, not this value.
+ return DELAYED;
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+ }
}