You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/07/26 22:30:58 UTC
svn commit: r1151227 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/ipc/
src/test/java/org/apache/hadoop/hbase/ipc/
Author: stack
Date: Tue Jul 26 20:30:57 2011
New Revision: 1151227
URL: http://svn.apache.org/viewvc?rev=1151227&view=rev
Log:
HBASE-3899 enhance HBase RPC to support free-ing up server handler threads even if response is not ready
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jul 26 20:30:57 2011
@@ -332,6 +332,8 @@ Release 0.91.0 - Unreleased
HBASE-4081 Issues with HRegion.compactStores methods (Ming Ma)
HBASE-3465 Hbase should use a HADOOP_HOME environment variable if available
(Alejandro Abdelnur)
+ HBASE-3899 enhance HBase RPC to support free-ing up server handler threads
+ even if response is not ready (Vlad Dogaru)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1151227&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Tue Jul 26 20:30:57 2011
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A call whose response can be delayed by the server.
+ */
+public interface Delayable {
+ /**
+ * Signal that the call response should be delayed, thus freeing the RPC
+ * server to handle different requests.
+ */
+ public void startDelay();
+
+ /**
+ * @return is the call delayed?
+ */
+ public boolean isDelayed();
+
+ /**
+ * Signal that the response to the call is ready and the RPC server is now
+ * allowed to send the response.
+ * @param result The result to return to the caller.
+ * @throws IOException
+ */
+ public void endDelay(Object result) throws IOException;
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jul 26 20:30:57 2011
@@ -47,6 +47,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -56,6 +57,7 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
@@ -97,8 +99,19 @@ public abstract class HBaseServer implem
/** Default value for above param */
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+ static final int BUFFER_INITIAL_SIZE = 1024;
+
private final int warnResponseSize;
+ private static final String WARN_DELAYED_CALLS =
+ "hbase.ipc.warn.delayedrpc.number";
+
+ private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;
+
+ private final int warnDelayedCalls;
+
+ private AtomicInteger delayedCalls;
+
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@@ -234,20 +247,25 @@ public abstract class HBaseServer implem
}
/** A call queued for handling. */
- private static class Call {
+ private class Call implements Delayable {
protected int id; // the client's call id
protected Writable param; // the parameter passed
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected ByteBuffer response; // the response for this call
+ protected boolean delayResponse;
+ protected Responder responder;
- public Call(int id, Writable param, Connection connection) {
+ public Call(int id, Writable param, Connection connection,
+ Responder responder) {
this.id = id;
this.param = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
+ this.delayResponse = false;
+ this.responder = responder;
}
@Override
@@ -255,8 +273,98 @@ public abstract class HBaseServer implem
return param.toString() + " from " + connection.toString();
}
- public void setResponse(ByteBuffer response) {
- this.response = response;
+ private synchronized void setResponse(Object value, String errorClass,
+ String error) {
+ Writable result = null;
+ if (value instanceof Writable) {
+ result = (Writable) value;
+ } else {
+ /* We might have a null value and errors. Avoid creating a
+ * HbaseObjectWritable, because the constructor fails on null. */
+ if (value != null) {
+ result = new HbaseObjectWritable(value);
+ }
+ }
+
+ int size = BUFFER_INITIAL_SIZE;
+ if (result instanceof WritableWithSize) {
+ // get the size hint.
+ WritableWithSize ohint = (WritableWithSize) result;
+ long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+ if (hint > Integer.MAX_VALUE) {
+ // oops, new problem.
+ IOException ioe =
+ new IOException("Result buffer size too large: " + hint);
+ errorClass = ioe.getClass().getName();
+ error = StringUtils.stringifyException(ioe);
+ } else {
+ size = (int)hint;
+ }
+ }
+
+ ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+ DataOutputStream out = new DataOutputStream(buf);
+ try {
+ out.writeInt(this.id); // write call id
+ out.writeBoolean(error != null); // write error flag
+ } catch (IOException e) {
+ errorClass = e.getClass().getName();
+ error = StringUtils.stringifyException(e);
+ }
+
+ try {
+ if (error == null) {
+ result.write(out);
+ } else {
+ WritableUtils.writeString(out, errorClass);
+ WritableUtils.writeString(out, error);
+ }
+ } catch (IOException e) {
+ LOG.warn("Error sending response to call: ", e);
+ }
+
+ if (buf.size() > warnResponseSize) {
+ LOG.warn("responseTooLarge for: "+this+": Size: "
+ + StringUtils.humanReadableInt(buf.size()));
+ }
+
+ this.response = buf.getByteBuffer();
+ }
+
+ @Override
+ public synchronized void endDelay(Object result) throws IOException {
+ assert this.delayResponse;
+ this.delayResponse = false;
+ delayedCalls.decrementAndGet();
+ this.setResponse(result, null, null);
+ this.responder.doRespond(this);
+ }
+
+ @Override
+ public synchronized void startDelay() {
+ assert !this.delayResponse;
+ this.delayResponse = true;
+ int numDelayed = delayedCalls.incrementAndGet();
+ if (numDelayed > warnDelayedCalls) {
+ LOG.warn("Too many delayed calls: limit " + warnDelayedCalls +
+ " current " + numDelayed);
+ }
+ }
+
+ @Override
+ public synchronized boolean isDelayed() {
+ return this.delayResponse;
+ }
+
+ /**
+ * If we have a response, and delay is not set, then respond
+ * immediately. Otherwise, do not respond to client. This is
+ * called the by the RPC code in the context of the Handler thread.
+ */
+ public synchronized void sendResponseIfReady() throws IOException {
+ if (!this.delayResponse) {
+ this.responder.doRespond(this);
+ }
}
}
@@ -767,19 +875,8 @@ public abstract class HBaseServer implem
if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
-
- incPending();
- try {
- // Wakeup the thread blocked on select, only then can the call
- // to channel.register() complete.
- writeSelector.wakeup();
- channel.register(writeSelector, SelectionKey.OP_WRITE, call);
- } catch (ClosedChannelException e) {
- //Its ok. channel might be closed else where.
+ if (enqueueInSelector(call))
done = true;
- } finally {
- decPending();
- }
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -800,15 +897,43 @@ public abstract class HBaseServer implem
}
//
+ // Enqueue for background thread to send responses out later.
+ //
+ private boolean enqueueInSelector(Call call) throws IOException {
+ boolean done = false;
+ incPending();
+ try {
+ // Wake up the thread blocked on select, only then can the call
+ // to channel.register() complete.
+ SocketChannel channel = call.connection.channel;
+ writeSelector.wakeup();
+ channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+ } catch (ClosedChannelException e) {
+ //It's OK. Channel might be closed else where.
+ done = true;
+ } finally {
+ decPending();
+ }
+ return done;
+ }
+
+ //
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
+ // set the serve time when the response has to be sent later
+ call.timestamp = System.currentTimeMillis();
+
+ boolean doRegister = false;
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
- processResponse(call.connection.responseQueue, true);
+ doRegister = !processResponse(call.connection.responseQueue, false);
}
}
+ if (doRegister) {
+ enqueueInSelector(call);
+ }
}
private synchronized void incPending() { // call waiting to be enqueued.
@@ -1003,7 +1128,7 @@ public abstract class HBaseServer implem
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
param.readFields(dis);
- Call call = new Call(id, param, this);
+ Call call = new Call(id, param, this, responder);
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
@@ -1028,7 +1153,6 @@ public abstract class HBaseServer implem
/** Handles queued calls . */
private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
- static final int BUFFER_INITIAL_SIZE = 1024;
public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
this.myCallQueue = cq;
@@ -1070,43 +1194,10 @@ public abstract class HBaseServer implem
}
CurCall.set(null);
- int size = BUFFER_INITIAL_SIZE;
- if (value instanceof WritableWithSize) {
- // get the size hint.
- WritableWithSize ohint = (WritableWithSize)value;
- long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
- if (hint > 0) {
- if ((hint) > Integer.MAX_VALUE) {
- // oops, new problem.
- IOException ioe =
- new IOException("Result buffer size too large: " + hint);
- errorClass = ioe.getClass().getName();
- error = StringUtils.stringifyException(ioe);
- } else {
- size = (int)hint;
- }
- }
- }
- ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
- DataOutputStream out = new DataOutputStream(buf);
- out.writeInt(call.id); // write call id
- out.writeBoolean(error != null); // write error flag
-
- if (error == null) {
- value.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
- }
-
- if (buf.size() > warnResponseSize) {
- LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
- + StringUtils.humanReadableInt(buf.size()));
+ if (!call.isDelayed()) {
+ call.setResponse(value, errorClass, error);
}
-
-
- call.setResponse(buf.getByteBuffer());
- responder.doRespond(call);
+ call.sendResponseIfReady();
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " +
@@ -1201,6 +1292,9 @@ public abstract class HBaseServer implem
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
+ this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS,
+ DEFAULT_WARN_DELAYED_CALLS);
+ this.delayedCalls = new AtomicInteger(0);
// Create the responder here
@@ -1428,4 +1522,8 @@ public abstract class HBaseServer implem
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
+
+ public Delayable getCurrentCall() {
+ return CurCall.get();
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1151227&r1=1151226&r2=1151227&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Jul 26 20:30:57 2011
@@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.VersionedPr
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
/**
*/
@@ -65,6 +64,13 @@ public interface RpcServer {
void startThreads();
/**
+ * Needed for delayed calls. We need to be able to store the current call
+ * so that we can complete it later.
+ * @return Call the server is currently handling.
+ */
+ Delayable getCurrentCall();
+
+ /**
* Returns the metrics instance for reporting RPC call statistics
*/
HBaseRpcMetrics getRpcMetrics();
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1151227&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Tue Jul 26 20:30:57 2011
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+
+/**
+ * Test that delayed RPCs work. Fire up three calls, the first of which should
+ * be delayed. Check that the last two, which are undelayed, return before the
+ * first one.
+ */
+public class TestDelayedRpc {
+ public static RpcServer rpcServer;
+
+ public static final int UNDELAYED = 0;
+ public static final int DELAYED = 1;
+
+ @Test
+ public void testDelayedRpc() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+
+ rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+ new Class<?>[]{ TestRpcImpl.class },
+ isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+ rpcServer.start();
+
+ TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+ rpcServer.getListenerAddress(), conf, 400);
+
+ List<Integer> results = new ArrayList<Integer>();
+
+ TestThread th1 = new TestThread(client, true, results);
+ TestThread th2 = new TestThread(client, false, results);
+ TestThread th3 = new TestThread(client, false, results);
+ th1.start();
+ Thread.sleep(100);
+ th2.start();
+ Thread.sleep(200);
+ th3.start();
+
+ th1.join();
+ th2.join();
+ th3.join();
+
+ assertEquals(results.get(0).intValue(), UNDELAYED);
+ assertEquals(results.get(1).intValue(), UNDELAYED);
+ assertEquals(results.get(2).intValue(), DELAYED);
+ }
+
+ private static class ListAppender extends AppenderSkeleton {
+ private List<String> messages = new ArrayList<String>();
+
+ @Override
+ protected void append(LoggingEvent event) {
+ messages.add(event.getMessage().toString());
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ public List<String> getMessages() {
+ return messages;
+ }
+ }
+
+ @Test
+ public void testTooManyDelayedRpcs() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ final int MAX_DELAYED_RPC = 10;
+ conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
+
+ ListAppender listAppender = new ListAppender();
+ Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer");
+ log.addAppender(listAppender);
+
+ InetSocketAddress isa = new InetSocketAddress("localhost", 0);
+ rpcServer = HBaseRPC.getServer(new TestRpcImpl(),
+ new Class<?>[]{ TestRpcImpl.class },
+ isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
+ rpcServer.start();
+ TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+ rpcServer.getListenerAddress(), conf, 1000);
+
+ Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
+
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i] = new TestThread(client, true, null);
+ threads[i].start();
+ }
+
+ /* No warnings till here. */
+ assertTrue(listAppender.getMessages().isEmpty());
+
+ /* This should give a warning. */
+ threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
+ threads[MAX_DELAYED_RPC].start();
+
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i].join();
+ }
+
+ assertFalse(listAppender.getMessages().isEmpty());
+ assertTrue(listAppender.getMessages().get(0).startsWith(
+ "Too many delayed calls"));
+
+ log.removeAppender(listAppender);
+ }
+
+ public interface TestRpc extends VersionedProtocol {
+ int test(boolean delay);
+ }
+
+ private static class TestRpcImpl implements TestRpc {
+ @Override
+ public int test(boolean delay) {
+ if (!delay) {
+ return UNDELAYED;
+ }
+ final Delayable call = rpcServer.getCurrentCall();
+ call.startDelay();
+ new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(500);
+ call.endDelay(DELAYED);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ return 0xDEADBEEF; // this return value should not go back to client
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+ }
+
+ private static class TestThread extends Thread {
+ private TestRpc server;
+ private boolean delay;
+ private List<Integer> results;
+
+ public TestThread(TestRpc server, boolean delay, List<Integer> results) {
+ this.server = server;
+ this.delay = delay;
+ this.results = results;
+ }
+
+ @Override
+ public void run() {
+ Integer result = new Integer(server.test(delay));
+ if (results != null) {
+ synchronized (results) {
+ results.add(result);
+ }
+ }
+ }
+ }
+}